【Hadoop18】:MapReduce案例-TopN

前言

在很多场景下,我们要对数据集进行排序,并获取排序后的TopN。例如,找出某个用户订单中消费金额最高的前三个。下面我们使用MapReduce来实现这一需求。

需求

下面的数据是每天的气温记录。

1
2
3
4
5
6
7
8
9
10
2018,01,01:2
2018,01,02:4
2018,01,03:3
2018,01,03:2
...
2018,02,01:5
2018,02,02:4
2018,02,03:6
2018,02,04:4
...

冒号前面的数据表示年月日,冒号后面的数据表示当前的气温。现在要求你对数据进行处理,找到每个月气温最高的前三个温度,输出格式如下。

1
2
2018-01:4,3,2
2018-02:6,5,4

实现1

这里先来看第一种比较简单的实现方式。首先在Map阶段对数据进行切割,将年-月作为Key,温度作为Value输出。在Reduce阶段,拿到某个月的所有温度,进行排序,获取到前三个温度即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TemperatureMapper extends Mapper<LongWritable, Text, Text, VIntWritable> {

private static Text outKey = new Text();
private static VIntWritable outValue = new VIntWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 切割数据
String[] fields = value.toString().split(":");
String temperature = fields[1];
String[] yearMonthDay = fields[0].split(",");
// 输出
outKey.set(yearMonthDay[0] + "-" + yearMonthDay[1]);
outValue.set(Integer.valueOf(temperature));
context.write(outKey, outValue);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TemperatureReducer extends Reducer<Text, VIntWritable, Text, NullWritable> {

private static final Integer N = 3;
private static Text outKey = new Text();
private static NullWritable outValue = NullWritable.get();

@Override
protected void reduce(Text key, Iterable<VIntWritable> values, Context context) throws IOException, InterruptedException {
// 获取到月份下所有气温
List<Integer> temperatures = new ArrayList<>();
for (VIntWritable value : values) {
temperatures.add(value.get());
}
// 对气温进行降序排列
Collections.sort(temperatures);
Collections.reverse(temperatures);
// 拼接
StringBuilder sb = new StringBuilder();
for (int i=0; i<temperatures.size(); i++) {
sb.append(temperatures.get(i)).append(",");
if (i+1 == N) {
break;
}
}
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
outKey.set(key.toString() + ":" + sb.toString());
context.write(outKey, outValue);
}
}

输出结果如下所示。

1
2
2018-01:4,3,2
2018-02:6,5,4

虽然说功能实现了,但是这种方式有一些缺陷:假如数据量很大,例如要求计算所有用户的订单金额前三名,在Reduce端需要将所有订单金额先缓存到内存中,再进行排序,有可能会导致OOM。其次我们没有利用好MapReduce本身自带的排序功能,而是再Reduce端由进行了一次排序,效率下降。

下面我们来看下第二种实现方式。

实现2

考虑如何借助MapReduce自身的排序?Map任务在输出时,会根据Key进行排序,在上面的实现中,Key是年-月,如2018-01,因此最终的结果是按照年-月排序的。但是我们还需要将气温数据也进行排序。想必你也想到了,需要使用自定义Key。

假设Map任务输出的值如下。

1
2
<(2018-01,4),4>,<(2018-01,3),3>,<(2018-01,2),2>,<(2018-01,2),2>
<(2018-02,6),6>,<(2018-02,5),5>,<(2018-02,4),4>,<(2018-02,4),4>

数据已经排好序了,在Reduce端只需要拿到N个value值就可以了。不需要先缓存数据,再进行排序,效率大大提升。

首先自定义Key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class MyKey implements WritableComparable<MyKey> {

private String yearMonth; // 封装年-月
private Integer temperature; // 封装气温

@Override
public int compareTo(MyKey o) {
// 先按照年-月排序
int r = this.yearMonth.compareTo(o.yearMonth);
if (r == 0) {
// 再按照气温排序
// 注意这里是降序
r = -1 * Integer.compare(this.temperature, o.temperature);
}
return r;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.yearMonth);
out.writeInt(this.temperature);
}

@Override
public void readFields(DataInput in) throws IOException {
this.yearMonth = in.readUTF();
this.temperature = in.readInt();
}

public String getYearMonth() {
return yearMonth;
}

public void setYearMonth(String yearMonth) {
this.yearMonth = yearMonth;
}

public Integer getTemperature() {
return temperature;
}

public void setTemperature(Integer temperature) {
this.temperature = temperature;
}
}

