【Hadoop17】:MapReduce案例-共同好友

前言

本章来学习一个相对复杂一些的案例:共同好友。想必大家都用过QQ、微信之类的聊天工具吧,A是B的好友,而B和C是好友,那么A和C的共同好友就是B。下面我们使用MapReduce的方式实现计算共同好友。

需求

假设数据样本如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

每一行数据代表一个用户的好友关系,冒号前面是用户,冒号后面是该用户的所有好友,数据是单向的。现在要求计算出哪些人有共同好友,以及他们的共同好友都有谁。

举例来说,A和B的共同好友有:C、E。J和K没有共同好友。

实现

明白了需求后,我们就要考虑如何实现了。这个问题稍微有些复杂,需要先想明白,再去写代码。

抛开MapReduce不说,我们先用普通代码的方法将这个案例实现出来。从第一行开始,依次去和后续所有行进行计算,用自己的好友列表与对方好友列表的交集即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class OurFriends {

private static String[] relations = {
"A:B,C,D,F,E,O",
"B:A,C,E,K",
"C:F,A,D,I",
"D:A,E,F,L",
"E:B,C,D,M,L",
"F:A,B,C,D,E,O,M",
"G:A,C,D,E,F",
"H:A,C,D,E,O",
"I:A,O",
"J:B,O",
"K:A,C,D",
"L:D,E,F",
"M:E,F,G",
"O:A,H,I,J"
};

public static void main(String[] args) {
for (int i = 0; i < relations.length - 1; i++) {
// 解析出第一行数据
User current = parse(relations[i]);
for (int j = i + 1; j < relations.length; j++) {
User other = parse(relations[j]);
// 首先判断双方是否为好友关系
// 获取双方的共同好友
Set<String> together = new HashSet<>();
for (String myFriend : current.friends) {
if (other.friends.contains(myFriend)) {
together.add(myFriend);
}
}
if (together.isEmpty()) {
// System.out.println(String.format("%s与%s没有共同好友.", current.name, other.name));
} else {
System.out.println(String.format("%s与%s的共同好友是:%s", current.name, other.name, together));
}
}
}
}

private static User parse(String relation) {
User user = new User();
String[] parts = relation.split(":");
String name = parts[0];
String[] friends = parts[1].split(",");

user.name = name;
user.friends = new HashSet<>(Arrays.asList(friends));
return user;
}

}

class User {
String name;
Set<String> friends;
}

输出结果如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
A与B的共同好友是:[C, E]
A与C的共同好友是:[D, F]
A与D的共同好友是:[E, F]
A与E的共同好友是:[B, C, D]
A与F的共同好友是:[B, C, D, E, O]
A与G的共同好友是:[C, D, E, F]
A与H的共同好友是:[C, D, E, O]
A与I的共同好友是:[O]
A与J的共同好友是:[B, O]
A与K的共同好友是:[C, D]
A与L的共同好友是:[D, E, F]
A与M的共同好友是:[E, F]
B与C的共同好友是:[A]
B与D的共同好友是:[A, E]
B与E的共同好友是:[C]
B与F的共同好友是:[A, C, E]
B与G的共同好友是:[A, C, E]
B与H的共同好友是:[A, C, E]
B与I的共同好友是:[A]
B与K的共同好友是:[A, C]
B与L的共同好友是:[E]
B与M的共同好友是:[E]
B与O的共同好友是:[A]
C与D的共同好友是:[A, F]
C与E的共同好友是:[D]
C与F的共同好友是:[A, D]
C与G的共同好友是:[A, D, F]
C与H的共同好友是:[A, D]
C与I的共同好友是:[A]
C与K的共同好友是:[A, D]
C与L的共同好友是:[D, F]
C与M的共同好友是:[F]
C与O的共同好友是:[A, I]
D与E的共同好友是:[L]
D与F的共同好友是:[A, E]
D与G的共同好友是:[A, E, F]
D与H的共同好友是:[A, E]
D与I的共同好友是:[A]
D与K的共同好友是:[A]
D与L的共同好友是:[E, F]
D与M的共同好友是:[E, F]
D与O的共同好友是:[A]
E与F的共同好友是:[B, C, D, M]
E与G的共同好友是:[C, D]
E与H的共同好友是:[C, D]
E与J的共同好友是:[B]
E与K的共同好友是:[C, D]
E与L的共同好友是:[D]
F与G的共同好友是:[A, C, D, E]
F与H的共同好友是:[A, C, D, E, O]
F与I的共同好友是:[A, O]
F与J的共同好友是:[B, O]
F与K的共同好友是:[A, C, D]
F与L的共同好友是:[D, E]
F与M的共同好友是:[E]
F与O的共同好友是:[A]
G与H的共同好友是:[A, C, D, E]
G与I的共同好友是:[A]
G与K的共同好友是:[A, C, D]
G与L的共同好友是:[D, E, F]
G与M的共同好友是:[E, F]
G与O的共同好友是:[A]
H与I的共同好友是:[A, O]
H与J的共同好友是:[O]
H与K的共同好友是:[A, C, D]
H与L的共同好友是:[D, E]
H与M的共同好友是:[E]
H与O的共同好友是:[A]
I与J的共同好友是:[O]
I与K的共同好友是:[A]
I与O的共同好友是:[A]
K与L的共同好友是:[D]
K与O的共同好友是:[A]
L与M的共同好友是:[E, F]

