【Hadoop15】:MapReduce案例-数据连接

前言

在关系型数据中,我们经常会使用连接(join)操作,进行相关的数据查询。例如,查询用户的订单信息、查询商品的分类信息等。在MapReduce程序中,有时我们也需要进行类似的操作,例如订单的信息存储在某些数据文件中,用户的信息存储在另外的数据文件中。下面我们使用MapReduce程序完成连接相关的操作。

需求

还是以订单案例作为需求,假设订单数据如下。

1
2
3
4
1	小米5	1998.00	1001	2018-03-10 15:30:11
2 茅台王子酒 366.00 1001 2018-03-10 15:30:11
3 法国进口拉菲 708.00 1002 2018-03-10 15:30:12
4 追风筝的人 35.60 1002 2018-03-10 15:30:12

每行数据由订单ID、商品名称、商品价格、用户ID、订单时间构成,字段使用TAB分隔。

假设用户数据如下。

1
2
3
1001	张三丰	男	1990-10-11
1002 张无忌 男 1990-10-10
1003 小龙女 女 1990-10-12

每行数据由用户ID、姓名、性别、出生日期构成,字段使用TAB分隔。

要求计算结果为订单ID、商品名称、用户ID、用户姓名、用户性别、订单时间,例如。

1
1	小米5	1998.00	1001	张三丰	男	2018-03-10 15:30:11

实现(Reduce连接)

我们知道,Reduce会将相同Key的数据作为分组,在reduce端聚合。我们这里要根据用户ID进行连接,因此在Map任务输出时,需要将用户ID作为Key。

在Reduce端,根据用户ID拿到分组数据后,需要识别出数据是订单数据,还是用户数据,解析后再进行连接操作。需要注意的是,如果用户没有对应的订单,则不需要将该数据输出。

首先定义Order对象,用于封装字段信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class Order implements Writable {

private int orderId = -1; // 订单ID
private String productName = ""; // 商品名称
private double producePrice; // 商品价格
private int userId; // 用户ID
private String userName = ""; // 用户姓名
private String userGender = ""; // 用户性别
private String orderDateTime = ""; // 订单时间

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.orderId);
out.writeUTF(this.productName);
out.writeDouble(this.producePrice);
out.writeInt(this.userId);
out.writeUTF(this.userName);
out.writeUTF(this.userGender);
out.writeUTF(this.orderDateTime);
}

@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readInt();
this.productName = in.readUTF();
this.producePrice = in.readDouble();
this.userId = in.readInt();
this.userName = in.readUTF();
this.userGender = in.readUTF();
this.orderDateTime = in.readUTF();
}

@Override
public String toString() {
return this.orderId + "\t" + this.productName + "\t" + this.producePrice + "\t" + this.userId
+ "\t" + this.userName + "\t" + this.userGender + "\t" + this.orderDateTime;
}

public int getOrderId() {
return orderId;
}

public void setOrderId(int orderId) {
this.orderId = orderId;
}

public String getProductName() {
return productName;
}

public void setProductName(String productName) {
this.productName = productName;
}

public double getProducePrice() {
return producePrice;
}

public void setProducePrice(double producePrice) {
this.producePrice = producePrice;
}

public int getUserId() {
return userId;
}

public void setUserId(int userId) {
this.userId = userId;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getUserGender() {
return userGender;
}

public void setUserGender(String userGender) {
this.userGender = userGender;
}

public String getOrderDateTime() {
return orderDateTime;
}

public void setOrderDateTime(String orderDateTime) {
this.orderDateTime = orderDateTime;
}
}

注意,所有的字段都有默认值,是为了write()方法中避免空指针异常。

下面是OrderMapper类的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class OrderMapper extends Mapper<LongWritable, Text, VIntWritable, Order> {

private static VIntWritable outKey = new VIntWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取文件名称
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
// 根据文件名称判断输入数据是用户数据还是订单数据
String line = value.toString();
String[] fields = line.split("\t");
Order order = new Order();
int userId;
if (fileName.equals("users")) {
// 用户数据
// 1001 张三丰 男 1990-10-11
userId = Integer.parseInt(fields[0]);
order.setUserName(fields[1]);
order.setUserGender(fields[2]);
} else {
// 订单数据
// 1 小米5 1998.00 1001 2018-03-10 15:30:11
userId = Integer.valueOf(fields[3]);
order.setOrderId(Integer.valueOf(fields[0]));
order.setProductName(fields[1]);
order.setProducePrice(Double.valueOf(fields[2]));
order.setOrderDateTime(fields[4]);
}
outKey.set(userId);
context.write(outKey, order);
}
}

OrderMapper类中,根据输入文件的名称,判断数据是用户数据还是订单数据。