Mapper类的处理逻辑与第一种方式类似,不过这里封装成MyKey。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TemperatureMapper2 extends Mapper<LongWritable, Text, MyKey, VIntWritable> {

private static MyKey outKey = new MyKey();
private static VIntWritable outValue = new VIntWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 切割数据
String[] fields = value.toString().split(":");
String temperature = fields[1];
String[] yearMonthDay = fields[0].split(",");
outKey.setYearMonth(yearMonthDay[0] + "-" + yearMonthDay[1]);
outKey.setTemperature(Integer.valueOf(temperature));
outValue.set(Integer.valueOf(temperature));

context.write(outKey, outValue);
}
}

Reduce端代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TemperatureReducer2 extends Reducer<MyKey, VIntWritable, Text, NullWritable> {

private static final Integer N = 3;

private static Text outKey = new Text();
private static NullWritable outValue = NullWritable.get();

@Override
protected void reduce(MyKey key, Iterable<VIntWritable> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
int counter = 0;
for (VIntWritable value : values) {
sb.append(value).append(",");
counter++;
if (counter == N) {
break;
}
}
if (sb.length() != 0) {
sb.deleteCharAt(sb.length() - 1);
}
outKey.set(key.getYearMonth() + ":" + sb.toString());
context.write(outKey, outValue);
}
}

接下来一步很重要,我们知道,在Reduce端,会根据相同的Key,将一组Value交给Reduce方法进行处理,但是这里Key是自定义的,Reduce可不可以自动聚合呢?答案是否定的,直接运行程序,输出结果如下。

1
2
3
4
5
6
2018-01:4
2018-01:3
2018-01:2,2
2018-02:6
2018-02:5
2018-02:4,4

按照我们的逻辑,MyKey中yearMonth字段相等的对象,应当作为相同的Key进行聚合,那么该如何设置呢?

在最早的WordCount案例中,Map端输出数据如下。

1
<hello,1>,<world,1>,<hello,1><world,1>,...

在这个案例中,Map端输出的数据如下。

1
<myKey1, temperature>,<myKey2, temperature>,<myKey2, temperature>,...

关键就在于如何编写自定义的比较MyKey逻辑。

在MapReduce中,提供了GroupingComparator组件,用于编写自定义的分组比较器。它是Reduce端的一个组件,主要的作用就是决定哪些数据作为一组,调用一次reduce逻辑。在前面的案例中,我们都没有设置过这个组件,这是因为我们使用的TextIntWritable这些类中,以及定义好了对应的GroupingComparator了,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static class Comparator extends WritableComparator {
public Comparator() {
super(Text.class);
}

@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int n1 = WritableUtils.decodeVIntSize(b1[s1]);
int n2 = WritableUtils.decodeVIntSize(b2[s2]);
return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
}
}

因此,如果我们使用Text、IntWritable这些组件,只要数据相同,在Reduce端就会使用内置的GroupComparator进行分组。

现在我们需要将MyKey按照自定义逻辑分组,就需要实现自己的GroupingComparator。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyGroupComparator extends WritableComparator {

public MyGroupComparator() {
// 这里很重要,一定要调用父类的构造方法
// 第一个参数表示将MyKey注册到框架中,
// 第二个参数表示在比较时,将MyKey创建出来
super(MyKey.class, true);
}

// 注意,一定要重写参数为WritableComparable的方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
MyKey k1 = (MyKey) a;
MyKey k2 = (MyKey) b;
// 如果yearMonth字段相同,则认为是同一组
return k1.getYearMonth().compareTo(k2.getYearMonth());
}
}

别忘了在Runner中配置自定义的GroupingComparator。

1
2
// 自定义分组比较器
job.setGroupingComparatorClass(MyGroupComparator.class);

运行结果如下。

1
2
2018-01:4,3,2
2018-02:6,5,4

实际上,到达Reduce的数据,是已经排序后的,框架会拿着当前数据去和后面的数据进行比较,如果相等,则认为是同一组,不相等的话,就是不同组。我们实现了自定义的GroupingComparator,就是让框架使用我们自定义的逻辑对每条数据进行比较。

小结

本章完成了TopN的案例。并使用两种方式进行实现。第一种方式将所有数据发送到Reduce端,在Reduce端缓存并进行排序,在数据量较大的情况下可能会出现OOM,不过实现方式比较简单。第二种方式借助了MapReduce自身的排序功能,将数据封装为自定义Key,由框架进行排序,并实现自定义的GroupingComparator,决定哪些数据作为相同组进入reduce逻辑。虽然说第二种方式相对复杂一些,但是效率最高,也最节省内存。

如果您觉得不错,请赞赏一下!