前言
前几章学习了YARN的基本概念与调度器,本章我们来学习MapReduce任务在提交时的流程,在提交时如何与YARN进行交互,并且讨论在任务运行过程中,各个组件如何进行故障恢复。
MapReduce作业运行机制
在之前的案例中,我们通过Job
对象的waitForCompletion()
方法(实际上内部调用了submit()
方法)来提交作业,并等待任务运行完成。对于客户端而言,只是一个方法的调用,但是其内部封装了大量的处理细节。
任务提交时,涉及到以下几个部分。
- 客户端,提交MapReduce作业
- ResourceManager,负责协调集群上计算机资源的分配
- NodeManager,负责启动和监视集群中机器上的计算容器(Container)
- MapReduce的Application Master,负责协调运行MapReduce作业的任务。它和MapReduce任务在容器中运行,这些容器由ResourceManager分配并由NodeManager进行管理
- 分布式文件系统,一般为HDFS,用来与其他实体间共享作业文件
作业的提交
Job的submit()
方法创建一个内部的JobSubmitter
实例,并且调用其submitJobInternal()
方法。提交作业后,waitForCompletion()
每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功,就显示作业计数器;如果失败,则导致作业失败的错误被记录到控制台。
下图是MapReduce作业提交的流程。
JobSubmitter所实现的作业提交过程如下所述。
- 向ResourceManager请求一个新的应用ID,用于MapReduce作业ID
- 检查作业的输出说明。例如,如果没有指定输出目录或输出目录已经存在,作业就不提交,错误抛回给MapReduce程序
- 计算作业的输入分片,如果分片无法计算,比如因为输入路径不存在,作业就不提交,错误返回给MapReduce程序
- 将运行作业所需要的资源(包括作业Jar包、配置文件和计算所得的输入分片)复制到一个以作业ID命名的目录下的共享文件系统中。作业Jar的副本数比较多(由
mapreduce.client.submit.file.replication
属性控制,默认值为10),因此在运行作业的任务时,集群中有很多个副本可供NodeManager访问 - 通过调用ResourceManager的
submitApplication()
方法提交作业
作业的初始化
ResourceManager收到调用它的submitApplication()
消息后,便将请求传递给YARN调度器。调度器分配一个容器,由NodeManager启动容器运行Application Master进程。
MapReduce作业的Application Master是一个Java应用程序,它的主类是MRAppMaster
。由于将接受来自任务的进度和完成报告,因此Application Master对作业的初始化是通过创建多个汇总对象以保持对作业进度的跟踪来完成的。接下来,它接受来自共享文件系统的、在客户端计算的输入分片。然后对每一个分片创建一个map
任务对象以及由mapreduce.job.reduces
属性(通过Job的setNumReduceTasks()方法设置)确定的多个reduce
任务对象。任务ID在此时分片。
Application Master必须决定如何运行构成MapReduce作业的各个任务。如果作业很小,就选择和自己在同一个JVM上运行任务。与在一个节点上顺序运行这些任务相比,当Application Master判断在新的容器中分配和运行任务的开销大于并行运行它们的开销时,就会发生这一情况。这一的作业成为uberized
,或者叫做uber
任务。
怎么判断作业是小作业呢?默认情况下,小作业就是少于10个map任务且只有1个reduce任务,此外输入大小小于一个HDFS块的作业(通过设置mapreduce.job.ubertask.maxmaps
、mapreduce.job.ubertask.maxreduces
和mapreduce.job.ubertask.maxbytes
可以改变这几个值)。必须明确启用Uber任务(对于单个作业,或者是对整个集群),具体方法是将mapreduce.job.ubertask.enable
属性设置为true。
最后,在任何任务运行之前,Application Master调用setupJob()
方法设置OutputCommitter
。FileOutputCommitter
为默认值,表示将建立作业的最终输出目录及任务输出的临时工作空间。
任务的分配
如果作业不适合作为uber任务运行,那么Application Master就会为该作业中的所有map任务和reduce任务向ResourceManager请求容器。首先为map任务发出请求,该请求优先级要高于reduce任务的请求,这是因为所有的map任务必须在reduce的排序阶段能够启动前完成。直到有5%的map任务已经完成时,为reduce任务的请求才会发出。
Reduce任务能够在集群中任意位置运行,但map任务的请求有着数据本地化局限,这也是调度器所关注的。在理想的情况下,任务是数据本地化的,意味着任务在分片所在的同一节点上运行。可选的情况是,任务可能是本地机架化的,即和分片在同一机架而非同一节点上运行。有一些任务既不是数据本地化,也不是机架本地化,它们会从别的机架,而不是运行所在的机架上获取自己的数据。对于一个特定的作业运行,可以通过查看作业的计数器来确定在每个本地化层次上运行的任务的数量。
请求也为任务制定了内存需求和CPU数。在默认情况下,每个map任务和reduce任务都分配到1024MB内存和1个虚拟核,这些值可以在每个作业的基础上进行配置,分别通过4个属性来设置mapreduce.map.memory.mb
、mapreduce.reduce.memory.mb
、mapreduce.map.cpu.vcores
和mapreduce.cpu.vcores
。
任务的执行
一旦ResourceManager的调度器为任务分配了一个特定节点上的容器,application master就通过与NodeManager通信来启动任务容器。该任务由主类为YarnChild
的一个Java应用程序执行。在它运行任务之前,首先将任务需要的资源本地化,包括作业的配置、Jar文件和所有来自分布式缓存的文件。最后,运行map任务或reduce任务。
YarnChild在指定的JVM中运行,因此用户定义的map或reduce方法(甚至是YarnChild)中的任何缺陷不会影响到NodeManager,例如导致其崩溃或挂起。
每个任务都能够执行setup
和commit
动作,它们和任务本身在同一个JVM中运行,并由作业的OutputCommitter
确定。对于基于文件系统的作业,提交动作将任务输出由临时位置搬移到最终位置。提交协议确保当前推测执行
被启用时,只有一个任务副本被提交,其他的都被取消。
进度和状态更新
MapReduce是长时间运行的批量作业,运行时间范围从数秒到数小时。这可能是一个很长的时间段,所以对于用户而言,能够得知关于作业进展的一些反馈是很重要的。一个作业和它的每个任务都有一个状态(status),包括:作业或任务的状态(比如,运行中,成功完成,失败)、map和reduce的进度、作业计数器的值、状态消息或描述(可以由用户代码来设置)。这些状态信息在作业期间不断改变,它们是如何与客户端通信的呢?
任务在运行时,对其进度保持追踪。对map任务,任务进度是已处理输入所占的比例。对reduce任务,情况稍微有点复杂,但系统仍然会估计已处理reduce输入的比例。整个过程分成三部分,与shuffle的三个阶段相对应。比如,如果任务已经执行reduce一半的输入,那么任务的进度便是5/6,这时因为已经完成复制和排序阶段(每个占1/3),并且已经完成reduce阶段的一半(1/6)。
任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,例如已写入的map输出记录数,要么由用户自己定义。
当map任务或reduce任务运行时,子进程和自己的父Application Master通过特定接口通信。每隔3秒钟,任务通过这个接口向自己的Application Master报告进度和状态(包括计数器),Application Master会形成一个作业的汇聚视图。
ResourceManager的界面显示了所有运行中的应用程序,并且分别有链接指向这些应用各自的Application Master的界面,这些界面展示了MapReduce作业的更多细节,包括其进度。
在作业期间,客户端每秒钟轮询一次Application Master以接收最新状态。客户端也可以使用Job的getStatus()
方法得到一个JobStatus的实例,后者包含作业的所有状态信息。
作业的完成
当Application Master收到作业的左右一个任务已完成的通知后,便把作业的状态设置为”成功”。然后,在Job轮询状态时,便知道任务已成功完成,于是Job打印一条消息告知用户,然后从waitForCompletion()
方法返回。Job的统计信息和计数值也在这个时候输出到控制台。
如果Application Master有相应的设置,也会发送一个HTTP作业通知。希望收到回调指令的客户端可以通过mapreduce.job.end-notification.url
属性来进行这项设置。
最后,作业完成时,Application Master和任务容器清理其工作状态(这样中间输出将被删除),OutputCommitter的commitJob()
方法会被调用。作业信息由作业历史服务器存档,以便日后用户需要时可以查询。
失败
在现实情况中,用户代码错误不断,进程崩溃,机器故障,如此种种。使用Hadoop最主要的好处之一是它能处理此类故障并让你能够成功完成作业。我们需要考虑以下实体的失败:任务、Application Master、NodeManager和Resource Manager。
任务运行失败
首先考虑任务失败的情况。最常见的情况是map任务或reduce任务中的用户代码抛出运行异常。如果发生这种情况,任务JVM会在退出之前向其父Application Master发送错误报告。错误报告最后被记入用户日志。Application Master将此次任务尝试标记为failed
,并释放容器以便资源可以为其他任务使用。
另一种失败模式是任务JVM突然退出,可能由于JVM软件缺陷而导致MapReduce用户代码由于某些特殊原因造成JVM退出。在这种情况下,NodeManager会注意到进程已经退出,并通知Application Master将此次任务尝试标记为失败。
任务挂起的处理方式则有不同。一旦Application Master注意到已经有一段时间没有收到进程的更新,变回将任务标记为失败。在此之后,任务JVM进程将被自动杀死。任务被任务失败的超时间隔通常为10分钟,可以以作业为基础(或以集群为基础)进行设置,对应的属性为mapreduce.task.timeout
,单位为毫秒。
超时设置为0将关闭超时判定,所以长时间运行的任务永远不会被标记为失败。在这种情况下,被挂起的任务永远不会释放它的容器并随着时间的推移最终降低整个集群的效率。因此,尽量避免这种设置,同时充分确保每个任务能够定期汇报其进度。
Application Master被告知一个任务尝试失败后,将重新调度该任务的执行。Application Master会试图避免在以前失败过的NodeManager上重新调度该任务。此外,如果一个任务失败过4次,将不会再充实。这个值时可以设置的:对于map任务,运行任务的最多尝试次数由mapreduce.map.maxattemptes
属性控制;对于reduce任务,则由mapreduce.reduce.maxattempts
属性控制。在默认情况下,如果任何任务失败次数大于4,整个作业都会失败。
对于一些应用程序,我们不希望一旦有少数几个任务失败就中止整个作业,因为即使有任务失败,作业的一些结果可能还是可用的。在这种情况下,可用为作业设置在不触发作业失败的情况下允许任务失败的最大百分比。针对map任务和reduce任务的设置可以通过mapreduce.map.failures.maxpercent
和mapreduce.reduce.failures.maxpercent
这两个属性来完成。
任务尝试也是可以中止的,这与失败不同。任务尝试可以被中止是因为它是一个推测任务或因为它所处的NodeManager失败,导致Application Master将它上面运行的所有任务尝试标记为killed
。被中止的任务尝试不会被计入任务运行尝试次数,因为尝试被中止并不是任务的过错。
用户也可以使用Web UI或命令行来手动中止或取消任务尝试。
Application Master运行失败
YARN中的应用程序在运行失败的时候会有几次尝试机会,就像MapReduce任务在遇到硬件或网络故障时要进行几次尝试一样。运行MapReduce Application Master的最多尝试次数由mapreduce.am.max-attempts
属性控制,默认值是2,即如果Application Master失败两次,便不会再进行尝试,作业将失败。
YARN对集群上运行的Application Master的最大尝试次数加以了限制,单个的应用程序不可以超过这个限制。该限制由yarn.resourcemanager.am.max-attempts
属性设置,默认值是2,这样如果你想增加Application Master的尝试次数,必须增加集群上YARN的设置。
恢复的过程如下。Application Master向ResourceManager发送周期性的心跳,当Application Master失败时,ResourceManager将检测到该失败并在一个新的容器中开始一个新的Master实例。对于MapReduce Application Master,它默认情况下恢复功能是开启的,但可以通过设置yarn.app.mapreduce.am.job.recovery.enable
为false来关闭这个功能。
MapReduce客户端向Application Master轮询进度报告,但是如果它的Application Master运行失败,客户端就需要重新定位新的实例。在作业初始化期间,客户端向ResourceManager询问并缓存Application Master的地址,使其每次需要向Application Master查询时不必访问ResourceManager。但是,如果Application Master运行失败,客户端就会在发出状态更新请求时经历超时,这时客户端会再次向ResourceManager请求新的Application Master地址,这个过程对用户是透明的。
NodeManager运行失败
如果NodeManager由于崩溃或运行非常缓慢而失败,就会停止向ResourceManager发送心跳(或发送频率很低)。如果10分钟内(yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
配置)没有收到一条心跳信息,ResourceManager将会通知停止发送心跳信息的NodeManager,并将其从自己的节点池中移除以调度启用容器。
在失败的NodeManager上运行的所有任务或Application Master都用前面两节描述的机制进行恢复。另外,对于那些曾经在失败的NodeManager上运行且成功完成的map任务,如果属于未完成的作业,那么Application Master会安排他们重新运行。这是由于这些任务的中间输出驻留在失败的节点NodeManager的本地文件系统中,可能无法被reduce任务访问的缘故。
如果应用程序的运行失败次数过高,那么NodeManager可能会被拉黑,即使NodeManager自己并没有失败过。由Application Master管理黑名单,对于MapReduce,如果一个NodeManager上有超过三个任务失败,Application Master就会尽量将任务调度到不同的节点上。用户可以通过作业属性mapreduce.job.maxtaskfailures.per.tracker
设置该阈值。
ResourceManager运行失败
ResourceManager运行失败是非常严重的问题,没有ResourceManager,作业和任务容器将无法启动。在默认的配置中,Resource有单点故障问题,不管YARN也有HA机制,如果主ResourceManager失败了,那么备份的ResourceManager能够接替,且客户端不会感到明显的中断。
关于所有运行中的应用程序的信息存储在一个高可用的状态存储区中(由ZooKeeper或HDFS备份),这样备机可以恢复出失败的主ResourceManager的关键状态。NodeManager信息没有存储在状态存储区中,因为当NodeManager发送它们的第一个心跳信息时,NodeManager的信息能以相当快的速度被新的ResourceManager重构。
当新的ResourceManager启动后,它从状态存储区中读取应用程序的信息,然后为集群中运行的所有应用程序重启Application Master。这个行为不被计为失败的应用程序尝试,这时因为应用程序并不是因为程序代码错误而失败,而是被系统强行中止的。实际情况中,Application Master重启不是MapReduce应用程序的问题,因为它们是恢复已完成任务的工作。
ResourceManager从备机到主机的切换是由故障转移控制器(failover controller)处理的。默认的故障转移控制器是自动工作的,使用ZooKeeper的leader选举机制以确保同一时刻只有一个主ResourceManager。不同于HDFS高可用的实现,故障转移器不必是一个独立的进程,为配置方便,默认情况下嵌入在ResourceManager中。故障转移也可以配置为手动处理,但不建议这样。
为应对ResourceManager的故障转移,必须对客户端和NodeManager进行配置,因为他们可能是在和两个ResourceManager打交道。客户端和NodeManager以轮询方式试图连接每一个ResourceManager,直到找到主ResourceManager。如果主ResourceManager故障,他们将再次尝试直到备份ResourceManager变成主机。
任务的执行
在了解了MapReduce作业运行机制后,我们来看看MapReduce用户对任务执行的更多控制。
任务执行环境
Hadoop为map任务或reduce任务提供运行环境相关信息。例如,map任务可以知道它处理的文件的名称,map任务或reduce任务可以得知任务的尝试次数。下表的属性可以从作业的配置信息中获取。
属性名称 | 类型 | 说明 | 范例 |
---|---|---|---|
mapreduce.job.id | String | 作业ID | job_201903061120_0004 |
mapreduce.task.id | String | 任务ID | task_201903061120_m_000003 |
mapreduce.task.attempt.id | String | 任务尝试ID | attempt_201903061120_m_000003_0 |
mapreduce.task.partition | int | 作业中任务的索引 | 3 |
mapreduce.task.ismap | boolean | 此任务是否是map任务 | true |
推测执行
MapReduce模型将作业分解成任务,然后并行地运行任务以使作业的整体执行时间少于各个任务顺序执行的时间。这使作业执行时间对运行缓慢的任务很敏感,因为只运行一个缓慢的任务会使整个作业所用的时间远远长于执行其他任务的时间。当一个作业由几百个或几千个任务组成时,可能出现少数”拖后腿”的任务,这是很常见的。
任务执行缓慢可能有多种原因,包括硬件老化或软件配置错误,但是,检测具体原因很困难,因为任务总能够成功完成,尽管比预期执行时间长。Hadoop不会尝试诊断或修复执行慢的任务,相反,在一个任务运行比预期慢的时候,它会尽量检测,并启动另一个相同的任务作为备份。这就是所谓的任务”推测执行”(Speculative Execution)。
必须认识到一点:如果同时启动两个重复的任务,它们会互相竞争,导致推测执行无法工作。这对集群资源是一种浪费。相反,调度器跟踪作业中所有相同类型任务的进度,并且仅仅启动运行速度明显低于平均水平的那一小部分任务的推测副本。一个任务成功完成后,任何正在运行的重复任务都将被中止,因为已经不再需要它们了。因此,如果原任务在推测任务前完成,推测任务就会被中止;同样,如果推测任务先完成,那么原任务就会被中止。
推测执行是一种优化措施,它并不能使作业的运行更可靠。如果有一些软件缺陷会造成任务挂起或运行速度减慢,依靠推测执行来避免这些问题显然是不明智的,并且不能可靠地运行,因为相同的软件缺陷可能会影响推测式任务。应该修复软件缺陷,使任务不会挂起或运行速度减慢。
在默认情况下,推测执行是启用的。可以基于集群或基于每个作业,单独为map任务和reduce任务启用或禁用该功能。下表示推测任务相关的属性。
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapreduce.map.speculative | boolean | true | 如果任务运行变慢,该属性决定着是否要启动map任务的另外一个实例 |
mapreduce.reduce.speculative | boolean | true | 如果任务运行变慢,该属性决定着是否要启动reduce任务的另一个实例 |
为什么会想到关闭推测执行?推测执行的目的是减少作业执行时间,但这是以集群效率为代价的。在一个繁忙的集群中,推测执行会减少整体的吞吐量,因为冗余任务的执行时会减少作业的执行时间。因此,一些集群管理员倾向于在集群上关闭此选项,而让用户根据个别作业需要而开启该功能。
对于reduce任务,关闭推测执行是有益的,因为任务重复的reduce任务都必须将取得map输出作为最先的任务,这可能会大幅度地增加集群上的网络传输。
关于OutputCommitters
Hadoop MapReduce使用一个提交协议来确保作业和任务都完全成功或失败。这个行为通过对作业使用OutputCommitter
来实现。默认值为FileOutputCommitter
,这对基于文件的MapReduce是适合的。可以定制已有的OutputCommitter,或者在需要时还可以写一个新的实现以完成对作业或任务的特别设置或清理。
OutputCommitter的API如下所示。
1 | public abstract class OutputCommitter |
setupJob()
方法在作业运行前调用,通常用来执行初始化操作。例如FileOutputCommitter会创建最终的输出目录,并且为任务输出创建一个临时的工作工具,_temporary
,作为最终输出目录的子目录。
如果作业成功,就调用commitJob()
方法,在默认的基于文件的实现中,它用于删除临时的工作空间并在输出目录中创建一个名为_SUCCESS
的隐藏的标识文件,以此告知文件系统的客户端该作业成功完成了。如果作业不成功,就通过状态对象调用abortJob()
,意味着该作业是否失败或终止(例如由用户终止)。在默认的实现中,将删除作业的临时工作空间。
在任务级别的操作上于此类似。在任务执行之前先调用setupTask()
方法,默认的实现是不做任何事情,因为针对任务输出命名的临时目录在写任务输出的时候被创建。
任务的提交阶段是可选的,并通过从needsTaskCommit()
返回的false值关闭它。这使得执行框架不必为任务运行分布提交协议,也不需要commitTask()
或者abortTask()
。当一个任务没有写任何输出时,FileOutputCommitter将跳过提交阶段。
如果任务成功,就调用commitTask()
,在默认的实现中它将临时的输出目录移动到最后的输出路径。否则,执行框架调用abortTask()
,它负责删除临时的任务输出目录。
执行框架保证特定任务在有多次任务尝试的情况下,只有一个任务会被提交,其他的则被取消。这种情况是可能出现的,因为第一次尝试出于某个原因而失败,提交的是稍后成功的尝试。另一种情况是如果两个任务尝试作为推测副本同时运行,则提交先完成的,而另一个被取消。
小结
本章介绍了MapReduce向YARN框架的提交流程,说明了客户端、ResourceManager、NodeManager、Application Master以及Task之间是如何交互的。任务在提交时会将任务信息上传到HDFS,并在某个容器中启动Application Master,之后由Application Master负责向ResourceManager申请资源,以启动不同的任务。此外,我们还介绍了任务在运行过程中各个组件的失败情况。基本上可以说,无论是任务、还是Application Master,以及ResourceManager和NodeManager,都有相应的恢复机制,这也是使用YARN的优势之一。最后我们介绍了关于推测执行的相关概念,它只是一种优化手段,当某个任务的运行时间高于任务的平均运行时间时,框架就会尝试启动一个推测副本任务,谁先执行完,就将另一个中止。