前言
本章来学习一个相对复杂一些的案例:共同好友。想必大家都用过QQ、微信之类的聊天工具吧,A是B的好友,而B和C是好友,那么A和C的共同好友就是B。下面我们使用MapReduce的方式实现计算共同好友。
需求
假设数据样本如下所示。
1 | A:B,C,D,F,E,O |
每一行数据代表一个用户的好友关系,冒号前面是用户,冒号后面是该用户的所有好友,数据是单向的。现在要求计算出哪些人有共同好友,以及他们的共同好友都有谁。
举例来说,A和B的共同好友有:C、E。J和K没有共同好友。
实现
明白了需求后,我们就要考虑如何实现了。这个问题稍微有些复杂,需要先想明白,再去写代码。
抛开MapReduce不说,我们先用普通代码的方法将这个案例实现出来。从第一行开始,依次去和后续所有行进行计算,用自己的好友列表与对方好友列表的交集即可。
1 | public class OurFriends { |
输出结果如下所示。
1 | A与B的共同好友是:[C, E] |
下面我们来考虑如何使用MapReduce来计算共同好友。
计算共同好友,需要将问题转换为MapReduce的编程模型来进行实现。在上面使用Java语言实现时,逻辑是依次用每个用户的好友列表,和其他用户的好友列表进行交集计算。但是在MapReduce中可不行,因为数据是一行行的交给Map任务处理的,并且可能会有多个Map任务,因此无法在Map阶段就计算出来,肯定是需要依赖到Reduce任务的。那么在Map阶段该如何做呢?下面我们用两行数据进行说明。
1 | A:B,C,D,F,E,O |
这是什么意思呢?
Map任务每读取一行,会产生多条数据,数据格式为<好友,用户>,以好友作为Key,自己作为Value。那么在Reduce阶段,就会收到<好友,[用户1,用户2,…],这样我们就计算出了有哪些用户,他们的好友列表中都包括该好友。
需要注意到,Reduce的输出为两个用户之间的某一个共同好友,因此需要编写两个MapReduce程序,第一个用于计算用户的共同好友,但Reduce的输出为每行一个共同好友。第二个MapReduce程序将第一次输出的结果作为输入,进行汇总。
下面我们来编写代码实现。
1 | public class FriendMapper extends Mapper<LongWritable, Text, Text, Text> { |
1 | public class FriendReducer extends Reducer<Text, Text, Text, Text> { |
1 | public class FriendMapper2 extends Mapper<LongWritable, Text, Text, Text> { |
1 | public class FriendReducer2 extends Reducer<Text, Text, Text, NullWritable> { |
最终输出结果如下。
1 | A-B:[C, E] |
可以看到,最终结果与我们编写的单机版Java程序效果一致。
扩展
基于上面的数据,我们可以做一个扩展任务:计算互为好友的用户。举例来说,A和B互为好友,因为A的好友列表中有B,而B的好友列表中有A。A和K不是互为好友,因为A的好友列表中没有K,而K的好友列表中有A。L和M不是互为好友,因为L的好友列表中没有M,M的好友列表中也没有L。
我们还是以两行数据来作为说明。
1 | A:B,C,D,F,E,O |
下面我们直接看Map任务和Reduce任务的代码即可。
1 | public class FriendMapper3 extends Mapper<LongWritable, Text, Text, NullWritable> { |
1 | public class FriendReducer3 extends Reducer<Text, NullWritable, Text, NullWritable> { |
最终输出结果如下。
1 | A-B |
Job串联
在第一个案例中,使用到了两个MapReduce任务,我们先提交第一个任务,等待运行完成后,再提交第二个,而第二个任务的输入,则是第一个任务的输出。
在一些相对复杂的处理逻辑中,往往需要多个MapReduce任务串联起来进行处理。一次一个任务提交非常麻烦,你需要等待第一个任务运行结束,再去手动提交第二个。MapReduce中提供了JobControl
用于将多个任务串联起来,而不需要手动一个个的去提交。
代码很简单,下面给出代码片段,说明如何去使用。
1 | // 将job转换为ControlledJob |
小结
本章完成了”共同好友”以及”互为好友”的MapReduce案例。这两个案例相对于前面的案例有些复杂,主要关键点在于Key的设计。由于MapReduce编程模型的特性(按照Key排序、汇总等),需要将任务按照其特性进行拆解,这点非常重要。最后我们介绍了在多个MapReduce串联情况下,可以使用JobControl来完成任务串联执行,而无需手动依次提交任务。