下面我们来考虑如何使用MapReduce来计算共同好友。

计算共同好友,需要将问题转换为MapReduce的编程模型来进行实现。在上面使用Java语言实现时,逻辑是依次用每个用户的好友列表,和其他用户的好友列表进行交集计算。但是在MapReduce中可不行,因为数据是一行行的交给Map任务处理的,并且可能会有多个Map任务,因此无法在Map阶段就计算出来,肯定是需要依赖到Reduce任务的。那么在Map阶段该如何做呢?下面我们用两行数据进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
A:B,C,D,F,E,O
B:A,C,E,K

# 第一行输出
<B,A>,<C,A>,<D,A>,<F,A>,<E,A>,<O,A>
# 第二行输出
<A,B>,<C,B>,<E,B>,<K,B>

# Reduce端接收到
<C,A>,<C,B>,...
这样就计算出了,A用户和B用户都包含了C好友
Reduce任务输出
<A-B,C>

这是什么意思呢?

Map任务每读取一行,会产生多条数据,数据格式为<好友,用户>,以好友作为Key,自己作为Value。那么在Reduce阶段,就会收到<好友,[用户1,用户2,…],这样我们就计算出了有哪些用户,他们的好友列表中都包括该好友。

需要注意到,Reduce的输出为两个用户之间的某一个共同好友,因此需要编写两个MapReduce程序,第一个用于计算用户的共同好友,但Reduce的输出为每行一个共同好友。第二个MapReduce程序将第一次输出的结果作为输入,进行汇总。