下面是OrderReducer类的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class OrderReducer extends Reducer<VIntWritable, Order, Order, NullWritable> {

private static NullWritable outValue = NullWritable.get();

@Override
protected void reduce(VIntWritable key, Iterable<Order> values, Context context) throws IOException, InterruptedException {
List<Order> orderList = new ArrayList<>();
Order userInfo = null;
for (Order value : values) {
if (value.getOrderId() == -1) {
// 用户数据
userInfo = new Order();
userInfo.setUserName(value.getUserName());
userInfo.setUserGender(value.getUserGender());
} else if (value.getUserName().equals("")) {
// 订单数据
Order order = new Order();
order.setOrderId(value.getOrderId());
order.setProductName(value.getProductName());
order.setProducePrice(value.getProducePrice());
order.setOrderDateTime(value.getOrderDateTime());
orderList.add(order);
}
}
// 判断是否满足连接条件
if (userInfo != null && !orderList.isEmpty()) {
// 连接
for (Order order : orderList) {
order.setUserId(key.get());
order.setUserName(userInfo.getUserName());
order.setUserGender(userInfo.getUserGender());

context.write(order, outValue);
}
}
}
}

在OrderReducer类中,将Order数据获取到后,根据特定的字段判断此对象是用户数据还是订单数据。此外,只有用户数据不为空并且存在订单数据时,才会将数据输出。

最后是OrderRunner类的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class OrderRunner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 3) {
System.out.println("Usage: OrderRunner <order input path> <user input path> <output path>");
return 1;
}
Job job = Job.getInstance(this.getConf(), "OrderExample");
job.setJarByClass(OrderRunner.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
TextInputFormat.addInputPath(job, new Path(args[1]));

job.setMapperClass(OrderMapper.class);
job.setMapOutputKeyClass(VIntWritable.class);
job.setMapOutputValueClass(Order.class);

job.setReducerClass(OrderReducer.class);
job.setOutputKeyClass(Order.class);
job.setOutputValueClass(NullWritable.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[2]));

boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int code = ToolRunner.run(new OrderRunner(), args);
System.exit(code);
}
}

程序并不是很复杂,也能够正常运行,不过这种实现方式有一些缺点。数据是在Reduce端连接的,这就意味着所有数据(包括订单数据和用户数据)都要经过网络传输,Reducer节点负载很高,Map节点负载较低,并且没有做到真正的”并行”。

下面我们来看另外一种实现方式。

实现(Map连接)

我们换个思路:能否在Map阶段将数据连接呢?Map任务一般来说会有很多,在这个阶段完全是并行执行的,如果能在这个阶段完成连接,那么程序的性能将会大大提升。

一般情况下,对于大表连接小表类型的任务,可以使用Map连接。举例来说,订单数据量可能非常大,而用户数据量则是有限的。此时我们可以在Map端直接加载用户数据到内存中,然后对于输入的订单数据进行匹配。

那么如何在Map任务中获取到用户数据文件呢?有很多方式可以获取,下面我们介绍比较常见的两种。

从HDFS获取

从HDFS直接获取非常简单,在Map任务的初始化方法(setup())中,实例化FileSystem对象,然后去指定路径读取文件即可。文件路径可以通过-D your.file.path=path的方式在提交任务时进行设置。

1
2
3
4
5
6
7
8
9
10
11
protected void setup(Context context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
String p = context.getConfiguration().get("your.file.path");
if (p == null) {
throw new IllegalArgumentException("No file specified.");
}
FSDataInputStream open = fs.open(new Path(p));
BufferedReader br = new BufferedReader(new InputStreamReader(open));
// 读取数据
// ...
}

不过这种方式不太好,尤其是在Map任务数量比较多的情况下,每运行一个Map任务,都需要从HDFS上加载数据,效率不高。

分布式缓存

另外一种方式是使用分布式缓存的形式在Map任务中加载数据。

当用户启动一个作业时,Hadoop会吧-files-archives-libjars等选项锁指定的文件复制到HDFS上,并在任务运行之前,NodeManager将这些文件从HDFS复制到本地磁盘,使得任务在运行时能够访问文件。此时,这些文件就被视为”本地化”了。从任务的角度来看,这些文件就已经在那儿了,以符号链接的方式指向任务的工作目录。此外,由-libjars指定的文件会在任务启动前添加到任务的类路径(classpath)中。

当然,如果你的任务没有使用ToolRunner的方式进行编码,你也可以使用job.addCacheFile()job.addFileToClassPath()等方式进行设置,只是较为繁琐而已。

下面我们演示如何使用分布式缓存来实现Map端连接。

