【Hadoop08】:WordCount详解

前言

在上一章中,我们介绍了MapReduce的基本概念,并编写了一个WordCount程序提交到集群上运行。想必大家在编写完第一个程序后,会有很多疑惑,本章会对该程序进行详细剖析,把知识点理解透彻。

MapReduce程序的整体结构

从整体结构上来看,一个MapReduce程序,至少由三个部分构成:自定义Mapper、自定义Reducer和程序入口类(特殊情况下Reducer也会省略),数据传输是通过Key-Value的形式来完成的。

在自定义Mapper类中,我们需要继承MapReduce框架为我们提供的Mapper类,并指定四个泛型参数,分别表示:

  1. 输入数据Key的类型
  2. 输入数据Value的类型
  3. 输出数据Key的类型
  4. 输出数据Value的类型

此外,我们需要重写父类中的map()方法,加入我们自己的业务逻辑处理代码,并将处理后的数据通过context对象发送出去。

在上面的案例中,由于输入的数据是文本类型,因此Mapper的输入Key是当前数据所在数据块中的偏移量,输入的Value是当前行(数据按照一行一行进入到map函数进行处理),输出数据Key是切分后的独立单词,输出Value是一个单词的个数,这里为了简单起见,全部都为1。

在自定义的Reducer类中,我们需要继承MapReduce框架为我们提供的Reducer类,并指定四个泛型参数,分别表示:

  1. 输入数据Key的类型(等于Mapper任务的输出Key类型)
  2. 输入数据Value的类型(等于Mapper任务的输出Value类型)
  3. 输出数据Key的类型
  4. 输出数据Value的类型

此外,我们需要重写父类中的reduce()方法,加入我们自己的业务处理代码,并将处理后的数据通过context对象发送处理。

在案例中,Reducer输入的Key就是Mapper阶段输出的单词,Value则是一个迭代器,表示这个单词所对应每一个值。想想看,对于每个单词,Mapper阶段输出结构是<单词, 1>这样的结构,也就是说,如果遇到了相同的单词,就会出现多个相同Key的键值对。在Reducer阶段,会自动将相同Key的数据进行聚合,因此Reducer阶段的Value是一个迭代器,或者你可以认为就是一个列表。

最后,我们编写了一个入口类,在该类的main()方法中,使用Job对象对程序进行了一系列的设置,包括输入的数据类型、输入的路径、Mapper类是哪个、Reducer类是哪个、输出的数据类型以及输出的路径等等。

下面我们画幅图,来说明程序的运行过程。

注意,该图省略了很多步骤,不过暂且先通过图形的方式,理解程序是如何运行的即可。

Writable

想必大家以及发现了,我们在案例中,使用到了TextIntWritableLongWritable等,这些类是什么作用,与普通的Java数据类型有什么不同?

在介绍MapReduce时,我们说过,Map任务可能会在不同的节点上运行,运行的结果交由Reduce任务进行汇总处理,这个过程需要通过网络将数据传输,那么就需要将数据进行序列化和反序列化。

序列化(Serialization)是指将结构化的对象转换为字节流以便在网络上传输或写入到磁盘进行永久存储的过程。反序列化(Deserialization)是指将字节流转换回结构化对象的逆过程。序列化用于分布式数据处理的两大领域:进程间通信和永久存储。

在Hadoop中,系统中多个节点上的进程间通信是通过RPC(远程过程调用)实现的。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式如下。

  • 紧凑,紧凑格式能充分利用网络带宽
  • 快速,进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是最基本的
  • 可扩展,为了满足新的需求,协议不断变化。所以在控制客户端和服务器的过程中,需要直接引进相应的协议。例如,需要能够在方法调用的过程中增添新的参数,并且新的服务器需要能够接受来自老客户端的老格式的消息
  • 支持互操作,对于某些系统来说,希望能支持以不同语言编写的客户端与服务端交互

Hadoop使用的是自己的序列化格式Writable,它绝对紧凑、速度快,不过不太容易用Java以外的语言进行扩展或使用。

