【Hadoop09】:MapReduce执行流程分析

前言

在上一章中,我们对WordCount程序进行了详细的分析,并简单介绍了WordCount程序的运行流程。今天我们从来详细分析下MapReduce的执行流程,并引出一些核心的概念与组件。本章非常重要,理解了运行机制和原理,不仅有助于排查故障,更能够根据需要进行系统调优。

在MapReduce中,最重要的一个环节是Shuffle,深刻理解Shuffle机制对于理解MapReduce程序的执行流程非常重要,此外,Shuffle的调优也是MapReduce中非常重要的环节,只有理解了原理,才能有针对的设置参数。

MapReduce执行流程

MapReduce任务的执行流程可以分为五个步骤:

  • 数据输入,如何读取数据
  • 执行Map任务,自定义的Map处理逻辑
  • Shuffle,数据分区、分组、排序、合并等
  • 执行Reduce任务,自定义的Reduce处理逻辑
  • 数据输出,如何输出数据

我们将Shuffle这一过程定义为:从Map处理逻辑的输出,到Reduce处理逻辑输入这一部分。从图中可以看到,这一部分涉及到的过程非常复杂,也是MapReduce程序的核心所在。

由于Map处理逻辑和Reduce处理逻辑是我们自己编写的,无需过多说明。下面我们主要对输入分片、Shuffle以及输出三个部分进行分析。

输入分片

分片概念

什么是输入分片?我们在介绍MapReduce编程模型时说过,Map任务是一组独立、没有互相依赖、可并行执行的逻辑。还记得数苹果的例子吗?

在图中,四框苹果交给四个人去数,是按照”框”去划分处理的边界。假如我将四框苹果交给八个人去数,每人数半框,是不是效率更高?

输入分片和上面划分的概念类似,它是指将输入的数据,划分为N个分片,每个分片能够由一个Map任务单独处理。因此,输入分片就决定了Map任务的个数。

需要注意,输入分片并不是真正的将输入的数据源真正的切分为一个个小文件了,它只是一种划分逻辑,举例来说,一个书架上有三层书,找三个人分别数每一层有多少本,因此每个人只要从指定的位置开始数就行了,而不是真正的将书架上的书全部卸下来。如果输入的数据源是HDFS的文本文件,那就可以按照偏移量来划分任务,例如任务A负责偏移量0-999的数据,任务B负责偏移量1000-1999的数据,任务C负责剩下偏移量2000-2500的数据。

对于不同的输入有不同的划分规则,例如对于HDFS上的压缩文件来说,如果该压缩算法是不能够被切分的,那么就不能进行输入分片了,所有的数据只能由一个Map任务来处理。

TextInputFormat

下面我们以TextInputFormat为例,来看一下它是如何做输入分片的。

在MapReduce程序的入口类中,需要定义Map任务的输入路径,不知道大家是否还有印象^_^。回顾下代码吧。

1
2
3
// 设置输入
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));

在分析输入分片之前,先简单介绍一下该类。

TextInputFormat是InputFormat对于普通文本文件的一种实现,文件可以被划分为多行,以换行符或回车符作为行与行之间的分割。通过该类读取到的数据Key-Value分别代表当前行在文件中的位置以及当前行的文本数据。

举例来说,假设文本文件内容如下:

1
2
3
hello world hdfs
yarn mapreduce
hadoop hdfs

那么通过TextInputFormat以Key-Value的形式交给Map任务去处理。

1
2
3
(0, "hello world hdfs")
(17, "yarn mapreduce")
(32, "hadoop hdfs")

再次强调,Key并不是行号。一般情况下,很难获取到行号,因为文件按字节而不是按行切分为分片。每个分片单独处理。行号实际上是一个顺序的标记,即每次读取一行的时候需要对行号进行计数。因此,在分片内知道行号是可能的,但在文件中是不可能的。不过还好,每一行在文件中的偏移量是可以在分片内单独确定的,而不需要知道分片的信息。因为每个分片都知道上一个分片的大小,只需要加到分片内偏移量上,就可以获取每行在整个文件中的偏移量了。

源码分析

下面我们来看一下TextInputFormat的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
// 这里创建了一个RecordReade的实例对象LineRecordReader,
// 用于读取分片数据
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
String delimiter = context.getConfiguration().get(
"textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter)
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
return new LineRecordReader(recordDelimiterBytes);
}

