前言
在上一章中,我们对WordCount程序进行了详细的分析,并简单介绍了WordCount程序的运行流程。今天我们从来详细分析下MapReduce的执行流程,并引出一些核心的概念与组件。本章非常重要,理解了运行机制和原理,不仅有助于排查故障,更能够根据需要进行系统调优。
在MapReduce中,最重要的一个环节是Shuffle
,深刻理解Shuffle机制对于理解MapReduce程序的执行流程非常重要,此外,Shuffle的调优也是MapReduce中非常重要的环节,只有理解了原理,才能有针对的设置参数。
MapReduce执行流程
MapReduce任务的执行流程可以分为五个步骤:
数据输入
,如何读取数据执行Map任务
,自定义的Map处理逻辑Shuffle
,数据分区、分组、排序、合并等执行Reduce任务
,自定义的Reduce处理逻辑数据输出
,如何输出数据
我们将Shuffle这一过程定义为:从Map处理逻辑的输出,到Reduce处理逻辑输入这一部分。从图中可以看到,这一部分涉及到的过程非常复杂,也是MapReduce程序的核心所在。
由于Map处理逻辑和Reduce处理逻辑是我们自己编写的,无需过多说明。下面我们主要对输入分片、Shuffle以及输出三个部分进行分析。
输入分片
分片概念
什么是输入分片?我们在介绍MapReduce编程模型时说过,Map任务是一组独立、没有互相依赖、可并行执行的逻辑。还记得数苹果的例子吗?
在图中,四框苹果交给四个人去数,是按照”框”去划分处理的边界。假如我将四框苹果交给八个人去数,每人数半框,是不是效率更高?
输入分片和上面划分的概念类似,它是指将输入的数据,划分为N个分片,每个分片能够由一个Map任务单独处理。因此,输入分片就决定了Map任务的个数。
需要注意,输入分片并不是真正的将输入的数据源真正的切分为一个个小文件了,它只是一种划分逻辑,举例来说,一个书架上有三层书,找三个人分别数每一层有多少本,因此每个人只要从指定的位置
开始数就行了,而不是真正的将书架上的书全部卸下来。如果输入的数据源是HDFS的文本文件,那就可以按照偏移量来划分任务,例如任务A负责偏移量0-999的数据,任务B负责偏移量1000-1999的数据,任务C负责剩下偏移量2000-2500的数据。
对于不同的输入有不同的划分规则,例如对于HDFS上的压缩文件来说,如果该压缩算法是不能够被切分的,那么就不能进行输入分片了,所有的数据只能由一个Map任务来处理。
TextInputFormat
下面我们以TextInputFormat
为例,来看一下它是如何做输入分片的。
在MapReduce程序的入口类中,需要定义Map任务的输入路径,不知道大家是否还有印象^_^。回顾下代码吧。
1 | // 设置输入 |
在分析输入分片之前,先简单介绍一下该类。
TextInputFormat是
InputFormat
对于普通文本文件的一种实现,文件可以被划分为多行,以换行符或回车符作为行与行之间的分割。通过该类读取到的数据Key-Value分别代表当前行在文件中的位置以及当前行的文本数据。
举例来说,假设文本文件内容如下:
1 | hello world hdfs |
那么通过TextInputFormat以Key-Value的形式交给Map任务去处理。
1 | (0, "hello world hdfs") |
再次强调,Key并不是行号。一般情况下,很难获取到行号,因为文件按字节而不是按行切分为分片。每个分片单独处理。行号实际上是一个顺序的标记,即每次读取一行的时候需要对行号进行计数。因此,在分片内知道行号是可能的,但在文件中是不可能的。不过还好,每一行在文件中的偏移量是可以在分片内单独确定的,而不需要知道分片的信息。因为每个分片都知道上一个分片的大小,只需要加到分片内偏移量上,就可以获取每行在整个文件中的偏移量了。
源码分析
下面我们来看一下TextInputFormat的源码。
1 | public class TextInputFormat extends FileInputFormat<LongWritable, Text> { |
下面我们看一下LineRecordReader的初始化方法。
1 | public void initialize(InputSplit genericSplit, |
从代码中我们看出,对于LineRecordReader来说,分片就是某个数据文件的一部分,有起始位置和结束位置,只需要通过seek()
方法找到起始位置,不断的读取就行了。压缩文件的话,处理会稍微复杂一些,不过整体逻辑是类似的。
诶,分片的逻辑在哪呢?TextInputFormat中只有判断是否支持分片的方法和创建RecordReader的方法,而RecordReader就开始准备读数据了,没有看到分片的逻辑啊。
分片的逻辑实际上在父类FileInputFormat
类中实现的,因为无论是TextInputFormat,还是KeyValueTextInputFormat(文件中的数据是Key-Value格式的),它们的分片逻辑都是相同的。
1 | public List<InputSplit> getSplits(JobContext job) throws IOException { |
从分片的划分逻辑中,我们可以得出如下结论:
- 如果输入文件有多个,那么分片的数量肯定有多个,无论文件大小或是否可切分
- 如果单个文件的大小超过了数据块的大小的1.1倍,那么就会划分为多个分片
- 分片的大小和HDFS数据块的大小息息相关(如果不进行配置的话)
下面我们使用画图的方式,将分片的逻辑进行描述,加深印象。
MapReduce可以处理不同类型的数据格式,以及不同的数据源。不过使用最多的就是对数据文件进行处理,下面我们来看一下相关类的层次结构。
有了输入分片后,接下来就可以执行我们自定义的Map处理逻辑了。每一个分片都会对应一个RecordReader
,例如TextInputFormat对应的是LineRecordReader。框架会循环调用RecordReader的nextKeyValue()
方法,获取到分片内的输入数据,然后调用我们在Mapper类中重写的map()方法,将数据交给我们的逻辑进行处理。
Shuffle
在我们自定义的map()方法中,将数据处理完成后,调用context.write()
方法将处理后的数据输出,这时就进入到了shuffle阶段。再次强调,shuffle阶段是指从Map任务的输入到Reduce任务的输入中间的过程。
Map端
当map()方法开始产生输出时,并不是简单地将数据写到磁盘。数据首先会被写入到一个缓冲区中,然后再被刷到磁盘上。数据可以在内存缓冲区中进行预排序,能够很大程度的提升效率。
每个Map任务都有一个对应的缓冲区,默认情况下,缓冲区的大小为100MB,这个值可以通过改变mapreduce.task.io.sort.mb
属性来调整。一旦缓冲区的内容达到了阈值(mapreduce.map.sort.spill.percent
,默认为0.8,也就是80%),一个后台线程便开始把内容写到磁盘上。在写磁盘的过程中,Map任务可以继续将数据写到缓冲区,但如果在此期间缓冲区被填满,Map会被阻塞直到写磁盘过程完成。写入到磁盘上的数据被存放在mapreduce.cluster.local.dir
属性配置的目录下。
在写磁盘之前,线程首先根据数据最终要传输的Reducer把数据划分成相应的分区
(Partition)。在每个分区中,后台线程按照Key进行内存中排序,如果有一个Combiner函数
(后续详解),它就在排序后的输出上运行。运行Combiner函数使得Map输出结果更紧凑,因此减少写到磁盘的数据和传递给Reducer的数据。
每次内存缓冲区达到溢出阈值,就会新建一个溢出文件,因此在map任务写完其最后一个输出记录后,会有多个溢出文件。在任务完成之前,溢出文件被合并
成一个已经分区且已经排序
的输出文件。配合属性mapreduce.task.io.sort.factor
控制着一次最多能合并多少个文件,默认是10。
如果至少存在3个(mapreduce.map.combine.minspills
属性)溢出文件时,则Combiner函数就会在输出文件写到磁盘之前再次运行。前面说过,Combiner可以在输入上反复运行,但并不影响最终结果。如果只有1或2个溢出文件,那么由于Map输出规模减少,因而不值得调用Combiner带来的开销,因此不会为该Map输出再次运行Combiner。
将Map任务写到磁盘上的数据进行压缩
会带来性能提升,因为压缩后数据量减少,写磁盘和网络传输的数据量就少了。默认情况下,输出是不压缩的,将mapreduce.map.output.compress
设置为true,就可以启动该功能。另外还可以选择对应的压缩算法,由mapreduce.map.output.compress.codec
指定。
Reduce端
现在转到处理过程的Reduce部分。Map输出文件位于运行Map任务节点上的本地磁盘。Reduce任务需要集群上若干个Map任务的Map输出作为其特殊的分区文件。每个Map任务的完成时间可能不同,因此在每个任务完成时,Reduce任务就开始复制其输出(通过HTTP的方式)。Reduce任务有少量的复制线程,因此能够并行取得Map输出,默认值是5个线程,可以通过mapreduce.reduce.shuffle.parallelcopies
来设置。
如果Map的输出相当小,会被复制到Reduce任务JVM的内存(缓冲区大小由mapreduce.reduce.shuffle.input.buffer.percent
属性控制,指定用于此用途的堆空间的百分比),否则,Map输出会被复制到Reduce任务所在节点的磁盘上。一旦内存缓冲区达到阈值大小(由mapreduce.reduce.shuffle.merge.percent
决定)或达到Map输出阈值(由mapreduce.reduce.merge.inmem.threshold
控制),则合并后溢出写到磁盘中。如果指定Combiner,则在合并期间运行它以降低写入磁盘的数据量。
随着磁盘上副本增多,后台线程会将它们合并为更大的、排好序的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的Map输出都必须在内存中被解压。
复制完所有Map输出后,Reduce任务进入排序阶段(更恰当的说法是合并阶段,因为排序是在Map端进行的),这个阶段将合并Map输出,维持其顺序排序。这是循环进行的。比如,如果有50个Map输出,而合并因子是10(由mapreduce.task.io.sort.factor
属性控制),合并将进行5趟,每趟将10个文件合并成一个文件,因此最后有5个中间文件。
在最后阶段,即Reduce阶段,直接把数据输入reduce()方法,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件作为最后一趟。最后的合并可以来自内存和磁盘片段。
每趟合并的文件数实际上和我们举例说明的有所不同。目标是合并最小数量的文件以便满足最后一趟的合并系数。因此如果有40个文件,我们不会在四趟中每趟合并10个文件从而得到4个文件。相反,第一趟只合并4个文件,随后的三趟合并完整的10个文件。在最后一趟中,4个已合并的文件和余下的6个(未合并的)文件合计10个文件。如图所示。
注意,这并没改变合并次数,它只是一个优化措施,目的是尽量减少写到磁盘的数据量,因为最后一趟总是直接合并到Reduce。
数据输出
数据输出相对于输入就简单多了,因为没有分片的概念。对于前面介绍的输入格式,都有对应的输出格式。还记得我们是如何设置数据输出的吗?回顾一下代码。
1 | // 设置输出格式化类 |
我们首先来看一下关于输出相关的类层次结构图。
有了前面的铺垫后,我们可以直接看TextOutputFormat
的源码,分析具体的步骤。
1 | public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { |
小结
在本章中,我们介绍了MapReduce程序的执行流程,大致可以分为输入、Map逻辑、Shuffle、Reduce逻辑和输出五个部分。在输入部分上,涉及到了数据分片,我们介绍了分片的概念,并分析了分片的处理流程。Shuffle部分是MapReduce最核心的知识点,理解Shuffle的概念对于任务的故障排查、调优等有很大帮助。输出与输入相互对应,支持各种格式的输出,不过相对比较简单。