Writable是一个接口,接口内定义了两个方法:一个将数据写入DataOutput二进制流的write()方法和一个从DataInput二进制流读取数据的readFields()方法。

1
2
3
4
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}

此外,还有一个接口WritableComparable接口,同时继承了Writable接口和Comparable接口,使得实现该接口的具体实现类能够进行排序(通过实现compareTo()方法)。对于MapReduce程序来说,这非常重要,因为在shuffle阶段有个排序的过程(后续我们再详细讲解)。该接口的实现如下所示。

1
2
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

Hadoop自身提供了很多Writable接口的实现类,我们可以拿过来直接使用,首先来看下对应于Java基本数据类型的实现类。

Java基本类型 Writable实现 序列化大小(字节)
boolean BooleanWritable 1
byte ByteWritable 1
short ShortWritable 2
int IntWritable 4
VintWritable 1~5
float FloatWritable 4
long LongWritable 8
VlongWritable 1~9
double DoubleWritable 8

在对整数类型(int和long)进行编码时,有两种选择,定长格式(IntWritable和LongWritable)和变长格式(VintWritable和VlongWritable)。简单来说,需要编码的数值如果相当小(-127~127),变长格式只会使用一个字节进行编码。否则,使用第一个字节来表示数值的正负和后面跟多少个字节。一般而言,如果数值的分布不均匀的情况下,使用变长格式会节省空间。

除了上面的7种基本类型外,如果要使用字符类型的话,对应的Writable实现为Text。如果要使用null,可以使用NullWritable来表示,它的序列化长度为0,不会从数据流中读取数据,也不会写入数据,仅作为一个占位符。注意,它是一个不可变的单例对象,通过调用NullWritable.get()方法获取这个实例。

另外,如果你要使用集合类型,Hadoop也提供了Writable相关的集合实现,分别是ArrayWritableArrayPrimitiveWritableTwoDArrayWritableMapWritableSortedMapWritable以及EnumMapWritable

Configuration

在使用Job对象配置程序参数之前,需要先创建Configuration对象。一个Configuration的实例表示了一组Hadoop相关的配置集合。在创建该对象时,会自动加载类路径下相关的Hadoop配置文件,包括默认配置(core-default.xml、hdfs-default.xml等)和用户自定义配置(core-site.xml、hdfs-site.xml等)。因此在提交作业时,就能够找到正确的HDFS集群地址、YARN集群地址等。

我们也可以通过该对象,设置MapReduce程序的参数,例如输入输出压缩、任务数量等。

MapReduce本地运行

在很多情况下,我们的代码都不会这么简单,如果每次修改代码都需要重新打包、上传、提交任务等步骤,效率就太低了。能否在IDE中直接运行MapReduce程序呢?

每个MapReduce都有一个入口类,该类中有main()方法,下面我们尝试直接运行该类,看看有什么效果。

第一次运行时,结果如下。

1
2
3
4
Usage: WordCountExample <input pagh> <output path>
Disconnected from the target VM, address: '127.0.0.1:5190', transport: 'socket'

Process finished with exit code 1

很正常,我们的程序需要在运行时指定输入路径和输出路径。

我当前是在Windows环境下,因此我指定的参数是:

1
F:\mr\demo01\input F:\mr\demo01\output

接下来再次运行,结果如下。

1
2
3
4
5
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:469)
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:440)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:517)
... 14 more

原来是需要设置HADOOP_HOME环境变量啊,这个简单,将hadoop-3.1.2.tar.gz解压缩,配置操作系统的环境变量后,再次运行,结果如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Caused by: java.io.FileNotFoundException: Could not locate Hadoop executable: D:\Development\hadoop-3.1.2\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems
at org.apache.hadoop.util.Shell.getQualifiedBinInner(Shell.java:620)
at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:593)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:690)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:78)
at org.apache.hadoop.conf.Configuration.getBoolean(Configuration.java:1664)
at org.apache.hadoop.security.SecurityUtil.setConfigurationInternal(SecurityUtil.java:102)
at org.apache.hadoop.security.SecurityUtil.<clinit>(SecurityUtil.java:86)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:315)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303)
at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1827)
at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:709)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:659)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:570)
at org.apache.hadoop.mapreduce.task.JobContextImpl.<init>(JobContextImpl.java:72)
at org.apache.hadoop.mapreduce.Job.<init>(Job.java:150)
at org.apache.hadoop.mapreduce.Job.getInstance(Job.java:193)
at org.victor.mapreduce.demo01.WordCountRunner.main(WordCountRunner.java:26)

