前言
在关系型数据中,我们经常会使用连接(join)操作,进行相关的数据查询。例如,查询用户的订单信息、查询商品的分类信息等。在MapReduce程序中,有时我们也需要进行类似的操作,例如订单的信息存储在某些数据文件中,用户的信息存储在另外的数据文件中。下面我们使用MapReduce程序完成连接相关的操作。
需求
还是以订单案例作为需求,假设订单数据如下。
1 | 1 小米5 1998.00 1001 2018-03-10 15:30:11 |
每行数据由订单ID、商品名称、商品价格、用户ID、订单时间构成,字段使用TAB分隔。
假设用户数据如下。
1 | 1001 张三丰 男 1990-10-11 |
每行数据由用户ID、姓名、性别、出生日期构成,字段使用TAB分隔。
要求计算结果为订单ID、商品名称、用户ID、用户姓名、用户性别、订单时间,例如。
1 | 1 小米5 1998.00 1001 张三丰 男 2018-03-10 15:30:11 |
实现(Reduce连接)
我们知道,Reduce会将相同Key的数据作为分组,在reduce端聚合。我们这里要根据用户ID进行连接,因此在Map任务输出时,需要将用户ID作为Key。
在Reduce端,根据用户ID拿到分组数据后,需要识别出数据是订单数据,还是用户数据,解析后再进行连接操作。需要注意的是,如果用户没有对应的订单,则不需要将该数据输出。
首先定义Order
对象,用于封装字段信息。
1 | public class Order implements Writable { |
注意,所有的字段都有默认值,是为了write()
方法中避免空指针异常。
下面是OrderMapper
类的实现。
1 | public class OrderMapper extends Mapper<LongWritable, Text, VIntWritable, Order> { |
OrderMapper类中,根据输入文件的名称,判断数据是用户数据还是订单数据。
下面是OrderReducer
类的实现。
1 | public class OrderReducer extends Reducer<VIntWritable, Order, Order, NullWritable> { |
在OrderReducer类中,将Order数据获取到后,根据特定的字段判断此对象是用户数据还是订单数据。此外,只有用户数据不为空并且存在订单数据时,才会将数据输出。
最后是OrderRunner
类的实现。
1 | public class OrderRunner extends Configured implements Tool { |
程序并不是很复杂,也能够正常运行,不过这种实现方式有一些缺点。数据是在Reduce端连接的,这就意味着所有数据(包括订单数据和用户数据)都要经过网络传输,Reducer节点负载很高,Map节点负载较低,并且没有做到真正的”并行”。
下面我们来看另外一种实现方式。
实现(Map连接)
我们换个思路:能否在Map阶段将数据连接呢?Map任务一般来说会有很多,在这个阶段完全是并行执行的,如果能在这个阶段完成连接,那么程序的性能将会大大提升。
一般情况下,对于大表连接小表
类型的任务,可以使用Map连接。举例来说,订单数据量可能非常大,而用户数据量则是有限的。此时我们可以在Map端直接加载用户数据到内存中,然后对于输入的订单数据进行匹配。
那么如何在Map任务中获取到用户数据文件呢?有很多方式可以获取,下面我们介绍比较常见的两种。
从HDFS获取
从HDFS直接获取非常简单,在Map任务的初始化方法(setup())中,实例化FileSystem
对象,然后去指定路径读取文件即可。文件路径可以通过-D your.file.path=path
的方式在提交任务时进行设置。
1 | protected void setup(Context context) throws IOException, InterruptedException { |
不过这种方式不太好,尤其是在Map任务数量比较多的情况下,每运行一个Map任务,都需要从HDFS上加载数据,效率不高。
分布式缓存
另外一种方式是使用分布式缓存
的形式在Map任务中加载数据。
当用户启动一个作业时,Hadoop会吧-files
、-archives
和-libjars
等选项锁指定的文件复制到HDFS上,并在任务运行之前,NodeManager
将这些文件从HDFS复制到本地磁盘,使得任务在运行时能够访问文件。此时,这些文件就被视为”本地化”了。从任务的角度来看,这些文件就已经在那儿了,以符号链接的方式指向任务的工作目录。此外,由-libjars
指定的文件会在任务启动前添加到任务的类路径(classpath)中。
当然,如果你的任务没有使用ToolRunner
的方式进行编码,你也可以使用job.addCacheFile()
、job.addFileToClassPath()
等方式进行设置,只是较为繁琐而已。
下面我们演示如何使用分布式缓存来实现Map端连接。
定义User
类,用于保存用户数据。
1 | public class User { |
定义Order
类,作为Map任务的输出Key,注意要实现WritableComparable
接口。
1 | public class Order implements WritableComparable<Order> { |
定义OrderMapper
类,用于处理用户数据与订单数据的连接操作。
1 | public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> { |
定义OrderRunner
类,配置MapReduce任务信息。注意,本例只有Map任务,没有Reduce任务。
1 | public class OrderRunner extends Configured implements Tool { |
在提交任务时,通过-files
以及-D key=value
的形式指定缓存文件路径以及参数,如下所示。
1 | $ bin/hadoop jar mr/hadoop-example-1.0.jar org.victor.mapreduce.demo11.OrderRunner -D user.file.name=users -files test/users /mr/demo11/input /mr/demo11/output |
小结
本章介绍了如何使用MapReduce进行数据”连接”操作。并给出了两种实现方式:Reduce连接和Map连接。Reduce连接适用于被连接的数据集都非常大的情况下,但是效率较低(可以通过分区增加Reduce数量提高效率),不过更为通用。Map连接适用于被连接的数据集有一端数据量较少的情况,连接操作直接在Map任务中完成,性能较高。此外,我们还介绍了分布式缓存,用于在任务端加载本地数据,而不用每个任务都去HDFS上读取。虽然任务实现了,但是在MapReduce中实现较为复杂的操作还是比较繁琐的,这也是为什么要学习Hive、Spark等框架的原因。