前言
在很多场景下,我们要对数据集进行排序,并获取排序后的TopN。例如,找出某个用户订单中消费金额最高的前三个。下面我们使用MapReduce来实现这一需求。
需求
下面的数据是每天的气温记录。
1 | 2018,01,01:2 |
冒号前面的数据表示年月日,冒号后面的数据表示当前的气温。现在要求你对数据进行处理,找到每个月气温最高的前三个温度,输出格式如下。
1 | 2018-01:4,3,2 |
实现1
这里先来看第一种比较简单的实现方式。首先在Map阶段对数据进行切割,将年-月作为Key,温度作为Value输出。在Reduce阶段,拿到某个月的所有温度,进行排序,获取到前三个温度即可。
1 | public class TemperatureMapper extends Mapper<LongWritable, Text, Text, VIntWritable> { |
1 | public class TemperatureReducer extends Reducer<Text, VIntWritable, Text, NullWritable> { |
输出结果如下所示。
1 | 2018-01:4,3,2 |
虽然说功能实现了,但是这种方式有一些缺陷:假如数据量很大,例如要求计算所有用户的订单金额前三名,在Reduce端需要将所有订单金额先缓存到内存中,再进行排序,有可能会导致OOM。其次我们没有利用好MapReduce本身自带的排序功能,而是再Reduce端由进行了一次排序,效率下降。
下面我们来看下第二种实现方式。
实现2
考虑如何借助MapReduce自身的排序?Map任务在输出时,会根据Key进行排序,在上面的实现中,Key是年-月,如2018-01,因此最终的结果是按照年-月排序的。但是我们还需要将气温数据也进行排序。想必你也想到了,需要使用自定义Key。
假设Map任务输出的值如下。
1 | <(2018-01,4),4>,<(2018-01,3),3>,<(2018-01,2),2>,<(2018-01,2),2> |
数据已经排好序了,在Reduce端只需要拿到N个value值就可以了。不需要先缓存数据,再进行排序,效率大大提升。
首先自定义Key。
1 | public class MyKey implements WritableComparable<MyKey> { |
Mapper类的处理逻辑与第一种方式类似,不过这里封装成MyKey。
1 | public class TemperatureMapper2 extends Mapper<LongWritable, Text, MyKey, VIntWritable> { |
Reduce端代码如下。
1 | public class TemperatureReducer2 extends Reducer<MyKey, VIntWritable, Text, NullWritable> { |
接下来一步很重要,我们知道,在Reduce端,会根据相同的Key,将一组Value交给Reduce方法进行处理,但是这里Key是自定义的,Reduce可不可以自动聚合呢?答案是否定的,直接运行程序,输出结果如下。
1 | 2018-01:4 |
按照我们的逻辑,MyKey中yearMonth字段相等的对象,应当作为相同的Key进行聚合,那么该如何设置呢?
在最早的WordCount案例中,Map端输出数据如下。
1 | <hello,1>,<world,1>,<hello,1><world,1>,... |
在这个案例中,Map端输出的数据如下。
1 | <myKey1, temperature>,<myKey2, temperature>,<myKey2, temperature>,... |
关键就在于如何编写自定义的比较MyKey逻辑。
在MapReduce中,提供了GroupingComparator
组件,用于编写自定义的分组比较器。它是Reduce端的一个组件,主要的作用就是决定哪些数据作为一组,调用一次reduce逻辑。在前面的案例中,我们都没有设置过这个组件,这是因为我们使用的Text
、IntWritable
这些类中,以及定义好了对应的GroupingComparator了,如下所示。
1 | public static class Comparator extends WritableComparator { |
因此,如果我们使用Text、IntWritable这些组件,只要数据相同,在Reduce端就会使用内置的GroupComparator进行分组。
现在我们需要将MyKey按照自定义逻辑分组,就需要实现自己的GroupingComparator。
1 | public class MyGroupComparator extends WritableComparator { |
别忘了在Runner中配置自定义的GroupingComparator。
1 | // 自定义分组比较器 |
运行结果如下。
1 | 2018-01:4,3,2 |
实际上,到达Reduce的数据,是已经排序后的,框架会拿着当前数据去和后面的数据进行比较,如果相等,则认为是同一组,不相等的话,就是不同组。我们实现了自定义的GroupingComparator,就是让框架使用我们自定义的逻辑对每条数据进行比较。
小结
本章完成了TopN的案例。并使用两种方式进行实现。第一种方式将所有数据发送到Reduce端,在Reduce端缓存并进行排序,在数据量较大的情况下可能会出现OOM,不过实现方式比较简单。第二种方式借助了MapReduce自身的排序功能,将数据封装为自定义Key,由框架进行排序,并实现自定义的GroupingComparator,决定哪些数据作为相同组进入reduce逻辑。虽然说第二种方式相对复杂一些,但是效率最高,也最节省内存。