异常信息说是找不到winutils.exe可执行文件,这是因为我们在Windows环境下运行程序,需要调用一些底层的操作系统方法,因此我们需要将该文件下载下来,放入本地解压hadoop/bin目录下。

下载地址:https://github.com/steveloughran/winutils

注意,你可能还会遇到下面的错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:640)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1223)
at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1428)
at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:468)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
at org.apache.hadoop.fs.FileSystem$4.<init>(FileSystem.java:2072)
at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:2071)
at org.apache.hadoop.fs.ChecksumFileSystem.listLocatedStatus(ChecksumFileSystem.java:693)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:312)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:274)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:396)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:310)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:327)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1570)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1567)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1567)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1588)

这个异常看起来很奇怪,似乎是调用了NativeIO访问了Window文件系统,那么为什么会失败呢?我们可以直接找到源码中去看一下,根据异常信息,我们进入NativeIO.java的640行,代码如下。

1
2
3
4
public static boolean access(String path, AccessRight desiredAccess)
throws IOException {
return access0(path, desiredAccess.accessRight());
}

这里访问了access0()方法,我们再来看看这个方法。

1
2
3
/** Windows only method used to check if the current process has requested
* access rights on the given path. */
private static native boolean access0(String path, int requestedAccess);

这个方法是一个native方法,从方法的说明上来看,该方法只是在Windows环境下调用的,用于检查是否有权限去访问给定的路径。

我就是在本地调试的,还检查什么访问权限?不需要搞那么麻烦吧。

这里给出一个最简单的解决方法。在当前项目中创建一个包,名称为org.apache.hadoop.io.nativeio,然后将NativeIO这个类,直接复制到该包下(一定要确保包名、类名一致)。然后找到access()方法,直接返回true,不要去做本地检查了,代码如下。

1
2
3
4
5
public static boolean access(String path, AccessRight desiredAccess)
throws IOException {
// return access0(path, desiredAccess.accessRight());\
return true;
}

再次运行程序,不出意外,程序就可以运行成功了。运行结果在F:\mr\demo01\output路径下,我们可以进入该目录直接查看。

MapReduce程序为什么可以在本地运行呢?本地又没有安装Hadoop环境。

因为MapReduce在执行时,会检查mapreduce.framework.name配置,该配置项默认为local,而集群上我们改为了yarn。local的意思是指,将MapReduce程序提交给本地的LocalJobRunner执行,实际上就是搞了一个线程池,每个任务使用一个线程来执行,方便本地开发调试使用的。

同样的道理,默认的配置中fs.defaultFS值为file:///,表示本地文件系统,因此MapReduce程序只能访问本地文件系统中路径,那么我们在本地IDE运行时,能否访问HDFS文件系统呢?

根据推断,我们只需要将该参数设置为HDFS就可以了,如下所示:

1
2
3
// 创建任务对象,用于配置任务参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");

在启动程序时,指定HDFS对应的路径即可。

1
/mr/demo01/input /mr/demo01/output2

这样的话,程序在运行时会读取HDFS中的数据,处理完成后,再将数据写回HDFS。不过这样做似乎没什么意义,调试的话,直接使用本地文件系统就好了,使用HDFS上的数据,还需要经过网络传输。

注意,如果发现无法执行任务,可以尝试设置HADOOP_USER_NAME环境变量为hadoop,或者在集群的hdfs-site.xml中,将dfs.permissions.enabled配置项设置为false,重启集群即可。

使用ToolRunner

