前言
我们在介绍MapReduce Shuffle阶段时,曾经提及过Combiner
的概念。Combiner在某种场景下,可以减少Map任务和Reduce任务之间的数据传输,属于优化方案。简单来说,即便不使用Combiner,我们的程序也没有任何影响,如果要使用Combiner,可以适当提升运行速度,不过有条件限制。
Combiner概念
Combiner主要用于局部汇总,可在Map任务输出以及Shuffle合并阶段对数据进行局部汇总处理,减少Map和Reduce之间的数据量,其功能与Reducer类似。
下面我们通过一个例子来说明Combiner的概念以及处理过程。
还是以WordCount为例,假设有两个Map任务,第一个Map任务的输出如下:
1 | (hadoop, 1) |
第二个Map任务的输出如下:
1 | (hadoop, 1) |
在Reduce阶段,输入数据如下:
1 | (hadoop, [1, 1, 1, 1, 1, 1]) |
Reduce通过遍历每个Key对应的Value列表,累加计算出每个Key出现的次数。
程序当然没有问题。不过第一个Map任务最终有7条数据,第二个Map任务最终有5条数据,Reduce任务需要从两个Map任务复制7+5=12
条数据。
考虑一下,假如第一个Map任务输出如下:
1 | (hadoop, 4) |
第二个Map任务输出如下:
1 | (hadoop, 2) |
在Reduce阶段,输入数据如下:
1 | (hadoop, [4, 2]) |
每个Map任务最终有3条数据,Reduce任务需要从两个Map任务复制3+3=6
条数据,数据量相比于第一种方式减少了一半,效率自然就提升了。
在第二种方式中,数据在传输之间,相当于做了一次局部的汇总,其效果和Reduce任务所达到的效果一样。因此,很多时候,我们编写的Reducer任务可以直接拿来作为Combiner任务执行。
使用Combiner
明白了Combiner的概念以及作用之后,下面我们通过代码来体验一把Combiner的使用,还是以WordCount为例,下面是代码。
WordCountMapper
1 | public class WordCountMapper extends Mapper<LongWritable, Text, Text, VIntWritable> { |
WordCountReducer
1 | public class WordCountReducer extends Reducer<Text, VIntWritable, Text, VIntWritable> { |
WordCountRunner
1 | public class WordCountRunner extends Configured implements Tool { |
Combiner限制
并不是所有的场景都能够使用Combiner来做局部汇总的。再看一个例子,例如我们要计算用户的平均消费金额。
第一个Map任务输出:
1 | (zhangsan, 20) |
第二个Map任务输出:
1 | (zhangsan, 25) |
Reducer任务计算平均消费金额应当是(20+5+5+25+15)/5=14
。如果使用了Combiner,每个Map任务先做一次局部汇总。
第一个Map任务计算(20+5+5)/3=10
,第二个Map任务计算(25+15)/2=20
。最后Reduce任务只会得到两个计算结果,然后将这两个计算结果再做平均值计算(10+20)/2=15
,这时结果就不对了。
小结
在本章中,我们介绍了MapReduce中Combiner的概念,Combiner实际上就是Reducer,它可以在Map任务输出时进行局部汇总,减少网络数据传输量。但是它有使用限制,在特殊场景下,会导致数据有误。因此,如果要使用Combiner,需要事先评估是否会出错。