The future has already arrived. It’s just not evenly distributed yet.
William Gibson
MapReduce is a programming model for performing parallel processing on large datasets. Although it is a powerful technique, its basics are relatively simple.
Imagine we have a collection of items we’d like to process somehow. For instance, the items might be website logs, the texts of various books, image files, or anything else. A basic version of the MapReduce algorithm consists of the following steps:
1. Use a mapper function to turn each item into zero or more key/value pairs. (Often this is called the map function, but there is already a Python function called map and we don’t need to confuse the two.)
2. Collect together all the pairs with identical keys.
3. Use a reducer function on each collection of grouped values to produce output values for the corresponding key.
MapReduce is sort of passé, so much so that I considered removing this chapter from the second edition. But I decided it’s still an interesting topic, so I ended up leaving it in (obviously).
This is all sort of abstract, so let’s look at a specific example. There are few absolute rules of data science, but one of them is that your first MapReduce example has to involve counting words.
DataSciencester has grown to millions of users! This is great for your job security, but it makes routine analyses slightly more difficult.
For example, your VP of Content wants to know what sorts of things people are talking about in their status updates. As a first attempt, you decide to count the words that appear, so that you can prepare a report on the most frequent ones.
When you had a few hundred users, this was simple to do:
from typing import List
from collections import Counter

def tokenize(document: str) -> List[str]:
    """Just split on whitespace"""
    return document.split()

def word_count_old(documents: List[str]):
    """Word count not using MapReduce"""
    return Counter(word
                   for document in documents
                   for word in tokenize(document))
With millions of users the set of documents (status updates) is suddenly too big to fit on your computer. If you can just fit this into the MapReduce model, you can use some “big data” infrastructure that your engineers have implemented.

First, we need a function that turns a document into a sequence of key/value pairs. We’ll want our output to be grouped by word, which means that the keys should be words. And for each word, we’ll just emit the value 1 to indicate that this pair corresponds to one occurrence of the word:
from typing import Iterator, Tuple

def wc_mapper(document: str) -> Iterator[Tuple[str, int]]:
    """For each word in the document, emit (word, 1)"""
    for word in tokenize(document):
        yield (word, 1)
Skipping the “plumbing” step 2 for the moment, imagine that for some word we’ve collected a list of the corresponding counts we emitted. To produce the overall count for that word, then, we just need:
from typing import Iterable

def wc_reducer(word: str,
               counts: Iterable[int]) -> Iterator[Tuple[str, int]]:
    """Sum up the counts for a word"""
    yield (word, sum(counts))
Returning to step 2, we now need to collect the results from wc_mapper and feed them to wc_reducer. Let’s think about how we would do this on just one computer:
from collections import defaultdict

def word_count(documents: List[str]) -> List[Tuple[str, int]]:
    """Count the words in the input documents using MapReduce"""
    collector = defaultdict(list)   # To store grouped values

    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)

    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]
Imagine that we have three documents ["data science", "big data", "science fiction"]. Then wc_mapper applied to the first document yields the two pairs ("data", 1) and ("science", 1). After we’ve gone through all three documents, the collector contains:
{"data" : [1, 1],
 "science" : [1, 1],
 "big" : [1],
 "fiction" : [1]}
Then wc_reducer produces the counts for each word:
[("data", 2), ("science", 2), ("big", 1), ("fiction", 1)]
As mentioned earlier, the primary benefit of MapReduce is that it allows us to distribute computations by moving the processing to the data. Imagine we want to word-count across billions of documents.
Our original (non-MapReduce) approach requires the machine doing the processing to have access to every document. This means that the documents all need to either live on that machine or else be transferred to it during processing. More important, it means that the machine can process only one document at a time.
Possibly it can process up to a few at a time if it has multiple cores and if the code is rewritten to take advantage of them. But even so, all the documents still have to get to that machine.
Imagine now that our billions of documents are scattered across 100 machines. With the right infrastructure (and glossing over some of the details), we can do the following:
1. Have each machine run the mapper on its documents, producing lots of key/value pairs.
2. Distribute those key/value pairs to a number of “reducing” machines, making sure that the pairs corresponding to any given key all end up on the same machine.
3. Have each reducing machine group the pairs by key and then run the reducer on each set of values.
4. Return each (key, output) pair.
What is amazing about this is that it scales horizontally. If we double the number of machines, then (ignoring certain fixed costs of running a MapReduce system) our computation should run approximately twice as fast. Each mapper machine will only need to do half as much work, and (assuming there are enough distinct keys to further distribute the reducer work) the same is true for the reducer machines.
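To make step 2 of that recipe slightly more concrete, the "make sure pairs with the same key end up on the same machine" part is usually just a hash of the key. A minimal sketch, with an illustrative function name and num_reducers parameter (real systems use a stable hash rather than Python's built-in hash, which is randomized across processes):

def reducer_machine_for(key, num_reducers: int) -> int:
    """Pick the reducing machine for a key/value pair.

    Hashing the key guarantees that every pair with a given key
    is routed to the same machine. (Illustrative sketch only.)
    """
    return hash(key) % num_reducers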
If you think about it for a minute, all of the word count–specific code in the previous example is contained in the wc_mapper and wc_reducer functions. This means that with a couple of changes we have a much more general framework (that still runs on a single machine).

We could use generic types to fully type-annotate our map_reduce function, but it would end up being kind of a mess pedagogically, so in this chapter we’ll be much more casual about our type annotations:
from typing import Callable, Iterable, Any, Tuple

# A key/value pair is just a 2-tuple
KV = Tuple[Any, Any]

# A Mapper is a function that returns an Iterable of key/value pairs
Mapper = Callable[..., Iterable[KV]]

# A Reducer is a function that takes a key and an iterable of values
# and returns a key/value pair
Reducer = Callable[[Any, Iterable], KV]
Now we can write a general map_reduce function:
def map_reduce(inputs: Iterable,
               mapper: Mapper,
               reducer: Reducer) -> List[KV]:
    """Run MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)

    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)

    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]
Then we can count words simply by using:
word_counts = map_reduce(documents, wc_mapper, wc_reducer)
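For instance, with the three small documents from earlier, this gives the same counts as our original word_count did (sorted here, since the output order isn’t guaranteed):

documents = ["data science", "big data", "science fiction"]

assert sorted(map_reduce(documents, wc_mapper, wc_reducer)) == [
    ("big", 1), ("data", 2), ("fiction", 1), ("science", 2)
]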
This gives us the flexibility to solve a wide variety of problems.
Before we proceed, notice that wc_reducer is just summing the values corresponding to each key. This kind of aggregation is common enough that it’s worth abstracting it out:
def values_reducer(values_fn: Callable) -> Reducer:
    """Return a reducer that just applies values_fn to its values"""
    def reduce(key, values: Iterable) -> KV:
        return (key, values_fn(values))

    return reduce
After which we can easily create:
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

assert sum_reducer("key", [1, 2, 3, 3]) == ("key", 9)
assert min_reducer("key", [1, 2, 3, 3]) == ("key", 1)
assert max_reducer("key", [1, 2, 3, 3]) == ("key", 3)
assert count_distinct_reducer("key", [1, 2, 3, 3]) == ("key", 3)
and so on.
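In the same spirit, a reducer that averages its values is a one-liner. (This average_reducer is just an illustration and isn’t used later in the chapter; it assumes the values arrive as a list, as in the checks above.)

average_reducer = values_reducer(lambda values: sum(values) / len(values))

assert average_reducer("key", [1, 2, 3, 3]) == ("key", 2.25)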
The content VP was impressed with the word counts and asks what else you can learn from people’s status updates. You manage to extract a dataset of status updates that look like:
import datetime

status_updates = [
    {"id": 2,
     "username" : "joelgrus",
     "text" : "Should I write a second edition of my data science book?",
     "created_at" : datetime.datetime(2018, 2, 21, 11, 47, 0),
     "liked_by" : ["data_guy", "data_gal", "mike"] },
    # ...
]
Let’s say we need to figure out which day of the week people talk the most about data science. In order to find this, we’ll just count how many data science updates there are on each day of the week. This means we’ll need to group by the day of week, so that’s our key. And if we emit a value of 1 for each update that contains “data science,” we can simply get the total number using sum:
def data_science_day_mapper(status_update: dict) -> Iterable:
    """Yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)

data_science_days = map_reduce(status_updates,
                               data_science_day_mapper,
                               sum_reducer)
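For instance, applied to the example status update shown earlier (assuming status_updates begins with that example), the mapper yields a single pair: datetime.weekday() numbers Monday as 0, and February 21, 2018, was a Wednesday:

assert list(data_science_day_mapper(status_updates[0])) == [(2, 1)]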
As a slightly more complicated example, imagine we need to find out for each user the most common word that she puts in her status updates. There are three possible approaches that spring to mind for the mapper:
1. Put the username in the key; put the words and counts in the values.
2. Put the word in the key; put the usernames and counts in the values.
3. Put the username and word in the key; put the counts in the values.
If you think about it a bit more, we definitely want to group by username, because we want to consider each person’s words separately. And we don’t want to group by word, since our reducer will need to see all the words for each person to find out which is the most popular. This means that the first option is the right choice:
def words_per_user_mapper(status_update: dict):
    """For each word in the update, emit (user, (word, 1))"""
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user: str,
                              words_and_counts: Iterable[KV]):
    """
    Given a sequence of (word, count) pairs,
    return the word with the highest total count
    """
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))

user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)
Or we could find out the number of distinct status-likers for each user:
def liker_mapper(status_update: dict):
    """For each person who liked the update, emit (user, liker)"""
    user = status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user, liker)

distinct_likers_per_user = map_reduce(status_updates,
                                      liker_mapper,
                                      count_distinct_reducer)
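For example, on the example status update from earlier (again assuming it’s the first element of status_updates), liker_mapper emits one (user, liker) pair per liker:

assert list(liker_mapper(status_updates[0])) == [
    ("joelgrus", "data_guy"),
    ("joelgrus", "data_gal"),
    ("joelgrus", "mike"),
]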
Recall from “Matrix Multiplication” that given an [n, m] matrix A and an [m, k] matrix B, we can multiply them to form an [n, k] matrix C, where the element of C in row i and column j is given by:
C[i][j] = sum(A[i][x] * B[x][j] for x in range(m))
This works if we represent our matrices as lists of lists, as we’ve been doing.
But large matrices are sometimes sparse, which means that most of their elements equal 0. For large sparse matrices, a list of lists can be a very wasteful representation. A more compact representation stores only the locations with nonzero values:
from typing import NamedTuple

class Entry(NamedTuple):
    name: str
    i: int
    j: int
    value: float
For example, a 1 billion × 1 billion matrix has 1 quintillion entries, which would not be easy to store on a computer. But if there are only a few nonzero entries in each row, this alternative representation is many orders of magnitude smaller.
Given this sort of representation, it turns out that we can use MapReduce to perform matrix multiplication in a distributed manner.
To motivate our algorithm, notice that each element A[i][j] is only used to compute the elements of C in row i, and each element B[i][j] is only used to compute the elements of C in column j. Our goal will be for each output of our reducer to be a single entry of C, which means we’ll need our mapper to emit keys identifying a single entry of C. This suggests the following:
def matrix_multiply_mapper(num_rows_a: int, num_cols_b: int) -> Mapper:
    # C[x][y] = A[x][0] * B[0][y] + ... + A[x][m] * B[m][y]
    #
    # so an element A[i][j] goes into every C[i][y] with coef B[j][y]
    # and an element B[i][j] goes into every C[x][j] with coef A[x][i]
    def mapper(entry: Entry):
        if entry.name == "A":
            for y in range(num_cols_b):
                key = (entry.i, y)              # which element of C
                value = (entry.j, entry.value)  # which entry in the sum
                yield (key, value)
        else:
            for x in range(num_rows_a):
                key = (x, entry.j)              # which element of C
                value = (entry.i, entry.value)  # which entry in the sum
                yield (key, value)

    return mapper
And then:
def matrix_multiply_reducer(key: Tuple[int, int],
                            indexed_values: Iterable[Tuple[int, int]]):
    results_by_index = defaultdict(list)

    for index, value in indexed_values:
        results_by_index[index].append(value)

    # Multiply the values for positions with two values
    # (one from A, and one from B) and sum them up.
    sumproduct = sum(values[0] * values[1]
                     for values in results_by_index.values()
                     if len(values) == 2)

    if sumproduct != 0.0:
        yield (key, sumproduct)
For example, if you had these two matrices:
A = [[3, 2, 0],
     [0, 0, 0]]

B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]
you could rewrite them as tuples:
entries = [Entry("A", 0, 0, 3), Entry("A", 0, 1, 2),
           Entry("B", 0, 0, 4), Entry("B", 0, 1, -1), Entry("B", 1, 0, 10)]

mapper = matrix_multiply_mapper(num_rows_a=2, num_cols_b=3)
reducer = matrix_multiply_reducer

# Product should be [[32, -3, 0], [0, 0, 0]].
# So it should have two entries.
assert (set(map_reduce(entries, mapper, reducer)) ==
        {((0, 1), -3), ((0, 0), 32)})
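If you wanted the dense matrix back, a small helper along these lines can rebuild it from the sparse output, provided you know the output shape. (This to_dense function is a hypothetical convenience, not part of the chapter’s code.)

def to_dense(sparse: List[KV], num_rows: int, num_cols: int) -> List[List[float]]:
    """Rebuild a dense matrix from sparse ((i, j), value) pairs."""
    C = [[0 for _ in range(num_cols)] for _ in range(num_rows)]
    for (i, j), value in sparse:
        C[i][j] = value
    return C

assert to_dense(map_reduce(entries, mapper, reducer), 2, 3) == [[32, -3, 0],
                                                                [0, 0, 0]]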
This isn’t terribly interesting on such small matrices, but if you had millions of rows and millions of columns, it could help you a lot.
One thing you have probably noticed is that many of our mappers seem to include a bunch of extra information. For example, when counting words, rather than emitting (word, 1) and summing over the values, we could have emitted (word, None) and just taken the length.
One reason we didn’t do this is that, in the distributed setting, we sometimes want to use combiners to reduce the amount of data that has to be transferred around from machine to machine. If one of our mapper machines sees the word data 500 times, we can tell it to combine the 500 instances of ("data", 1) into a single ("data", 500) before handing off to the reducing machine. This results in a lot less data getting moved around, which can make our algorithm substantially faster still.
Because of the way we wrote our reducer, it would handle this combined data correctly. (If we’d written it using len, it would not have.)
Like I said, MapReduce feels a lot less popular now than it did when I wrote the first edition. It’s probably not worth investing a ton of your time.
That said, the most widely used MapReduce system is Hadoop. There are various commercial and noncommercial distributions and a huge ecosystem of Hadoop-related tools.
Amazon.com offers an Elastic MapReduce service that’s probably easier than setting up your own cluster.
Hadoop jobs are typically high-latency, which makes them a poor choice for “real-time” analytics. A popular choice for these workloads is Spark, which can be MapReduce-y.