MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.
Programs written in this functional style are automatically parallelised and executed on a large cluster of commodity machines. The run-time system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required inter-machine communication. This allows programmers without any experience with parallel and distributed systems to easily utilise the resources of a large distributed system.
The abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages. Map and reduce operations allow easy parallelisation, and re-execution serves as the primary mechanism for fault-tolerance.
The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The user of the MapReduce library expresses the computations as two functions: Map and Reduce.
Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.
The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation.
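The shape of the two functions can be made concrete with a small single-process sketch. It is not the library's API: `run_mapreduce`, the type aliases and the order-totalling example are hypothetical names chosen for illustration, and the grouping that a real implementation does across machines is done here with an in-memory dictionary.

```python
from collections import defaultdict
from typing import Callable, Iterable, List, Tuple, TypeVar

K1 = TypeVar("K1")  # input key type
V1 = TypeVar("V1")  # input value type
K2 = TypeVar("K2")  # intermediate key type
V2 = TypeVar("V2")  # intermediate value type

# Map: (k1, v1) -> set of (k2, v2); Reduce: (k2, list of v2) -> set of v2.
MapFn = Callable[[K1, V1], Iterable[Tuple[K2, V2]]]
ReduceFn = Callable[[K2, List[V2]], Iterable[V2]]


def run_mapreduce(inputs, map_fn, reduce_fn):
    """Hypothetical in-memory driver: map, group by intermediate key, reduce."""
    groups = defaultdict(list)
    for key, value in inputs:
        for ikey, ivalue in map_fn(key, value):
            groups[ikey].append(ivalue)
    output = []
    for ikey in sorted(groups):
        for result in reduce_fn(ikey, groups[ikey]):
            output.append((ikey, result))
    return output


# Illustrative user code: total order amount per customer.
def amount_by_customer(order_id, order):
    customer, amount = order
    yield customer, amount


def total(customer, amounts):
    yield sum(amounts)  # one output value per Reduce invocation


orders = [(1, ("ann", 5)), (2, ("bob", 3)), (3, ("ann", 7))]
totals = run_mapreduce(orders, amount_by_customer, total)
```

Here `totals` holds one pair per customer, which matches the "zero or one output value per Reduce invocation" pattern described above.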
- Distributed Grep: The map function emits a line if it matches a supplied pattern. The reduce function is an identity function that just copies the supplied intermediate data to the output.
- Count of URL access frequency: The map function processes logs of web page requests and outputs <URL, 1>. The reduce function adds together all values for the same URL and emits a <URL, total count> pair.
- Distributed sort: The map function extracts the key from each record and emits a <key, record> pair. The reduce function emits all pairs unchanged.
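Two of the examples above, distributed grep and URL access counting, might look roughly like this in user code. This is a sketch under assumptions: the log line format (`<timestamp> <URL>`), the fixed `ERROR` pattern, and the tiny `run_local` helper that stands in for the library are all invented for illustration.

```python
import re
from collections import defaultdict

PATTERN = re.compile(r"ERROR")  # assumed "supplied pattern" for the grep example


def grep_map(offset, line):
    # Emit the line only if it matches the pattern.
    if PATTERN.search(line):
        yield offset, line


def identity_reduce(key, values):
    # Copy the intermediate data to the output unchanged.
    for value in values:
        yield key, value


def url_count_map(offset, line):
    # Assumed log format: "<timestamp> <URL>".
    parts = line.split()
    if len(parts) >= 2:
        yield parts[1], 1


def url_count_reduce(url, counts):
    yield url, sum(counts)


def run_local(records, map_fn, reduce_fn):
    """Hypothetical stand-in for the library: group by key, then reduce."""
    groups = defaultdict(list)
    for key, value in records:
        for ikey, ivalue in map_fn(key, value):
            groups[ikey].append(ivalue)
    return [pair for ikey in sorted(groups) for pair in reduce_fn(ikey, groups[ikey])]


log = [(0, "t1 /a"), (6, "t2 /b"), (12, "t3 /a")]
url_totals = run_local(log, url_count_map, url_count_reduce)

lines = [(0, "ok"), (3, "ERROR disk"), (14, "fine")]
matches = run_local(lines, grep_map, identity_reduce)
```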
One implementation may be suitable for a small shared-memory machine, another for a large NUMA multi-processor, and yet another for an even larger collection of networked machines.
The Map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function. The number of partitions (R) and the partitioning function are specified by the user.
Overall flow:
- The MapReduce library in the user program splits the input files into M pieces, typically 16 MB to 64 MB per piece. It then starts up many copies of the program on a cluster of machines.
- One of the copies of the program is special, the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or reduce task.
- A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.
- Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.
- When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together.
- The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user's Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.
- When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point the MapReduce call in the user program returns back to the user code.
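The flow above can be traced in a single process. The sketch below is an illustration only: `simulate_job` and `partition` are hypothetical names, CRC32 is an assumed stand-in for the hash function, lists stand in for local disks and output files, and nothing actually runs in parallel.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def partition(key, r):
    # Assumed partitioning function: a stable hash of the key, mod R.
    return zlib.crc32(key.encode("utf-8")) % r


def simulate_job(records, map_fn, reduce_fn, m, r):
    # Split the input into at most M contiguous pieces.
    size = max(1, -(-len(records) // m))  # ceiling division
    splits = [records[i:i + size] for i in range(0, len(records), size)]

    # Each map task writes its intermediate pairs into R regions.
    map_outputs = []
    for split in splits:
        regions = [[] for _ in range(r)]
        for key, value in split:
            for ikey, ivalue in map_fn(key, value):
                regions[partition(ikey, r)].append((ikey, ivalue))
        map_outputs.append(regions)

    # Each reduce task reads its region from every map task, sorts by key,
    # and calls Reduce once per unique key.
    output_files = []
    for p in range(r):
        fetched = [pair for regions in map_outputs for pair in regions[p]]
        fetched.sort(key=itemgetter(0))
        out = []
        for ikey, group in groupby(fetched, key=itemgetter(0)):
            out.extend(reduce_fn(ikey, [value for _, value in group]))
        output_files.append(out)
    return output_files  # one "file" per reduce partition


records = list(enumerate(["/a", "/b", "/a", "/c", "/b", "/a"]))
files = simulate_job(
    records,
    map_fn=lambda offset, url: [(url, 1)],
    reduce_fn=lambda url, counts: [(url, sum(counts))],
    m=3,
    r=2,
)
```

Every key lands in exactly one of the R output lists, and each list is already sorted by key because of the sort in the reduce step.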
After successful completion, the output of the MapReduce execution is available in the R output files (one per reduce task, with the file names as specified by the user). Users usually don't need to combine these R output files into one file; they can either pass them as input to another MapReduce call or process them with a distributed application that can handle input partitioned across multiple files.
For each map task and reduce task, the master stores the state (idle, in-progress, or completed) and the identity of the worker machine (for non-idle tasks).
For each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by the map task. Updates to this location and size information are received as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.
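A minimal sketch of this bookkeeping, assuming a simple in-memory representation. The class and field names (`Task`, `Master`, `pending_updates`) and the example location string are hypothetical; the source only specifies what information is kept, not how.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Tuple


class TaskState(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in-progress"
    COMPLETED = "completed"


@dataclass
class Task:
    task_id: int
    kind: str  # "map" or "reduce"
    state: TaskState = TaskState.IDLE
    worker: Optional[str] = None  # only set for non-idle tasks
    # Completed map tasks only: region index -> (location, size in bytes).
    regions: Dict[int, Tuple[str, int]] = field(default_factory=dict)


@dataclass
class Master:
    map_tasks: List[Task]  # indexed by task_id
    reduce_tasks: List[Task]
    # Location updates waiting to be pushed to in-progress reduce workers.
    pending_updates: List[Tuple[int, int, str, int]] = field(default_factory=list)

    def assign(self, worker: str) -> Optional[Task]:
        """Give the first idle task to an idle worker."""
        for task in self.map_tasks + self.reduce_tasks:
            if task.state is TaskState.IDLE:
                task.state = TaskState.IN_PROGRESS
                task.worker = worker
                return task
        return None

    def map_completed(self, task_id: int, regions: Dict[int, Tuple[str, int]]) -> None:
        """Record where a finished map task left its R intermediate regions."""
        task = self.map_tasks[task_id]
        task.state = TaskState.COMPLETED
        task.regions = dict(regions)
        for region, (location, size) in regions.items():
            self.pending_updates.append((task_id, region, location, size))


master = Master(map_tasks=[Task(0, "map"), Task(1, "map")], reduce_tasks=[Task(0, "reduce")])
first = master.assign("worker-1")
master.map_completed(first.task_id, {0: ("worker-1:/tmp/m0-r0", 1024)})
```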
The master pings every worker periodically. If no response is received from a worker in a certain amount of time, the master marks the worker as failed. Any map tasks completed by the worker are reset back to their initial idle state, and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is also reset to idle and becomes eligible for rescheduling.
Completed map tasks are re-executed on failure because their output is stored on the local disks of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed since their output is stored in a global file system.
When a map task is executed first by worker A, and later executed by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read the data from worker B.
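The rescheduling rules above can be summarised in a few lines. This is a sketch under assumptions: tasks are plain dictionaries, timestamps are seconds, and the function names are made up for illustration. Notifying reduce workers about the re-execution is left out.

```python
from typing import Dict, List


def find_failed_workers(last_response: Dict[str, float], now: float, timeout: float) -> List[str]:
    """Workers whose last ping response is older than the timeout."""
    return [worker for worker, seen in last_response.items() if now - seen > timeout]


def handle_worker_failure(tasks: List[dict], failed_worker: str) -> List[str]:
    """Reset the failed worker's tasks to idle where needed; return their ids."""
    rescheduled = []
    for task in tasks:
        if task["worker"] != failed_worker:
            continue
        in_progress = task["state"] == "in-progress"
        # Completed map output lived on the failed machine's local disk.
        lost_map_output = task["kind"] == "map" and task["state"] == "completed"
        if in_progress or lost_map_output:
            task["state"] = "idle"
            task["worker"] = None
            rescheduled.append(task["id"])
        # Completed reduce tasks are left alone: their output is in the
        # global file system.
    return rescheduled


tasks = [
    {"id": "map-0", "kind": "map", "state": "completed", "worker": "A"},
    {"id": "map-1", "kind": "map", "state": "in-progress", "worker": "A"},
    {"id": "reduce-0", "kind": "reduce", "state": "completed", "worker": "A"},
    {"id": "reduce-1", "kind": "reduce", "state": "in-progress", "worker": "B"},
]
failed = find_failed_workers({"A": 0.0, "B": 9.0}, now=10.0, timeout=5.0)
rescheduled = handle_worker_failure(tasks, failed[0])
```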
The MapReduce master simply re-executes the work done by unreachable worker machines and continues to make forward progress.
It is easy to make the master write periodic checkpoints of the master data structures. If the master task dies, a new copy can be started from the last checkpointed state. However, the current implementation simply aborts the MapReduce computation if the master fails.
The vast majority of map and reduce operators are deterministic, and the fact that the semantics are equivalent to a sequential execution in this case makes it very easy for programmers to reason about their program's behaviour.
Network bandwidth can be conserved by taking advantage of the fact that the input data (managed by GFS) is stored on the local disks of the machines that make up the cluster. Most input data is read locally and consumes no network bandwidth.
The map phase is subdivided into M pieces and the reduce phase into R pieces. Ideally, M and R are much larger than the number of worker machines. Having each worker perform many different tasks improves dynamic load balancing and also speeds up recovery when a worker fails.
One common cause of a long total running time for a MapReduce operation is a "straggler": a machine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation.
There is a general mechanism to alleviate the problem of stragglers. When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup execution completes. A distributed sorting program can take up to 44% longer without the backup task mechanism.
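The effect of backup executions can be seen with a simple timing model. This is a back-of-the-envelope sketch, not the scheduler: it assumes all primary tasks start at time 0, that the master launches a backup at time `backup_at` for every task still running, and that each backup takes `backup_duration`. The durations in the example are made up.

```python
from typing import List, Optional


def job_completion_time(
    primary_durations: List[float],
    backup_at: Optional[float] = None,
    backup_duration: float = 0.0,
) -> float:
    """Time at which the last task is marked completed."""
    finish_times = []
    for duration in primary_durations:
        finish = duration
        if backup_at is not None and duration > backup_at:
            # Task still in progress at backup_at: whichever of the primary
            # or the backup execution finishes first completes the task.
            finish = min(duration, backup_at + backup_duration)
        finish_times.append(finish)
    return max(finish_times)


durations = [10.0, 11.0, 12.0, 60.0]  # made-up values; the last task is a straggler
without_backup = job_completion_time(durations)
with_backup = job_completion_time(durations, backup_at=15.0, backup_duration=10.0)
```

In this model the job finishes when the slowest task does, so a single straggler dominates unless a backup execution overtakes it.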
Users of MapReduce specify the number of reduce tasks/output files that they desire (R). A default partitioning function is provided that uses hashing (e.g. hash(key) mod R), which tends to result in fairly well-balanced partitions. In some cases it is useful to partition data by some other function of the key. For example, when the output keys are URLs, it is convenient to have all entries for a single host end up in the same output file. A custom partitioning function can be provided to MapReduce for this, for example hash(Hostname(urlkey)) mod R.
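Both partitioning functions are short enough to sketch directly. The choice of CRC32 as the hash is an assumption made so the result is stable across processes (Python's built-in `hash` for strings is randomised per process); the function names are hypothetical.

```python
import zlib
from urllib.parse import urlsplit


def stable_hash(text: str) -> int:
    # Assumed hash function: CRC32 of the UTF-8 bytes.
    return zlib.crc32(text.encode("utf-8"))


def default_partition(key: str, r: int) -> int:
    """hash(key) mod R"""
    return stable_hash(key) % r


def host_partition(url_key: str, r: int) -> int:
    """hash(Hostname(urlkey)) mod R: all URLs of one host share a partition."""
    host = urlsplit(url_key).hostname or url_key  # fall back to the raw key
    return stable_hash(host) % r


R = 8
same_host = {
    host_partition("http://example.com/a", R),
    host_partition("http://example.com/b?x=1", R),
}
```

With `host_partition`, `same_host` contains a single partition index, whereas `default_partition` would usually spread the two URLs across different output files.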
Within a given partition, the intermediate key/value pairs are guaranteed to be processed in increasing key order, which makes it easy to generate a sorted output file per partition.
MapReduce allows the user to specify a Combiner function that does partial merging of the data before it is sent over the network.
The Combiner function is executed on each machine that performs a map task. The difference between a reduce function and a combiner function is how the MapReduce library handles the output of the function. The output of a reduce function is written to the final output file. The output of a combiner function is written to an intermediate file that will be sent to a reduce task.
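As an illustration, here is a sketch of a combiner applied to one map task's output for the URL counting example. `apply_combiner` is a hypothetical helper, and reusing the same summing function as both combiner and reducer is an assumption that only holds because addition is associative and commutative.

```python
from collections import defaultdict


def sum_counts(key, counts):
    # Usable as both combiner and reducer for counting.
    yield key, sum(counts)


def apply_combiner(map_output, combiner):
    """Partially merge one map task's output before it crosses the network."""
    grouped = defaultdict(list)
    for key, value in map_output:
        grouped[key].append(value)
    combined = []
    for key in sorted(grouped):
        combined.extend(combiner(key, grouped[key]))
    return combined


map_output = [("/a", 1), ("/b", 1), ("/a", 1), ("/a", 1)]
to_send = apply_combiner(map_output, sum_counts)

# On the reduce side, partial sums from two map machines are merged again.
final = list(sum_counts("/a", [3, 2]))
```

Four intermediate pairs shrink to two before transfer, and the reducer still produces the correct total from the partial sums.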
"Text" mode input treats each line as a key/value pair: the key is the offset in the file and the value is the contents of the line. Another commonly supported format stores a sequence of key/value pairs sorted by key.
Users can add support for a new input type by providing an implementation of a simple reader interface, though most users just use one of a small number of predefined input types.
In a similar fashion, a set of output types for producing data in different formats is supported.
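A reader for "text" mode input, as described above, could be sketched as a generator over a binary stream. The function name and the choice of byte offsets with UTF-8 decoding are assumptions; the source does not describe the actual reader interface.

```python
import io
from typing import BinaryIO, Iterator, Tuple


def text_reader(stream: BinaryIO) -> Iterator[Tuple[int, str]]:
    """Yield (byte offset of the line, line contents without the newline)."""
    offset = 0
    for raw in stream:
        yield offset, raw.rstrip(b"\r\n").decode("utf-8")
        offset += len(raw)


pairs = list(text_reader(io.BytesIO(b"ab\ncde\nf")))
```

A new input type would follow the same pattern: any callable that turns a split into an iterator of key/value pairs.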
Users of MapReduce may find it convenient to produce auxiliary files as additional outputs of their map or reduce operators. It is up to the application writer to make such side-effects atomic and idempotent.
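One common way to get atomic, idempotent side-effect files is to write to a temporary file and rename it into place once it is complete. The sketch below assumes a POSIX-like local file system and a deterministic target file name per task; `write_atomically` is a hypothetical helper, not part of any MapReduce library.

```python
import os
import tempfile


def write_atomically(path: str, data: bytes) -> None:
    """Write data so that readers see either the old file or the full new one."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temporary file must be in the same directory for an atomic rename.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_path, path)  # atomic rename over any previous version
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

If a task is re-executed and writes the same content to the same path, the rename simply replaces the earlier file, so repeating the operation leaves the same result.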
Sometimes there are bugs in user code that can cause the Map or Reduce functions to crash deterministically on certain records, which prevents the whole MapReduce operation from completing.
There is an optional mode of execution where the MapReduce library detects which records cause deterministic crashes and skips these records in order to make forward progress. Each worker process installs a signal handler that catches segmentation violations and bus errors. Before invoking a user Map or Reduce operation, the MapReduce library stores the sequence number of the argument in a variable. The signal handler sends a "last gasp" UDP packet that contains the sequence number to the master. When the master has seen more than one failure on a particular record, it indicates that the record should be skipped.
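The skip logic can be sketched in a single process. This is a loose analogy rather than the mechanism itself: a Python exception stands in for the segmentation violation caught by the signal handler, a direct method call stands in for the "last gasp" UDP packet, and all names are hypothetical.

```python
from collections import Counter


class SkipTracker:
    """Master-side bookkeeping of failures per record sequence number."""

    def __init__(self):
        self.failures = Counter()

    def report_failure(self, seq):
        self.failures[seq] += 1

    def should_skip(self, seq):
        # Skip once more than one failure has been seen on this record.
        return self.failures[seq] > 1


def run_task(records, user_fn, tracker):
    """One execution attempt; returns the outputs, or None if it crashed."""
    outputs = []
    for seq, record in enumerate(records):
        if tracker.should_skip(seq):
            continue
        current_seq = seq  # stored before invoking the user function
        try:
            outputs.append(user_fn(record))
        except Exception:
            tracker.report_failure(current_seq)  # stand-in for the UDP packet
            return None
    return outputs


def run_with_retries(records, user_fn, max_attempts=5):
    tracker = SkipTracker()
    for _ in range(max_attempts):
        result = run_task(records, user_fn, tracker)
        if result is not None:
            return result
    raise RuntimeError("task did not complete")


# The third record (0) crashes the user function deterministically.
results = run_with_retries([1, 2, 0, 4], lambda x: 10 // x)
```

The task fails twice on the same record, after which the re-execution skips it and completes with the remaining records.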
To help facilitate debugging, profiling, and small-scale testing, there is an alternative implementation of the MapReduce library that sequentially executes all of the work for a MapReduce operation on the local machine.
The master runs an internal HTTP server and exports a set of status pages for human consumption.
The MapReduce library provides a counter facility to count occurrences of various events, such as the total number of words processed or documents indexed.
The counter values from individual worker machines are periodically propagated to the master. The master aggregates the counter values from successful map and reduce tasks and returns them to the user code when the MapReduce operation is completed. The master eliminates the effects of duplicate executions of the same map or reduce task to avoid double counting.
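The duplicate elimination described above can be sketched by having the master accept counter values only from the first successful execution of each task. The class name and the task id strings are hypothetical.

```python
from collections import Counter
from typing import Dict


class CounterAggregator:
    """Master-side aggregation that ignores duplicate task executions."""

    def __init__(self):
        self._counted_tasks = set()
        self.totals = Counter()

    def task_succeeded(self, task_id: str, counters: Dict[str, int]) -> None:
        # A backup execution or a re-execution reports the same task id
        # again; counting it twice would inflate the totals.
        if task_id in self._counted_tasks:
            return
        self._counted_tasks.add(task_id)
        self.totals.update(counters)


aggregator = CounterAggregator()
aggregator.task_succeeded("map-0", {"words": 10})
aggregator.task_succeeded("map-0", {"words": 10})  # duplicate from a backup task
aggregator.task_succeeded("map-1", {"words": 5, "documents": 1})
```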
MapReduce has been used at Google for a wide range of domains including:
- Large-scale machine learning problems.
- Clustering problems for the Google News and Froogle products.
- Extraction of data used to produce reports of popular queries.
- Extraction of the properties of web pages for new experiments and products.
- Large-scale graph computations.
MapReduce has been so successful because it makes it possible to write a simple program and run it efficiently on a thousand machines in the course of half an hour, greatly speeding up the development and prototyping cycle. It allows programmers who have no experience with distributed and/or parallel systems to exploit large amounts of resources easily.
Probably the most significant use of MapReduce is the rewrite of the production indexing system that produces the data structures used for the Google web search service. Using MapReduce has provided several benefits:
- The indexing code is simpler, smaller, and easier to understand, because the code that deals with fault tolerance, distribution, and parallelisation is hidden within the MapReduce library.
- The performance of the MapReduce library is good enough that it can keep conceptually unrelated computations separate, instead of mixing them together to avoid extra passes over the data.
- The indexing process has become much easier to operate, because most of the problems caused by machine failures, slow machines, and networking hiccups are dealt with automatically by the MapReduce library without operator intervention.
MapReduce exploits a restricted programming model to parallelise the user program automatically and to provide transparent fault-tolerance.
By restricting the programming model, the MapReduce framework is able to partition the problem into a large number of fine-grained tasks. These tasks are dynamically scheduled on available workers so that faster workers process more tasks. The restricted programming model also allows redundant executions of tasks to be scheduled near the end of the job, which greatly reduces completion time in the presence of non-uniformities.
The success of MapReduce is attributed to:
- The model is easy to use, even for programmers without experience with parallel and distributed systems, since it hides the details of parallelisation, fault-tolerance, locality optimisation, and load balancing.
- A large variety of problems are easily expressible as MapReduce computations.
- There is an implementation of MapReduce that scales to large clusters comprising thousands of machines.
Some learnings:
- Restricting the programming model makes it easy to parallelise and distribute computations and to make such computations fault-tolerant.
- Network bandwidth is a scarce resource. A number of optimisations in the system have been done towards reducing the amount of data sent across the network.
- Redundant execution can be used to reduce the impact of slow machines, and to handle machine failures and data loss.