// 这里用于确定输入的文件是否可以被切分,如果文件没有压缩,则认为可以切分,
// 如果文件是压缩的,那么判断对应的codec是否支持切分(还记得bzip2吗)
@Override
protected boolean isSplitable(JobContext context, Path file) {
final CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}

}

下面我们看一下LineRecordReader的初始化方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
// 分片起始位置
start = split.getStart();
// 分片结束位置
end = start + split.getLength();
// 要读取的数据文件
final Path file = split.getPath();

// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
// 获取文件对应的codec
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
// 如果codec不为null,说明数据文件是压缩文件
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
// 判断压缩算法是否支持切分
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
// 如果压缩算法不支持切分,并且不是从头开始读取
// 这样不行啊小老弟,抛出异常吧
if (start != 0) {
// So we have a split that is only part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
// 从头开始读取
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
// 非压缩文件,直接seek到起始位置就行了
fileIn.seek(start);
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
// 注意这里,如果当前读取的不是第一个分片的话,将第一行数据丢弃,
// 因为除了最后一个分片外,在next()方法中会额外读取一行
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}

从代码中我们看出,对于LineRecordReader来说,分片就是某个数据文件的一部分,有起始位置和结束位置,只需要通过seek()方法找到起始位置,不断的读取就行了。压缩文件的话,处理会稍微复杂一些,不过整体逻辑是类似的。

诶,分片的逻辑在哪呢?TextInputFormat中只有判断是否支持分片的方法和创建RecordReader的方法,而RecordReader就开始准备读数据了,没有看到分片的逻辑啊。

