【Hadoop12】:MapReduce Combiner

前言

我们在介绍MapReduce Shuffle阶段时,曾经提及过Combiner的概念。Combiner在某种场景下,可以减少Map任务和Reduce任务之间的数据传输,属于优化方案。简单来说,即便不使用Combiner,我们的程序也没有任何影响,如果要使用Combiner,可以适当提升运行速度,不过有条件限制。

Combiner概念

Combiner主要用于局部汇总,可在Map任务输出以及Shuffle合并阶段对数据进行局部汇总处理,减少Map和Reduce之间的数据量,其功能与Reducer类似。

下面我们通过一个例子来说明Combiner的概念以及处理过程。

还是以WordCount为例,假设有两个Map任务,第一个Map任务的输出如下:

1
2
3
4
5
6
7
(hadoop, 1)
(hadoop, 1)
(yarn, 1)
(spark, 1)
(hadoop, 1)
(yarn, 1)
(hadoop, 1)

第二个Map任务的输出如下:

1
2
3
4
5
(hadoop, 1)
(yarn, 1)
(yarn, 1)
(spark, 1)
(hadoop, 1)

在Reduce阶段,输入数据如下:

1
2
3
(hadoop, [1, 1, 1, 1, 1, 1])
(spark, [1, 1])
(yarn, [1, 1, 1, 1])

Reduce通过遍历每个Key对应的Value列表,累加计算出每个Key出现的次数。

程序当然没有问题。不过第一个Map任务最终有7条数据,第二个Map任务最终有5条数据,Reduce任务需要从两个Map任务复制7+5=12条数据。

考虑一下,假如第一个Map任务输出如下:

1
2
3
(hadoop, 4)
(spark, 1)
(yarn, 2)

第二个Map任务输出如下:

1
2
3
(hadoop, 2)
(spark, 1)
(yarn, 2)

在Reduce阶段,输入数据如下:

1
2
3
(hadoop, [4, 2])
(spark, [1, 1])
(yarn, [2, 2])

每个Map任务最终有3条数据,Reduce任务需要从两个Map任务复制3+3=6条数据,数据量相比于第一种方式减少了一半,效率自然就提升了。

在第二种方式中,数据在传输之间,相当于做了一次局部的汇总,其效果和Reduce任务所达到的效果一样。因此,很多时候,我们编写的Reducer任务可以直接拿来作为Combiner任务执行。

使用Combiner

明白了Combiner的概念以及作用之后,下面我们通过代码来体验一把Combiner的使用,还是以WordCount为例,下面是代码。

WordCountMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class WordCountMapper extends Mapper<LongWritable, Text, Text, VIntWritable> {

private static Text outKey = new Text();
private static VIntWritable outValue = new VIntWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(" ");
for (String word : words) {
outKey.set(word);
outValue.set(1);
context.write(outKey, outValue);
}
}
}

WordCountReducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WordCountReducer extends Reducer<Text, VIntWritable, Text, VIntWritable> {

private static VIntWritable sum = new VIntWritable();

@Override
protected void reduce(Text key, Iterable<VIntWritable> values, Context context) throws IOException, InterruptedException {
int s = 0;
for (VIntWritable value : values) {
s += value.get();
}
sum.set(s);
context.write(key, sum);
}
}

WordCountRunner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class WordCountRunner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Usage: WordCountRunner <input path> <output path>");
System.exit(1);
}

Job job = Job.getInstance(this.getConf(), "WordCountWithCombinerExample");
job.setJarByClass(WordCountRunner.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));

job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(VIntWritable.class);

// 设置Combiner
// 直接使用Reducer即可
job.setCombinerClass(WordCountReducer.class);

job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(VIntWritable.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));

boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int code = ToolRunner.run(new WordCountRunner(), args);
System.exit(code);
}
}

Combiner限制

并不是所有的场景都能够使用Combiner来做局部汇总的。再看一个例子,例如我们要计算用户的平均消费金额。

第一个Map任务输出:

1
2
3
(zhangsan, 20)
(zhangsan, 5)
(zhangsan, 5)

第二个Map任务输出:

1
2
(zhangsan, 25)
(zhangsan, 15)

Reducer任务计算平均消费金额应当是(20+5+5+25+15)/5=14。如果使用了Combiner,每个Map任务先做一次局部汇总。

第一个Map任务计算(20+5+5)/3=10,第二个Map任务计算(25+15)/2=20。最后Reduce任务只会得到两个计算结果,然后将这两个计算结果再做平均值计算(10+20)/2=15,这时结果就不对了。

小结

在本章中,我们介绍了MapReduce中Combiner的概念,Combiner实际上就是Reducer,它可以在Map任务输出时进行局部汇总,减少网络数据传输量。但是它有使用限制,在特殊场景下,会导致数据有误。因此,如果要使用Combiner,需要事先评估是否会出错。

如果您觉得不错,请赞赏一下!