【Hadoop14】:MapReduce计数器

前言

MapReduce中的计数器时收集作业统计信息的有效手段之一,用于质量控制或应用级统计。例如,统计任务处理过程中异常数据的数量。虽然我们可以将异常数据输出到日志,但是更多时候,仅仅需要知道异常数据的占比就可以了,此时使用计数器就会非常方便。

内置计数器

Hadoop为每个作业维护若干个内置计数器,以描述多项指标。例如,某些计数器记录以处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量。这些内置计数器被划分为若干个组,如下所示。

组别 名称/类别
MapReduce任务计数器 org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器 org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器 org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器 org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器 org.apache.hadoop.mapreduce.JobCounter

下面我们分别介绍这几组内置的计数器。

任务计数器

在任务执行过程中,任务计数器采集任务的相关信息,各个作业的所有任务的结果会被聚集起来。例如MAP_INPUT_RECORDS计数器统计每个map任务输入记录的总数,并在一个作业的所有map任务上进行聚集,使得最终数字是整个作业的所有输入记录的总数。

任务计数器由其关联任务维护,并定期发送给application master。因此,计数器能够被全局聚集。任务计数器的值每次都是完整传输的,而非传输自上次传输之后的计数值,从而避免由于消息丢失而引发的错误。另外,如果一个任务在作业执行期间失败,则相关计数器的值会减小。

虽然只有当整个作业执行完之后计数器的值才是完整可靠的,但是部分计数器仍然可以在任务处理过程中提供一些有用的诊断信息,以便由Web界面监控。例如,PHYSICAL_MEMORY_BYTESVIRTUAL_MEMORY_BYTESCOMMITTED_HEAP_BYTES计数器显示特定任务执行过程中的内存变化情况。

内置的任务计数器包括在MapReduce任务技术区分组中的计数器以及在文件相关的计数器分组中的计数器。

下面先来看一下内置的MapReduce任务计数器

计数器名称 说明
map输入的记录数(MAP_INPUT_RECORDS) 作业中所有map已处理的输入记录数。每次RecordReader读取到一条记录并将其传输给map()方法时,该计数器的值递增
分片(split)的原始字节数(SPLIT_RAW_BYTES) 由map读取的输入分片对象的字节数。这些对象描述分片元数据(文件的唯位移和长度),而不是分片的数据自身,因此总规模是小的
map输出的记录数(MAP_OUTPUT_RECORDS) 作业中所有map产生的map输出记录数。每次某一个map的OutputCollector调用collect()方法时,该计数器的值增加
map输出的物化字节数(MAP_OUTPUT_METERIALIZED_BYTES) map输出后确实写到磁盘上的字节数;若map输出压缩功能被启用,则会在计数器值上反映出来
combine输入的字节数(COMBINE_INPUT_RECORDS) 作业中所有combiner已处理的输入记录数。combiner的迭代器每次读取一个值,该计数器的值增加。注意,本计数器代表combiner已经处理的值的个数,并非不同的键组数
combine输出的记录数(COMBINE_OUTPUT_RECORDS) 作业中所有combiner已产生的输出记录数。每当一个combiner的OutputCollector调用collect()方法时,该计数器的值增加
reduce输入的组(REDUCE_INPUT_GROUPS) 作业中所有reducer已经处理的不同的分组的个数。每当某一个reducer的reduce()被调用时,该计数器的值增加
reduce输入的记录数(REDUCE_INPUT_RECORDS) 作业中所有reducer已经处理的输入记录的个数。每当某一个reducer的迭代器读取一个值时,该计数器的值增加。如果所有reducer已经处理完所有输入,则该计数器的值与”map输出的记录”计数器的值相同
reduce输出的记录数(REDUCE_OUTPUT_RECORDS) 作业中所有map已经产生的reduce输出记录数。每当某个reducer的OutputCollector调用collect()方法时,该计数器的值增加
reduce经过shuffle的字节数(REDUCE_SHUFFLE_BYTES) 由shuffle复制到reducer的map输出的字节数
溢出的记录数(SPILLED_RECORDS) 作业中所有map和reduce任务溢出到磁盘的记录数
CPU毫秒(CPU_MILLISECONDS) 一个任务的总CPU时间,以毫秒为单位
物理内存字节数(PHYSICAL_MEMORY_BYTES) 一个任务所用的物理内存,以字节数为单位
虚拟内存字节数(VIRTUAL_MEMORY_BYTES) 一个任务所用虚拟内存的字节数
有效的堆字节数(COMMITTED_HEAP_BYTES) 在JVM中的总有效内存量,以字节为单位
GC运行时间毫秒数(GC_TIME_MILLIS) 在任务执行过程中,垃圾收集器话费的时间,以毫秒为单位
由shuffle传输的map输出数(SHUFFLED_MAPS) 由shuffle传输到reducer的map输出文件数
失败的shuffle数(FAILED_SHUFFLE) shuffle过程中,发生map输出拷贝错误的次数
被合并的map输出数(MERGED_MAP_OUTPUTS) shuffle过程中,在reduce端合并的map输出文件数

下面是内置的文件系统任务计数器。

