Saturday, February 27, 2010

MapReduce in a Nutshell: Map

You have a problem to solve, but the set of input data is far too large to solve it by simply looping over all the data.

What if you broke the input data into chunks and analysed only a chunk at a time, somehow aggregating the intermediate results into something meaningful at a later stage?

How do you perform this partial analysis in such a way that an aggregation is possible?

The standard Wikipedia example counts all the words in a collection of books. The only problem is that the collection of books is too large, or the intermediate word count is too large, to process on one conventional computer.

So. First of all, we have the superset of books: the library. We need to break it up. Maybe if it were 1/5th the size, we could analyse it in a single process? We will call each 1/5th a slice of the library.
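One way to cut the library into slices is sketched below. The Slicer class and its Slice() method are hypothetical names introduced for illustration, and dealing books out round-robin is only one assumption about how slices might be formed; any scheme that gives every book to exactly one slice would do.

```csharp
using System;
using System.Collections.Generic;

static class Slicer
{
    // Deals items into sliceCount slices, round-robin,
    // so slice sizes differ by at most one.
    public static List<List<T>> Slice<T>(IEnumerable<T> items, int sliceCount)
    {
        if (sliceCount < 1)
            throw new ArgumentOutOfRangeException(nameof(sliceCount));

        var slices = new List<List<T>>();
        for (int i = 0; i < sliceCount; i++)
            slices.Add(new List<T>());

        int next = 0;
        foreach (T item in items)
        {
            slices[next].Add(item);
            next = (next + 1) % sliceCount;
        }
        return slices;
    }
}
```

With this in place, something like Slicer.Slice(library, 5) would give the five slices described above.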

We still need to define our function that will do the first step of analysis on each book, remembering it will be called once* for each and every book in the library.
void Map(Book book)
{
    foreach (string word in book.Words)
        EmitIntermediate(word, 1);
}


Now we have the function, where do we put it? On a Mapper object, of course. We will create one* mapper for each slice of the library, then we will invoke the Map() function once per book in that library slice. Then for every word in the book, we will call the mysterious EmitIntermediate() function.

For simplicity, let's just assume that EmitIntermediate delegates its call to an Emitter object. We will probably have one Emitter per library slice; each Mapper will have its own Emitter. This is good for isolating Mappers: it will mean we're not sharing state between mappers so there won't be any concurrency issues even though we're dealing with multiple Mappers working in parallel.
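To make the one-Emitter-per-Mapper arrangement concrete, here is a minimal sketch. The Book type with its Words property is inferred from the Map() signature; the IEmitter interface, the Mapper constructor, MapSlice() and the RecordingEmitter stand-in are all assumptions made for illustration rather than part of the standard example.

```csharp
using System.Collections.Generic;

class Book
{
    public string Title;
    public List<string> Words = new List<string>();
}

// Hypothetical contract: anything that can accept a key/value pair.
interface IEmitter
{
    void Emit(string key, long value);
}

// Trivial stand-in that just records every call, so the sketch can run.
class RecordingEmitter : IEmitter
{
    public readonly List<KeyValuePair<string, long>> Calls =
        new List<KeyValuePair<string, long>>();

    public void Emit(string key, long value)
    {
        Calls.Add(new KeyValuePair<string, long>(key, value));
    }
}

class Mapper
{
    private readonly IEmitter emitter; // owned by this Mapper only, never shared

    public Mapper(IEmitter emitter)
    {
        this.emitter = emitter;
    }

    // Calls Map() once per book in this Mapper's slice.
    public void MapSlice(IEnumerable<Book> slice)
    {
        foreach (Book book in slice)
            Map(book);
    }

    void Map(Book book)
    {
        foreach (string word in book.Words)
            EmitIntermediate(word, 1);
    }

    // Simply forwards the pair to this Mapper's Emitter.
    void EmitIntermediate(string key, long value)
    {
        emitter.Emit(key, value);
    }
}
```

Because each Mapper is handed its own Emitter in the constructor, two Mappers never touch the same intermediate state.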

Here is a possible implementation of an Emitter for our example - it differs from the standard example in that it performs some arithmetic, continuously updating the slice's word count:
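using System.Collections.Generic;

class Emitter
{
    // Running word count for this slice; constructed elsewhere.
    private Dictionary<string, long> intermediate;

    public void Emit(string key, long value)
    {
        // The first sighting of a word starts its count at zero.
        if (!intermediate.ContainsKey(key))
            intermediate.Add(key, 0);
        intermediate[key] += value;
    }
}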


Here is another possible implementation of an Emitter - it's more like the standard example as it stores a list of values, to be summed up at a later stage. Because we know that "1" is the only value we ever emit, we can reduce the amount of storage space required by defining the values as a list of bytes.
class AnotherEmitter
{
    private Dictionary<string, List<byte>> intermediate; // constructed elsewhere
    public void Emit(string key, byte value)
    {
        if (!intermediate.ContainsKey(key))
            intermediate.Add(key, new List<byte>());
        intermediate[key].Add(value);
    }
}


There's no requirement for the Emitter's keys and values to be backed by an object in memory; that is just a simplification to explain the MapReduce concept.
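As one illustration of that point, the sketch below streams each pair to a TextWriter instead of holding anything in memory. StreamingEmitter and its tab-separated line format are assumptions made for this example; the writer could wrap a file, a network stream or anything else.

```csharp
using System.IO;

class StreamingEmitter
{
    private readonly TextWriter output;

    public StreamingEmitter(TextWriter output)
    {
        this.output = output;
    }

    // Writes one "key<TAB>value" line per call; nothing is accumulated in memory.
    public void Emit(string key, long value)
    {
        output.Write(key);
        output.Write('\t');
        output.WriteLine(value);
    }
}
```

The trade-off is that nothing is summed up front, so whatever reads the output later has to do all of the adding.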

Let's start this program up and run it. We take our library and divide it into slices. For each slice we create a Mapper and an Emitter. For each book in the slice we call the Map() function, which calls EmitIntermediate() for every word it finds. Once all the slices have been mapped, we inspect the Emitters. There's one Emitter per slice, and it contains all of the words for all the books in that slice.
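The run described above might be driven by something like the following sketch. Driver and MapAll() are hypothetical names; each book is represented simply as an array of its words, a plain dictionary stands in for each slice's Emitter, and Parallel.For is just one assumed way of running the slices side by side.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

static class Driver
{
    // Maps every slice in parallel; returns one intermediate dictionary per slice.
    public static Dictionary<string, long>[] MapAll(IList<List<string[]>> slices)
    {
        var results = new Dictionary<string, long>[slices.Count];

        Parallel.For(0, slices.Count, i =>
        {
            // Plays the role of this slice's private Emitter: no other task touches it.
            var intermediate = new Dictionary<string, long>();

            foreach (string[] bookWords in slices[i])   // one Map() call per book
            {
                foreach (string word in bookWords)      // one emit per word
                {
                    long count;
                    intermediate.TryGetValue(word, out count); // stays 0 for a new word
                    intermediate[word] = count + 1;
                }
            }

            results[i] = intermediate; // each task writes only its own array slot
        });

        return results;
    }
}
```

No locks are needed here because every task reads its own slice and writes to its own dictionary and array slot.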

Here's a concrete example of our entire library:
SLICE:0
    BOOK:"Fight Club"
        WORDS:"Sticking feathers up your butt does not make you a chicken"
    BOOK:"The Bible"
        WORDS:"In the beginning God created the heaven and the earth"
SLICE:1
    BOOK:"Still Life With Woodpecker"
        WORDS:"who cared which crossed the road first, the chicken or the egg"
    BOOK:"FIASCO"
        WORDS:"would not be reported till years down the road"
SLICE:N
    BOOK:*
        WORDS:*


When we examine our intermediate values for the Emitter on slice 0, we find "the" 3 times (all from the Bible) and "chicken" just 1 time. Unsurprisingly, not a lot of crossover between Fight Club and the Bible. The Emitter on slice 1 has slightly different results: "the" comes in 4 times (3 times from Still Life, and 1 time from FIASCO), but "chicken" is there just 1 time (thanks again to Still Life).
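These per-slice counts can be checked with a small, self-contained sketch. It assumes words are split on spaces and commas and compared case-sensitively; SliceDemo, CountSlice() and PrintExample() are hypothetical helpers, with CountSlice() playing the role of a Mapper plus a counting Emitter for one slice.

```csharp
using System;
using System.Collections.Generic;

static class SliceDemo
{
    // Maps every book in one slice and returns that slice's intermediate word counts.
    public static Dictionary<string, long> CountSlice(IEnumerable<string> books)
    {
        var intermediate = new Dictionary<string, long>();
        foreach (string text in books)
        {
            string[] words = text.Split(new[] { ' ', ',' },
                                        StringSplitOptions.RemoveEmptyEntries);
            foreach (string word in words)
            {
                long count;
                intermediate.TryGetValue(word, out count); // stays 0 for a new word
                intermediate[word] = count + 1;
            }
        }
        return intermediate;
    }

    public static void PrintExample()
    {
        var slice0 = CountSlice(new[] {
            "Sticking feathers up your butt does not make you a chicken",
            "In the beginning God created the heaven and the earth" });
        var slice1 = CountSlice(new[] {
            "who cared which crossed the road first, the chicken or the egg",
            "would not be reported till years down the road" });

        // prints "slice 0: the=3, chicken=1"
        Console.WriteLine("slice 0: the={0}, chicken={1}", slice0["the"], slice0["chicken"]);
        // prints "slice 1: the=4, chicken=1"
        Console.WriteLine("slice 1: the={0}, chicken={1}", slice1["the"], slice1["chicken"]);
    }
}
```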

What we need now is something that will aggregate our intermediate results so that we get 7x "the"s and 2x "chicken"s. In the next post, it's time for the Reduce step.

* Lies to children: if we expand our example to become more robust, we could end up retrying Map() on a book that previously failed, or even retrying an entire slice with a brand new Mapper if the compute node failed.