分片的逻辑实际上在父类FileInputFormat类中实现的,因为无论是TextInputFormat,还是KeyValueTextInputFormat(文件中的数据是Key-Value格式的),它们的分片逻辑都是相同的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public List<InputSplit> getSplits(JobContext job) throws IOException {
// 一个计时器,用于计算方法的执行时间
StopWatch sw = new StopWatch().start();
// 获取分片的最小值,如果没有配置(mapreduce.input.fileinputformat.split.minsize)就是1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 获取分片的最大值,如果没有配置(mapreduce.input.fileinputformat.split.maxsize)就是Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
// 从配置的输入路径中获取到路径下所有的文件
// 还记得我们是如何配置输入路径的吗?
List<FileStatus> files = listStatus(job);

boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
// 开始遍历文件
for (FileStatus file: files) {
if (ignoreDirs && file.isDirectory()) {
continue;
}
// 获取路径
Path path = file.getPath();
// 获取长度
long length = file.getLen();
if (length != 0) {
// 获取文件的数据块分布信息
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 如果文件是可以切分的,注意,FileInputFormat的默认实现为true,
// 子类中如TextInputFormat会根据文件是否压缩来判断
if (isSplitable(job, path)) {
// 获取文件的数据块大小(一个文件上传至HDFS会被切分为多个数据块,默认为128MB,还记得吗?)
long blockSize = file.getBlockSize();
// 根据配置的分块最小值、最大值以及数据块大小计算出分片的大小
// Math.max(minSize, Math.min(maxSize, blockSize));
// 因此,如果我们不配置分片的最大最小值,分片的大小就等于HDFS数据块的大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// 开始切分了
// 剩余大小
long bytesRemaining = length;
// 如果剩余大小除以分片大小大于1.1
// 假如一个文件是500MB,HDFS数据块大小为128MB,那么
// 524288000 / 134217728 = 3.90625 > 1.1
// 如果一个文件是50MB,HDFS数据块大小为128MB,那么
// 52428800 / 134217728 = 0.390625 < 1.1
// 实际上就是说,如果文件的大小在140MB(HDFS数据块大小*1.1)左右,
// 就会将整个文件作为一个分片来处理,否则的话,就会划分为多个分片。
// 因为如果文件大小稍微超过了一点数据块的大小,如果进行了分片划分,意味着就多了一个Map任务,
// 而这个Map任务处理的数据量非常少,没必要这样做。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// 找到分片数据所属的数据块,遍历一下所有的数据块就能找到了
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
// 创建分片,指定文件路径、起始位置、分片大小、数据块所在的节点等信息
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
// 减去已经划分的大小
bytesRemaining -= splitSize;
}
// 如果文件小于140MB或者在上面的划分逻辑后,还剩下部分数据
// 将这部分数据作为一个分片
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
// 文件不支持切分,因此会将整个文件作为一个分片
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
// 文件的大小为0,但是也得作为一个分片啊,不能随便给我跳过去把小老弟
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
// 将分片的数量设置到Configuration实例中
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}

从分片的划分逻辑中,我们可以得出如下结论:

  1. 如果输入文件有多个,那么分片的数量肯定有多个,无论文件大小或是否可切分
  2. 如果单个文件的大小超过了数据块的大小的1.1倍,那么就会划分为多个分片
  3. 分片的大小和HDFS数据块的大小息息相关(如果不进行配置的话)

下面我们使用画图的方式,将分片的逻辑进行描述,加深印象。

MapReduce可以处理不同类型的数据格式,以及不同的数据源。不过使用最多的就是对数据文件进行处理,下面我们来看一下相关类的层次结构。

有了输入分片后,接下来就可以执行我们自定义的Map处理逻辑了。每一个分片都会对应一个RecordReader,例如TextInputFormat对应的是LineRecordReader。框架会循环调用RecordReader的nextKeyValue()方法,获取到分片内的输入数据,然后调用我们在Mapper类中重写的map()方法,将数据交给我们的逻辑进行处理。

Shuffle

在我们自定义的map()方法中,将数据处理完成后,调用context.write()方法将处理后的数据输出,这时就进入到了shuffle阶段。再次强调,shuffle阶段是指从Map任务的输入到Reduce任务的输入中间的过程。

Map端

当map()方法开始产生输出时,并不是简单地将数据写到磁盘。数据首先会被写入到一个缓冲区中,然后再被刷到磁盘上。数据可以在内存缓冲区中进行预排序,能够很大程度的提升效率。

每个Map任务都有一个对应的缓冲区,默认情况下,缓冲区的大小为100MB,这个值可以通过改变mapreduce.task.io.sort.mb属性来调整。一旦缓冲区的内容达到了阈值(mapreduce.map.sort.spill.percent,默认为0.8,也就是80%),一个后台线程便开始把内容写到磁盘上。在写磁盘的过程中,Map任务可以继续将数据写到缓冲区,但如果在此期间缓冲区被填满,Map会被阻塞直到写磁盘过程完成。写入到磁盘上的数据被存放在mapreduce.cluster.local.dir属性配置的目录下。

在写磁盘之前,线程首先根据数据最终要传输的Reducer把数据划分成相应的分区(Partition)。在每个分区中,后台线程按照Key进行内存中排序,如果有一个Combiner函数(后续详解),它就在排序后的输出上运行。运行Combiner函数使得Map输出结果更紧凑,因此减少写到磁盘的数据和传递给Reducer的数据。

每次内存缓冲区达到溢出阈值,就会新建一个溢出文件,因此在map任务写完其最后一个输出记录后,会有多个溢出文件。在任务完成之前,溢出文件被合并成一个已经分区且已经排序的输出文件。配合属性mapreduce.task.io.sort.factor控制着一次最多能合并多少个文件,默认是10。

如果至少存在3个(mapreduce.map.combine.minspills属性)溢出文件时,则Combiner函数就会在输出文件写到磁盘之前再次运行。前面说过,Combiner可以在输入上反复运行,但并不影响最终结果。如果只有1或2个溢出文件,那么由于Map输出规模减少,因而不值得调用Combiner带来的开销,因此不会为该Map输出再次运行Combiner。

将Map任务写到磁盘上的数据进行压缩会带来性能提升,因为压缩后数据量减少,写磁盘和网络传输的数据量就少了。默认情况下,输出是不压缩的,将mapreduce.map.output.compress设置为true,就可以启动该功能。另外还可以选择对应的压缩算法,由mapreduce.map.output.compress.codec指定。

Reduce端

现在转到处理过程的Reduce部分。Map输出文件位于运行Map任务节点上的本地磁盘。Reduce任务需要集群上若干个Map任务的Map输出作为其特殊的分区文件。每个Map任务的完成时间可能不同,因此在每个任务完成时,Reduce任务就开始复制其输出(通过HTTP的方式)。Reduce任务有少量的复制线程,因此能够并行取得Map输出,默认值是5个线程,可以通过mapreduce.reduce.shuffle.parallelcopies来设置。

如果Map的输出相当小,会被复制到Reduce任务JVM的内存(缓冲区大小由mapreduce.reduce.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比),否则,Map输出会被复制到Reduce任务所在节点的磁盘上。一旦内存缓冲区达到阈值大小(由mapreduce.reduce.shuffle.merge.percent决定)或达到Map输出阈值(由mapreduce.reduce.merge.inmem.threshold控制),则合并后溢出写到磁盘中。如果指定Combiner,则在合并期间运行它以降低写入磁盘的数据量。

随着磁盘上副本增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的Map输出都必须在内存中被解压。

复制完所有Map输出后,Reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在Map端进行的),这个阶段将合并Map输出,维持其顺序排序。这是循环进行的。比如,如果有50个Map输出,而合并因子是10(由mapreduce.task.io.sort.factor属性控制),合并将进行5趟,每趟将10个文件合并成一个文件,因此最后有5个中间文件。

在最后阶段,即Reduce阶段,直接把数据输入reduce()方法,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件作为最后一趟。最后的合并可以来自内存和磁盘片段。

每趟合并的文件数实际上和我们举例说明的有所不同。目标是合并最小数量的文件以便满足最后一趟的合并系数。因此如果有40个文件,我们不会在四趟中每趟合并10个文件从而得到4个文件。相反,第一趟只合并4个文件,随后的三趟合并完整的10个文件。在最后一趟中,4个已合并的文件和余下的6个(未合并的)文件合计10个文件。如图所示。

注意,这并没改变合并次数,它只是一个优化措施,目的是尽量减少写到磁盘的数据量,因为最后一趟总是直接合并到Reduce。

数据输出

数据输出相对于输入就简单多了,因为没有分片的概念。对于前面介绍的输入格式,都有对应的输出格式。还记得我们是如何设置数据输出的吗?回顾一下代码。

1
2
3
4
// 设置输出格式化类
job.setOutputFormatClass(TextOutputFormat.class);
// 设置输出路径
TextOutputFormat.setOutputPath(job, new Path(args[1]));

我们首先来看一下关于输出相关的类层次结构图。

有了前面的铺垫后,我们可以直接看TextOutputFormat的源码,分析具体的步骤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPARATOR = "mapreduce.output.textoutputformat.separator";
/**
* @deprecated Use {@link #SEPARATOR}
*/
@Deprecated
public static String SEPERATOR = SEPARATOR;
// 写入动作是由LineRecordWriter来完成的
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
// 行分隔符
private static final byte[] NEWLINE =
"\n".getBytes(StandardCharsets.UTF_8);
// 实际上就是我们在学习HDFS时使用的数据流写入
protected DataOutputStream out;
// key-value分隔符,默认是\t
private final byte[] keyValueSeparator;

public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
this.keyValueSeparator =
keyValueSeparator.getBytes(StandardCharsets.UTF_8);
}