有时候我们需要通过命令行提交程序时,为Configuration对象设置一些特别的配置,例如任务的内存大小、任务需要的CPU个数、输出数据是否压缩等,而不是将这些配置硬编码到代码中。

当然,我们可以使用args的方式获取到对应位置的参数,然后设置到Configuration对象中,不过这样很麻烦,尤其是当要设置的参数很多的时候。

为了简化命令行方式运行作业,Hadoop自带了一些辅助类。GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的值。不过通常不会直接使用该类,更方便的方式是实现Tool接口,通过ToolRunner来运行程序。在ToolRunner内部会调用GenericOptionsParser

下面我们通过ToolRunner的方式将WordCount程序改写,实际上只需要改动程序入口类即可,Mapper和Reducer无需改动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class WordCountRunner extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
// 数据的路径由程序参数指定
if (args.length != 2) {
System.out.println("Usage: WordCountExample <input path> <output path>");
System.exit(1);
}
// 获取Job实例
// 注意,由于继承了Configured类,该类中提供了对Configuration的set/get方法。
// ToolRunner会调用setConf()方法将创建的Configuration实例注入
Job job = Job.getInstance(this.getConf());
// 给任务取个名称
job.setJobName("WordCountExample");
// 设置入口类
job.setJarByClass(WordCountReducer.class);

// 设置输入
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
// 设置Mapper任务类
job.setMapperClass(WordCountMapper.class);
// 设置Mapper任务的输出数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Reducer任务类
job.setReducerClass(WordCountReducer.class);
// 设置Reducer任务的输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
// 等待任务执行结束
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}

public static void main(String[] args) throws Exception {
// 调用ToolRunner的run()方法,默认会创建一个Configuration实例
int code = ToolRunner.run(new WordCountRunner(), args);
System.exit(code);
}
}

将程序打包,提交到集群上运行,并且设置自定义参数,将输出的结果文件使用Snappy进行压缩。

1
2
3
4
5
6
$ bin/hadoop jar mr/hadoop-example-1.0.jar org.victor.mapreduce.demo02.WordCountRunner -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec /mr/demo01/input /mr/demo01/output3

$ bin/hadoop fs -ls /mr/demo01/output3
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2019-03-07 18:09 /mr/demo01/output3/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 65 2019-03-07 18:09 /mr/demo01/output3/part-r-00000.snappy

使用ToolRunner后,我们可以通过命令行设置下列选项:

选项名称 描述
-D property=value 将指定值赋给某个Hadoop配置属性,覆盖配置文件里的属性或在site文件里的属性。
-conf filename 将指定文件添加到配置的资源列表中,如果要设置多个属性,可以通过这种方式
-fs uri 用指定的URI设置默认的文件系统,相当于-D fs.default.FS=uri
-jt host:port 指定YARN的主机和端口号,相当于-D yarn.resourcemanager.address=host:port
-files file1,file2,… 从本地文件系统(或任意指定模式的文件系统)中复制指定文件到MapReduce所用的共享文件系统(通常是HDFS),确保在任务工作目录的MapReduce程序可以访问这些文件
-archives archive1,archive2,… 从本地文件系统(或任意指定模式的文件系统)复制指定的归档文件到MapReduce所用的共享文件系统(通常是HDFS),打开归档文件,确保在任务工作目录的MapReduce程序可以访问这些归档
-libjars jar1,jar2,… 从本地文件系统(或任意指定模式的文件系统)复制JAR文件到被MapReduce使用的共享文件系统(通常是HDFS),把它们加入MapReduce任务的类路径中,这个选项适用于传输作业需要的JAR文件

小结

在本章中,我们对前面编写的WordCount程序进行了详细剖析,在介绍MapReduce整体结构的同时,引出了Writeable、Configuration类的介绍与使用。为了方便调试,我们又配置了Windows的本地MapReduce程序运行环境。最后,我们通过使用ToolRunner辅助类改写了WordCount程序的入口类,该辅助类能够很好的帮助我们解析命令行参数,并自动注入到Configuration实例中。

如果您觉得不错,请赞赏一下!