下面我们来编写代码实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FriendMapper extends Mapper<LongWritable, Text, Text, Text> {

private static Text outKey = new Text();
private static Text outValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// A:B,C,D,F,E,O
String[] fields = value.toString().split(":");
String user = fields[0];
String[] friends = fields[1].split(",");
for (String friend : friends) {
outKey.set(friend);
outValue.set(user);
// <B,A>,<C,A>,<D,A>,<F,A>,<E,A>,<O,A>
context.write(outKey, outValue);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class FriendReducer extends Reducer<Text, Text, Text, Text> {

private static Text outKey = new Text();
private static Text outValue = new Text();

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// <好友, [用户1, 用户2, ...]>
List<String> users = new ArrayList<>();
for (Text value : values) {
users.add(value.toString());
}
for (int i = 0; i < users.size() - 1; i++) {
for (int j = i + 1; j < users.size(); j++) {
outKey.set(users.get(i) + "-" + users.get(j));
outValue.set(key);
context.write(outKey, outValue);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FriendMapper2 extends Mapper<LongWritable, Text, Text, Text> {

private static Text outKey = new Text();
private static Text outValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");

// 这里需要将Key排序,避免出现A-B、B-A这样的情况
String[] users = fields[0].split("-");
Arrays.sort(users);

outKey.set(users[0] + "-" + users[1]);
outValue.set(fields[1]);

context.write(outKey, outValue);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class FriendReducer2 extends Reducer<Text, Text, Text, NullWritable> {

private static Text outKey = new Text();
private static NullWritable outValue = NullWritable.get();

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// values就是两个用户的共同好友
Set<String> friends = new HashSet<>();
for (Text value : values) {
friends.add(value.toString());
}
outKey.set(key.toString() + ":" + friends.toString());
context.write(outKey, outValue);
}
}

最终输出结果如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
A-B:[C, E]
A-C:[D, F]
A-D:[E, F]
A-E:[B, C, D]
A-F:[B, C, D, E, O]
A-G:[C, D, E, F]
A-H:[C, D, E, O]
A-I:[O]
A-J:[B, O]
A-K:[C, D]
A-L:[D, E, F]
A-M:[E, F]
B-C:[A]
B-D:[A, E]
B-E:[C]
B-F:[A, C, E]
B-G:[A, C, E]
B-H:[A, C, E]
B-I:[A]
B-K:[A, C]
B-L:[E]
B-M:[E]
B-O:[A]
C-D:[A, F]
C-E:[D]
C-F:[A, D]
C-G:[A, D, F]
C-H:[A, D]
C-I:[A]
C-K:[A, D]
C-L:[D, F]
C-M:[F]
C-O:[A, I]
D-E:[L]
D-F:[A, E]
D-G:[A, E, F]
D-H:[A, E]
D-I:[A]
D-K:[A]
D-L:[E, F]
D-M:[E, F]
D-O:[A]
E-F:[B, C, D, M]
E-G:[C, D]
E-H:[C, D]
E-J:[B]
E-K:[C, D]
E-L:[D]
F-G:[A, C, D, E]
F-H:[A, C, D, E, O]
F-I:[A, O]
F-J:[B, O]
F-K:[A, C, D]
F-L:[D, E]
F-M:[E]
F-O:[A]
G-H:[A, C, D, E]
G-I:[A]
G-K:[A, C, D]
G-L:[D, E, F]
G-M:[E, F]
G-O:[A]
H-I:[A, O]
H-J:[O]
H-K:[A, C, D]
H-L:[D, E]
H-M:[E]
H-O:[A]
I-J:[O]
I-K:[A]
I-O:[A]
K-L:[D]
K-O:[A]
L-M:[E, F]

可以看到,最终结果与我们编写的单机版Java程序效果一致。

扩展

基于上面的数据,我们可以做一个扩展任务:计算互为好友的用户。举例来说,A和B互为好友,因为A的好友列表中有B,而B的好友列表中有A。A和K不是互为好友,因为A的好友列表中没有K,而K的好友列表中有A。L和M不是互为好友,因为L的好友列表中没有M,M的好友列表中也没有L。

我们还是以两行数据来作为说明。

1
2
3
4
5
6
7
8
9
10
A:B,C,D,F,E,O
B:A,C,E,K

# 第一行输出
<A-B,null>,<A-C,null>,<A-D,null>,...
# 第二行输出
<A-B,null>,<B-C,null>,<B-E,null>,...

Reduce任务只要判断values的长度是否为2,就可以知道哪两个用户为共同好友了
<A-B,[null,null]>,<A-E,[null]>

下面我们直接看Map任务和Reduce任务的代码即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FriendMapper3 extends Mapper<LongWritable, Text, Text, NullWritable> {

private static Text outKey = new Text();
private static NullWritable outValue = NullWritable.get();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// A:B,C,D,F,E,O
String[] fields = value.toString().split(":");
// A
String user = fields[0];
// B,C,D,F,E,O
String[] friends = fields[1].split(",");
for (String friend : friends) {
String[] k = {user, friend};
Arrays.sort(k);
outKey.set(k[0] + "-" + k[1]);
// <A-B, null>, <A-C, null>, <A-D, null>, ...
context.write(outKey, outValue);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class FriendReducer3 extends Reducer<Text, NullWritable, Text, NullWritable> {

@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
int counter = 0;
for (NullWritable value : values) {
counter++;
}
if (counter == 2) {
// 互为好友
context.write(key, NullWritable.get());
}
}
}

最终输出结果如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
A-B
A-C
A-D
A-F
A-O
B-E
C-F
D-E
D-F
D-L
E-L
E-M
F-M
H-O
I-O
J-O

Job串联

在第一个案例中,使用到了两个MapReduce任务,我们先提交第一个任务,等待运行完成后,再提交第二个,而第二个任务的输入,则是第一个任务的输出。

在一些相对复杂的处理逻辑中,往往需要多个MapReduce任务串联起来进行处理。一次一个任务提交非常麻烦,你需要等待第一个任务运行结束,再去手动提交第二个。MapReduce中提供了JobControl用于将多个任务串联起来,而不需要手动一个个的去提交。

代码很简单,下面给出代码片段,说明如何去使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 将job转换为ControlledJob
ControlledJob cjob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cjob2 = new ControlledJob(job2.getConfiguration());
// 设置作业之间的依赖关系
cjob2.addDependingJob(cjob1);
// 创建JobControl对象,实际上是一个Runnable
JobControl jobControl = new JobControl("MyJobGroup");
// 将ControlledJob放到JobControl中
jobControl.addJob(cjob1);
jobControl.addJob(cjob2);
// 启动一个线程执行jobControl
Thread t = new Thread(jobControl);
t.start();
// 等待任务结束
while (!jobControl.allFinished()) {
Thread.sleep(1000);
}

小结

本章完成了”共同好友”以及”互为好友”的MapReduce案例。这两个案例相对于前面的案例有些复杂,主要关键点在于Key的设计。由于MapReduce编程模型的特性(按照Key排序、汇总等),需要将任务按照其特性进行拆解,这点非常重要。最后我们介绍了在多个MapReduce串联情况下,可以使用JobControl来完成任务串联执行,而无需手动依次提交任务。

如果您觉得不错,请赞赏一下!