public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}

/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(StandardCharsets.UTF_8));
}
}

public synchronized void write(K key, V value)
throws IOException {

boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
// 写入key
if (!nullKey) {
writeObject(key);
}
// 写入分隔符
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
// 写入value
if (!nullValue) {
writeObject(value);
}
// 写入换行符
out.write(NEWLINE);
}

public synchronized
void close(TaskAttemptContext context) throws IOException {
out.close();
}
}

// 创建LineRecordWriter
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
// 默认的key-value分隔符是\t
String keyValueSeparator= conf.get(SEPARATOR, "\t");
CompressionCodec codec = null;
String extension = "";
// 如果设置了输出压缩
if (isCompressed) {
// 获取编码器,如果没有指定就用默认的Gzip
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = ReflectionUtils.newInstance(codecClass, conf);
// 文件后缀名,如.gz、.snappy等
extension = codec.getDefaultExtension();
}
// 创建文件,获取到输出流
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
if (isCompressed) {
return new LineRecordWriter<>(
new DataOutputStream(codec.createOutputStream(fileOut)),
keyValueSeparator);
} else {
return new LineRecordWriter<>(fileOut, keyValueSeparator);
}
}
}

小结

在本章中,我们介绍了MapReduce程序的执行流程,大致可以分为输入、Map逻辑、Shuffle、Reduce逻辑和输出五个部分。在输入部分上,涉及到了数据分片,我们介绍了分片的概念,并分析了分片的处理流程。Shuffle部分是MapReduce最核心的知识点,理解Shuffle的概念对于任务的故障排查、调优等有很大帮助。输出与输入相互对应,支持各种格式的输出,不过相对比较简单。

如果您觉得不错,请赞赏一下!