定义User类,用于保存用户数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class User {

private Integer userId;
private String userName;
private String userGender;

public Integer getUserId() {
return userId;
}

public void setUserId(Integer userId) {
this.userId = userId;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getUserGender() {
return userGender;
}

public void setUserGender(String userGender) {
this.userGender = userGender;
}
}

定义Order类,作为Map任务的输出Key,注意要实现WritableComparable接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class Order implements WritableComparable<Order> {

private int orderId; // 订单ID
private String productName; // 商品名称
private double producePrice; // 商品价格
private int userId; // 用户ID
private String userName; // 用户姓名
private String userGender; // 用户性别
private String orderDateTime; // 订单时间

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.orderId);
out.writeUTF(this.productName);
out.writeDouble(this.producePrice);
out.writeInt(this.userId);
out.writeUTF(this.userName);
out.writeUTF(this.userGender);
out.writeUTF(this.orderDateTime);
}

@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readInt();
this.productName = in.readUTF();
this.producePrice = in.readDouble();
this.userId = in.readInt();
this.userName = in.readUTF();
this.userGender = in.readUTF();
this.orderDateTime = in.readUTF();
}

@Override
public int compareTo(Order o) {
// 根据订单金额排序
return Double.compare(this.producePrice, o.producePrice);
}

@Override
public String toString() {
return this.orderId + "\t" + this.productName + "\t" + this.producePrice + "\t" + this.userId
+ "\t" + this.userName + "\t" + this.userGender + "\t" + this.orderDateTime;
}

public int getOrderId() {
return orderId;
}

public void setOrderId(int orderId) {
this.orderId = orderId;
}

public String getProductName() {
return productName;
}

public void setProductName(String productName) {
this.productName = productName;
}

public double getProducePrice() {
return producePrice;
}

public void setProducePrice(double producePrice) {
this.producePrice = producePrice;
}

public int getUserId() {
return userId;
}

public void setUserId(int userId) {
this.userId = userId;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getUserGender() {
return userGender;
}

public void setUserGender(String userGender) {
this.userGender = userGender;
}

public String getOrderDateTime() {
return orderDateTime;
}

public void setOrderDateTime(String orderDateTime) {
this.orderDateTime = orderDateTime;
}
}

定义OrderMapper类,用于处理用户数据与订单数据的连接操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> {

private static Map<Integer, User> userInfo = new HashMap<>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
// 使用分布式缓存
String fileName = context.getConfiguration().get("user.file.name");
File file = new File(fileName);
BufferedReader br = new BufferedReader(new FileReader(file));
String line = null;
while ((line = br.readLine()) != null) {
String[] fields = line.split("\t");
// 1001 张三丰 男 1990-10-11
User user = new User();
user.setUserId(Integer.parseInt(fields[0]));
user.setUserName(fields[1]);
user.setUserGender(fields[2]);
userInfo.put(user.getUserId(), user);
}
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 这里读取的都是订单数据了
// 1 小米5 1998.00 1001 2018-03-10 15:30:11
String line = value.toString();
String[] fields = line.split("\t");
int userId = Integer.valueOf(fields[3]);
User user = userInfo.get(userId);
if (user != null) {
Order order = new Order();
order.setOrderId(Integer.valueOf(fields[0]));
order.setProductName(fields[1]);
order.setProducePrice(Double.valueOf(fields[2]));
order.setOrderDateTime(fields[4]);

order.setUserId(userId);
order.setUserName(user.getUserName());
order.setUserGender(user.getUserGender());
context.write(order, NullWritable.get());
}
}
}

定义OrderRunner类,配置MapReduce任务信息。注意,本例只有Map任务,没有Reduce任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class OrderRunner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Usage: OrderRunner <input path> <output path>");
return 1;
}
Job job = Job.getInstance(this.getConf(), "MapJoinWithDistributedCache");
job.setJarByClass(OrderRunner.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));

job.setMapperClass(OrderMapper.class);
job.setMapOutputKeyClass(Order.class);
job.setMapOutputValueClass(NullWritable.class);

// 不要Reducer了
job.setNumReduceTasks(0);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));

boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int code = ToolRunner.run(new OrderRunner(), args);
System.exit(code);
}
}

在提交任务时,通过-files以及-D key=value的形式指定缓存文件路径以及参数,如下所示。

1
$ bin/hadoop jar mr/hadoop-example-1.0.jar org.victor.mapreduce.demo11.OrderRunner -D user.file.name=users -files test/users /mr/demo11/input /mr/demo11/output

小结

本章介绍了如何使用MapReduce进行数据”连接”操作。并给出了两种实现方式:Reduce连接和Map连接。Reduce连接适用于被连接的数据集都非常大的情况下,但是效率较低(可以通过分区增加Reduce数量提高效率),不过更为通用。Map连接适用于被连接的数据集有一端数据量较少的情况,连接操作直接在Map任务中完成,性能较高。此外,我们还介绍了分布式缓存,用于在任务端加载本地数据,而不用每个任务都去HDFS上读取。虽然任务实现了,但是在MapReduce中实现较为复杂的操作还是比较繁琐的,这也是为什么要学习Hive、Spark等框架的原因。

如果您觉得不错,请赞赏一下!