【Hadoop11】:MapReduce分区器

前言

到目前为止,我们编写的所有MapReduce只有一个Reducer任务。Map任务可能有多个,因为通过输入分片的机制,将数据源切分为多个分片,每个分片由一个Map任务进行处理。在一些特殊的场景下,我们可能要对数据进行分组,将不同类型的数据处理后存放到不同的目标文件中。由于Reducer任务只有一个,所有Map任务的输出只会由单独的Reducer任务进行处理,因此所生成的数据文件只有一个。这时,分区器(Partitioner)就出现了。

Partitioner介绍

分区器(Partitioner)可以根据自定义的规则,将数据划分到不同的分区内,每个分区由对应的Reducer进行处理。此外,分区的数量要和Reducer任务的数量保持一致,否则不同分区的数据无法分散到不同的Reducer任务上,就失去了分区的意义了。

我们可以通过继承Partitioner抽象类来定义分区规则,首先来看一下该类的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class Partitioner<KEY, VALUE> {

/**
* Get the partition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* <p>Typically a hash function on a all or a subset of the key.</p>
*
* @param key the key to be partioned.
* @param value the entry value.
* @param numPartitions the total number of partitions.
* @return the partition number for the <code>key</code>.
*/
public abstract int getPartition(KEY key, VALUE value, int numPartitions);

}

该类中只有一个getPartition()方法,在Map任务输出之前,会调用该方法将Key-Value作为参数传递,获取到对应的分区编号。

默认情况下,我们是没有指定分区器的,Hadoop也为我们提供了一些内置的分区器,如图所示。

如果我们要使用自定义的Partitioner,需要在Job实例中进行设置,如下所示。

1
2
3
4
// 设置分区器
job.setPartitionerClass(YourPartitioner.class);
// Reducer任务的数量要和分区器中分区数量一致
job.setNumReduceTasks(N);

案例

WordCount案例,对单词进行个数统计,将首字母为大写的单词与其他单词的统计结果分类存放。

1
2
3
4
5
6
7
8
Hadoop hadoop
YARN yarn
MapReduce mapreduce
Spark spark
Hadoop hadoop
YARN yarn
MapReduce mapreduce
Spark spark

在这个案例中,只需要有2个分区就够了,一个分区存放首字母为大写的数据,另外一个分区存放其他数据。

Mapper和Reducer的逻辑这里就不过多描述了,想必大家应该很熟悉。

下面是我们自定义的Partitioner。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class WordCountPartitioner extends Partitioner<Text, VIntWritable> {
@Override
public int getPartition(Text text, VIntWritable vIntWritable, int numPartitions) {
// 获取首字母
char first = text.toString().charAt(0);
if (first >= 'A' && first <= 'Z') {
// 首字母大写
return 0;
} else {
return 1;
}
}
}

在Job实例中,设置我们自定义的Partitioner以及Reducer任务的个数。

1
2
3
4
// 设置分区器
job.setPartitionerClass(WordCountPartitioner.class);
// Reducer任务的数量要和分区器中分区数量一致
job.setNumReduceTasks(2);

运行结果分为两个文件,由两个Reducer任务输出而成。

1
2
3
4
5
6
7
8
9
10
11
# part-r-00000
Hadoop 2
MapReduce 2
Spark 2
YARN 2

# part-r-00001
hadoop 2
mapreduce 2
spark 2
yarn 2

小结

本章我们学习了在MapReducer中分区器(Partitioner)的概念以及使用方式。分区器可以很方便的将数据进行分类,通过自定义的分区逻辑,返回不同数据对应的分区编号。此外,分区的数量要和Reducer任务的数量保持一致,否则不同类型的数据会由相同的Reducer任务处理,就没有分区的效果了。

如果您觉得不错,请赞赏一下!