计数器名称 说明
文件系统的读字节数(BYTES_READ) 由map任务和reduce任务在各个文件系统中读取的字节数,各个文件系统分别对应一个计数器,文件系统可以是Local、HDFS、S3等
文件系统的写字节数(BYTES_WRITTEN) 由map任务和reduce任务在各个文件系统中写的字节数
文件系统读操作的数量(READ_OPS) 由map任务和reduce任务在各个文件系统中进行读操作的数量,例如open、file status操作
文件系统大规模读操作的数量(LARGE_READ_OPS) 由map任务和reduce任务在各个文件系统中进行大规模读操作的数量,例如对一个大容量的目录进行list操作
文件系统写操作的数量(WRITE_OPS) 由map任务和reduce任务在各个文件系统中进行写操作的数量,例如create、append操作

下面是内置的内置的FileInputFormat任务计数器。

计数器名称 说明
读取的字节数(BYTES_READ) 由map任务通过FileInputFormat读取的字节数

下面是内置的FileOutputFormat任务计数器。

计数器名称 说明
写的字节数(BYTES_WRITTEN) 由map任务或者reduce任务通过通过FileOutputFormat写的字节数

作业计数器

作业计数器由application master维护,因此无需在网络间传输数据,这一点与包括“用户定义的计数器”在内的其他计数器不同。这些计数器都是作业级别的统计量,其值不会随着任务运行而改变。例如TOTAL_LAUNCHED_MAPS统计在作业执行过程中启动的map任务数,包括失败的map任务。

下面是内置的作业计数器。

计数器名称 说明
启用的map任务数(TOTAL_LAUNCHED_MAPS) 启动的map任务数,包括以”推测执行”方式启动的任务
启用的reduce任务数(TOTAL_LAUNCHED_REDUCERS) 启动的reduce任务数,包括以”推测执行”方式启动的任务
启用的uber任务数(TOTAL_LAUNCHED_UBERTASKS) 启动的uber任务数(一种优化MapReduce小任务的方式)
uber任务中的map数(NUM_UBER_SUBMAPS) 在uber任务中的map数
uber任务中的reduce数(NUM_UBER_SUBREDUCES) 在uber任务中的reduce数
失败的map任务数(NUM_FAILED_MAPS) 失败的map任务数
失败的reduce任务数(NUM_FAILED_REDUCES) 失败的reduce任务数
失败的uber任务数(NUM_FAILED_UBERTASKS) 失败的uber任务数
被中止的map任务数(NUM_KILLED_MAPS) 被终止的map任务数
被中止的reduce任务数(NUM_KILLED_REDUCES) 被中止的reduce任务数
数据本地化的map任务数(DATA_LOCAL_MAPS) 与输入数据在同一节点上的map任务数
机架本地化的map任务数(RACK_LOCAL_MAPS) 与输入数据在同一机架范围内但不在同一节点上的map任务数
其他本地化的map数(OTHER_LOCAL_MAPS) 与输入数据不在同一机架范围内的map任务数。由于机架之间的带宽资源相对较少,Hadoop会尽量让map任务数靠近输入数据执行,因而该计数器值一般会比较小
map任务的总运行时间(MILLIS_MAPS) map任务的总运行时间,单位是毫秒。包括以推测执行方式启动的任务。可参见相关的度量内核和内存使用的计数器(VCORE_MILLS_MAPS和MB_MILLIS_MAPS)
reduce任务的总运行时间(MILLIS_REDUCES) reduce任务的总运行时间,单位是毫秒。包括以推测执行方式启动的任务。可参见相关的度量内核和内存使用的计数器(VCORE_MILIIS_REDUCES和MB_MILLIS_REDUCES)

用户自定义计数器

MapReduce允许用户编写程序来定义计数器,计数器的值可在mapper或reducer中增加。计数器由一个Java枚举类型来定义,以便对有关的计数器分组。一个作业可以定义的枚举类型数量不限,各个枚举类型包含的字段数量也不限。枚举类型的名称作为分组的名称,枚举类型的字段就是计数器的名称。计数器是全局的。换而言之,MapReduce框架将跨越所有map和reduce聚集这些计数器,并在作业结束时产生一个最终结果。

下面使用枚举方式定义并使用计数器。

MyCounter代码如下。

1
2
3
4
public enum MyCounter {
HADOOP,
MAPREDUCE
}

WordCountMapper代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class WordCountMapper extends Mapper<LongWritable, Text, Text, VIntWritable> {

private static Text outKey = new Text();
private static VIntWritable outValue = new VIntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
for (String word : value.toString().split(" ")) {
if ("hadoop".equals(word)) {
context.getCounter(MyCounter.HADOOP).increment(1);
}
if ("mapreduce".equals(word)) {
context.getCounter(MyCounter.MAPREDUCE).increment(1);
}
outKey.set(word);
context.write(outKey, outValue);
}
}
}

使用context.getCounter()方法可获取到对应的计数器,使用increment(1)方法使得计数器自增1。

提交程序,当任务运行完成后,会将计数器的结果打印到控制台,如图所示。

我们也可以通过YARN的Web界面来查看任务的计数器。

实际上,我们还可以通过context.getCounter(String groupName, String counterName)的方式动态获取计数器,这种方式使用起来更加简单,例如。

1
context.getCounter("MyCounter", "hadoop").increment(1);

小结

本章介绍了MapReduce中计数器的概念和使用方式。计数器能够对于某个事件或指标以计数的方式统计值,在很多场景下非常有用。Hadoop自带了很多计数器,通过这些计数器,我们可以有针对性的进行任务优化。

如果您觉得不错,请赞赏一下!