Three different implementations of MapReduce
So far as I can see, there are three implementations of MapReduce that matter for enterprise analytic use – Hadoop, Greenplum’s, and Aster Data’s.* Hadoop has of course been available for a while, and used for a number of different things, while Greenplum’s and Aster Data’s versions of MapReduce – both in late-stage beta – have far fewer users.
*Perhaps Nokia’s Disco or another implementation will at some point join the list.
Earlier this evening I posted some Mike Stonebraker criticisms of MapReduce. It turns out that they aren’t all accurate across all MapReduce implementations. So this seems like a good time for me to stop stalling and put up a few notes about specific features of different MapReduce implementations. Here goes.
- Hadoop doesn’t have much to do with SQL or traditional database management. As far as I know, all of Mike’s criticisms of MapReduce are indeed accurate about Hadoop. Edit: Oops. A comment on the post linked above suggests otherwise.
- Aster Data’s MapReduce implementation is closely tied to SQL, and indeed is described as “in-database MapReduce.” Every Map or Reduce step is embedded in a SELECT clause.
- Even so, Aster Data Map and Reduce functions can be expressed in a broad variety of programming languages, such as Java, C++, C, Python, Perl, or R.
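To make “embedded in a SELECT clause” concrete, here’s a minimal sketch of what invoking a MapReduce function from SQL might look like. The function name sessionize, the clicks table, and the clause names are my hypothetical illustrations, not Aster’s published syntax; the white paper mentioned below shows the real thing.

SELECT user_id, session_id
FROM sessionize(              -- hypothetical MapReduce function, used like a table
       ON clicks              -- input relation fed to the Map/Reduce steps
       PARTITION BY user_id   -- rows for one user are reduced together
       ORDER BY click_time    -- order rows are seen within a partition
     );

The point is that the MapReduce invocation sits in the FROM clause like any other relation, so its output can be filtered, joined, or aggregated by ordinary SQL around it.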
- Greenplum’s MapReduce implementation lives outside the Greenplum database; its controlling syntax is a markup language (YAML, as the excerpt below shows). Inputs can be database tables or other files. If they’re database tables, SQL is used to talk to them.
- In particular, contrary to what Mike thought, Greenplum’s (and presumably also Aster Data’s) MapReduce implementations can use SQL Analytics syntax.
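For instance, a windowed aggregate could be applied directly over a MapReduce stage’s output. This is a hedged sketch, patterned after the term_join input in the Greenplum excerpt below (which queries the doc_prep task like a table); treat the exact extent of window-function support as an assumption on my part:

SELECT doc_id,
       term,
       COUNT(*) OVER (PARTITION BY doc_id) AS terms_in_doc  -- SQL Analytics window clause
FROM doc_prep;  -- doc_prep is a MapReduce TASK, queried like any table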
- Greenplum’s Map and Reduce steps can also be written in many programming languages, although I’m not sure whether the list is as long as Aster Data’s.
- Contrary to what Mike believed, Greenplum’s MapReduce doesn’t need to write every intermediate result set to disk. The same is surely true of Aster Data’s implementation, given that a whole MapReduce program is just a single SQL statement.
- Greenplum’s MapReduce implementation offers pipelining, so that a program can be working on multiple Map and Reduce steps at once. Again, I’d guess the same is true of Aster Data’s implementation.
Aster Data has actually posted quite a bit about MapReduce, including on its excellent blog, which has joined Vertica’s more academic Database Column among the best vendor blogs in the business. There’s also an excellent Aster Data MapReduce white paper (if you can stand to register for it), which among other virtues illustrates Aster’s MapReduce syntax.
Greenplum’s MapReduce marketing materials, while sparser, aren’t bad either. The syntax isn’t published yet, however, so here’s a documentation excerpt Greenplum helpfully emailed to me.
# The following map reduce script takes as input two tables:
#   - documents (doc_id integer, url text, data text)
#   - keywords  (keyword_id integer, keyword text)
#
# The keywords are "searched for" in the documents data, and the results
# of url, data and keyword found are printed to the screen. A given
# keyword can actually be composed of multiple terms:
# ie "computer software" is a keyword that contains two terms.
%YAML 1.1
---
VERSION: 1.0.0.1
DATABASE: webdata
USER: jsmith
DEFINE:
  # First we declare both of the input tables.
  # Alternative input formats include accessing external files
  # or executing arbitrary SQL.
  - INPUT:
      NAME: doc
      QUERY: select * from documents
  - INPUT:
      NAME: kw
      QUERY: select * from keywords

  # Next we create MAP functions to extract "terms" from both documents
  # and from keywords. This implementation simply splits on whitespace;
  # in general it would be possible to make use of a python library like
  # nltk (the natural language toolkit) to perform more complex
  # tokenization and word stemming.
  - MAP:
      NAME: doc_map
      LANGUAGE: python
      FUNCTION: |
        i = 0       # "i" tracks the index of a word within the document
        terms = {}  # a hash of terms with their indices within the document
        # Lower-case and split the text string on whitespace
        for term in data.lower().split():
            i = i + 1  # Increment i
            # Check for the term in the terms list:
            #
            # - If the term already exists, append the i value to the
            #   array entry corresponding to the term. This allows
            #   multiple occurrences of a word to show up.
            #
            # - If the term does not already exist in the terms list for
            #   the text currently being processed, it is added to the
            #   dictionary along with the position i.
            #
            # Example:
            #   data: "a computer is a machine that manipulates data"
            #   "a"        [1, 4]
            #   "computer" [2]
            #   "machine"  [3]
            #   ...
            if term in terms:
                terms[term] += ',' + str(i)
            else:
                terms[term] = str(i)
        # Return multiple rows for each document. Each row consists of
        # the doc_id, a term, and the positions in the data where the
        # term appeared.
        #
        # Example:
        #   (doc_id => 100, term => "a",        positions => "1,4")
        #   (doc_id => 100, term => "computer", positions => "2")
        #   ...
        for term in terms:
            yield([doc_id, term, terms[term]])
      OPTIMIZE: STRICT IMMUTABLE
      MODE: MULTI
      PARAMETERS:
        - doc_id integer
        - data text
      RETURNS:
        - doc_id integer
        - term text
        - positions text

  # The function for keywords is almost identical to the one for
  # documents, but we also want a count of the number of terms in the
  # keyword.
  - MAP:
      NAME: kw_map
      LANGUAGE: python
      FUNCTION: |
        i = 0
        terms = {}
        for term in keyword.lower().split():
            i = i + 1
            if term in terms:
                terms[term] += ',' + str(i)
            else:
                terms[term] = str(i)
        # note we output 4 values, including "i", the total term count
        for term in terms:
            yield([keyword_id, i, term, terms[term]])
      MODE: MULTI
      OPTIMIZE: STRICT IMMUTABLE
      PARAMETERS:
        - keyword_id integer
        - keyword text
      RETURNS:
        - keyword_id integer
        - nterms integer
        - term text
        - positions text

  # A "TASK" is an object that defines an entire "input/map/reduce" stage
  # within a map reduce pipeline. It is like an execution, except that it
  # is not immediately executed and can be used as an input to further
  # processing stages.
  #
  # Identify a task called doc_prep which takes in the "doc" source
  # (INPUT defined above). This task uses the mapper called doc_map,
  # which returns (doc_id, term, positions) records.
  - TASK:
      NAME: doc_prep
      SOURCE: doc
      MAPPER: doc_map

  # Identify a task called kw_prep which takes in the "kw" source (INPUT
  # defined above). This task uses the mapper called kw_map, which
  # returns (keyword_id, nterms, term, positions) records.
  - TASK:
      NAME: kw_prep
      SOURCE: kw
      MAPPER: kw_map

  # One of the advantages of Greenplum MapReduce is that it can be freely
  # mixed with SQL operations, either as an input for MapReduce stages or
  # as processing over MapReduce stages.
  #
  # Identify an input source that is created from a SQL join of the
  # doc_prep output and the kw_prep output where the terms are the same.
  # This creates a list of the "candidate" keywords in a document, ie any
  # keyword that shares at least one term with the document.
  - INPUT:
      NAME: term_join
      QUERY: |
        SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms,
               doc.positions AS doc_positions,
               kw.positions AS kw_positions
        FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term)

  # In Greenplum MapReduce a REDUCER is defined with one or more
  # functions.
  #
  # A reducer has an initial 'state' variable, defined for each grouping
  # key, that is adjusted by every value in that key grouping by means of
  # a TRANSITION function.
  #
  # If present, an optional (name pending) function can be defined that
  # combines multiple 'state' variables. This allows the TRANSITION
  # function to be executed locally on the mapping segments, shipping
  # only the accumulated 'state' over the network and thus reducing
  # network bandwidth.
  #
  # If present, an optional FINALIZER function can be used to perform a
  # final computation on a state and emit one or more rows of output from
  # the state.
  #
  # Create a reducer called term_reducer, with a transition function
  # called term_transition and a finalizer called term_finalizer.
  - REDUCER:
      NAME: term_reducer
      TRANSITION: term_transition
      FINALIZER: term_finalizer

  - TRANSITION:
      NAME: term_transition
      LANGUAGE: python
      PARAMETERS:
        - state text
        - term text
        - nterms integer
        - doc_positions text
        - kw_positions text
      FUNCTION: |
        # STATE
        #   - initial value: ''
        #   - is a colon delimited set of keyword positions
        #   - keyword positions are a comma delimited set of integers
        #
        # an example STATE could be '1,3,2:4:'
        #
        # If we have an existing state, split it into the set of keyword
        # positions; otherwise construct a set of nterms keyword
        # positions, all empty.
        if state:
            kw_split = state.split(':')
        else:
            kw_split = []
            for i in range(0, nterms):
                kw_split.append('')
        # kw_positions is a comma delimited field of integers indicating
        # the positions a single term occupies within a given keyword.
        # Splitting on ',' converts the string into a python list.
        # Add doc_positions for the current term.
        for kw_p in kw_positions.split(','):
            kw_split[int(kw_p) - 1] = doc_positions
        # Reconstruct the delimited state: take each element in the
        # kw_split array and string them together, placing a ':' between
        # each element.
        #
        # Example:
        #   For the keyword "computer software computer hardware", the
        #   kw_split array matched up to the document data "in the
        #   business of computer software software engineers" would look
        #   like:
        #     ['5', '6,7', '5', '']
        #   and the outstate would look like:
        #     5:6,7:5:
        outstate = kw_split[0]
        for s in kw_split[1:]:
            outstate = outstate + ':' + s
        return outstate

  - FINALIZER:
      NAME: term_finalizer
      LANGUAGE: python
      RETURNS:
        - count integer
      FUNCTION: |
        if not state:
            return 0
        kw_split = state.split(':')
        # We adjust each document position list based on the offset of
        # the term in the keyword, and then intersect all resulting
        # lists. This function does the following:
        #   1) Splits kw_split, which looks like 1,5,7:2,8, on ':',
        #      which creates '1,5,7' and '2,8'
        #   2) For each group of positions in kw_split, splits the set on
        #      ',' to create ['1','5','7'] from set 0 (1,5,7) and
        #      eventually ['2','8'] from set 1 (2,8)
        #   3) Checks for empty strings
        #   4) Adjusts the split sets by subtracting the position of the
        #      set in the kw_split array from each element:
        #        ['1','5','7'] - 0 from each element = [1,5,7]
        #        ['2','8']     - 1 from each element = [1,7]
        #   5) The resulting sets from step 4 are intersected and their
        #      overlapping values kept:
        #        [1,5,7] intersected with [1,7] = [1,7]
        #   6) The length of the intersection is the number of times that
        #      an entire keyword (with all its pieces) matches in the
        #      document data.
        #
        # Further extensions could involve more in the way of fuzzy
        # matching.
        previous = None
        for i in range(0, len(kw_split)):
            isplit = kw_split[i].split(',')
            if any(map(lambda x: x == '', isplit)):
                return 0
            adjusted = set(map(lambda x: int(x) - i, isplit))
            if previous:
                previous = adjusted.intersection(previous)
            else:
                previous = adjusted
        # return the final count
        if previous:
            return len(previous)
        return 0

  - TASK:
      NAME: term_match
      SOURCE: term_join
      REDUCER: term_reducer

  - INPUT:
      NAME: final_output
      QUERY: |
        SELECT doc.*, kw.*, tm.count
        FROM documents doc, keywords kw, term_match tm
        WHERE doc.doc_id = tm.doc_id
          AND kw.keyword_id = tm.keyword_id
          AND tm.count > 0

EXECUTE:
  - RUN:
      SOURCE: final_output
      TARGET: STDOUT
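For what it’s worth, a script like this is run by handing the YAML file to Greenplum’s command-line driver, which connects using the DATABASE and USER values at the top of the file; I believe the invocation is along the lines of gpmapreduce -f keyword_match.yaml, though the file name here is my own hypothetical.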
Comments
Phew! It took me a while to scroll down here. 🙂
Great discussion on this topic. Glad to see the community correcting misperceptions on MapReduce. Just a few points on Aster:
Re: “SQL analytics”, Aster can:
- Implement SQL analytics-like functionality using In-Database MapReduce
- Feed MapReduce results into SQL analytic functions
- Pass results from MapReduce to analytic functions
Re: “materializing intermediate result sets” and “pipelining”
Aster can avoid materializing intermediate results on disk – this is consistent with Aster’s philosophy for general SQL, as well. Similarly, In-Database MapReduce supports full pipelining of analytics/MR to avoid materialization of intermediate results on disk by passing on data from one phase to the next (we avoid multiple passes of data).
Re: “Aster support of different languages for Map and Reduce steps”
We’ll write more about this separately but it’s worth noting that Aster is not using Postgres UDFs at all in its MapReduce implementation. Instead, we can run in an arbitrary language runtime, and would rather not force the developer to choose one from PostgreSQL’s collection. This lets Aster (and the developer) support/utilize all the popular languages (Java/C/C++/Python/Perl…). It is NOT PL/R, PL/Python, etc…