- Mastering Hadoop
- Sandeep Karanth
The Reduce task
The Reduce task is an aggregation step. If the number of Reduce tasks is not specified, the default is one. Running a single Reduce task risks overloading the node it executes on, while running too many increases shuffle complexity and produces a proliferation of output files that puts pressure on the NameNode. It is important to understand the data distribution and the partitioning function to decide the optimal number of Reduce tasks.
The number of Reduce tasks can be set using the mapreduce.job.reduces parameter. It can also be set programmatically by calling the setNumReduceTasks() method on the Job object. There is a cap on the number of Reduce tasks that can be executed by a single node, given by the mapreduce.tasktracker.reduce.tasks.maximum property.
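The following driver sketch shows both ways of setting the reducer count; the class name, job name, and the value of 12 are illustrative, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReducerCountExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Option 1: set the property directly on the configuration.
        conf.setInt("mapreduce.job.reduces", 12);

        Job job = Job.getInstance(conf, "reducer-count-example");
        // Option 2: set it programmatically; this writes the same
        // mapreduce.job.reduces property and therefore takes precedence.
        job.setNumReduceTasks(12);

        // The Mapper, Reducer, and input/output paths would be configured
        // here before calling job.waitForCompletion(true).
    }
}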
Note
The heuristic to determine the right number of reducers is as follows:
0.95 * (nodes * mapreduce.tasktracker.reduce.tasks.maximum)
Alternatively, you can use the following:
1.75 * (nodes * mapreduce.tasktracker.reduce.tasks.maximum)
At 0.95, all the reducers can launch immediately and start fetching Map outputs as the Map tasks complete. At 1.75, the faster nodes finish their first round of Reduce tasks and move on to a second round, which gives better load balancing.
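For example, on a hypothetical cluster of 10 nodes with mapreduce.tasktracker.reduce.tasks.maximum set to 4, the first heuristic gives 0.95 * (10 * 4) = 38 reducers, while the second gives 1.75 * (10 * 4) = 70.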
Fetching intermediate outputs – Reduce-side
The Reduce task fetches relevant partitions from Map tasks as and when they finish. This is called the Copy phase. The number of Map tasks from which a Reduce task can fetch data in parallel is determined by the value of the mapreduce.reduce.shuffle.parallelcopies parameter. The lower this value, the more queuing there is on the Reduce side; the Reduce task might have to wait for an available slot to fetch data from a Map task.
In situations where a Reduce task cannot reach the output data of a Map task due to network connectivity issues, it retries the fetch with exponential backoff. The retries continue until the time specified by the mapred.reduce.copy.backoff property is reached. After that, the Reduce task is marked as failed.
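The sketch below shows how these two properties could be tuned from the driver; both values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleCopyTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Let each Reduce task fetch from up to 10 Map tasks in parallel
        // (the default is 5).
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
        // Stop retrying an unreachable Map output after 300 seconds of
        // exponential backoff.
        conf.setInt("mapred.reduce.copy.backoff", 300);

        Job job = Job.getInstance(conf, "shuffle-copy-tuning");
        // The remaining job setup (Mapper, Reducer, paths) would follow.
    }
}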
Merge and spill of intermediate outputs
Similar to the Map task's sort and spill, the Reduce task also needs to merge the outputs of multiple Map tasks before invoking the Reduce function on them. The next diagram illustrates this process. Depending on their size, the Map task outputs are copied either to a memory buffer or to disk. The mapreduce.reduce.shuffle.input.buffer.percent property configures the size of this buffer as a percentage of the heap size allocated to the task.
The value of the mapreduce.reduce.shuffle.merge.percent property determines the threshold beyond which this buffer has to be spilled to disk. The default value of this setting is 0.66. The mapreduce.reduce.merge.inmem.threshold property sets the threshold for the number of Map outputs that can reside in memory before a disk spill happens. The default value of this property is 1000. When either threshold is reached, the Map outputs are written to disk.
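As a sketch, these thresholds could be set from the driver as follows; the buffer percentage shown is illustrative, while the other two values are the defaults mentioned above:

import org.apache.hadoop.conf.Configuration;

public class ReduceSpillTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fraction of the Reduce task's heap used to buffer fetched Map outputs.
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
        // Spill the buffer to disk once it is 66 percent full.
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
        // Or spill once 1,000 Map outputs have accumulated in memory.
        conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);
        // The configuration would then be passed to Job.getInstance(conf, ...).
    }
}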

A background thread continuously merges the disk files. After all the outputs have been received, the Reduce task moves into the Merge or Sort phase. As with the Map task merge, the number of file streams merged simultaneously is determined by the value of the mapreduce.task.io.sort.factor attribute. These parameters can be tuned in a fashion similar to the Map-side spill and merge parameters. The key is to do as much of the processing as possible in memory.
In later versions of Hadoop, two parameters, mapreduce.reduce.merge.memtomem.enabled and mapreduce.reduce.merge.memtomem.threshold, enable merging within memory.
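A final configuration sketch for these merge-phase parameters; the values of 50 and 100 are illustrative:

import org.apache.hadoop.conf.Configuration;

public class ReduceMergeTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Merge up to 50 file streams at a time during the Merge/Sort phase
        // (the default is 10).
        conf.setInt("mapreduce.task.io.sort.factor", 50);
        // Enable memory-to-memory merging and trigger it once 100 Map outputs
        // are held in memory.
        conf.setBoolean("mapreduce.reduce.merge.memtomem.enabled", true);
        conf.setInt("mapreduce.reduce.merge.memtomem.threshold", 100);
    }
}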
Any compression applied to the Map task outputs is decompressed in memory during merging.