前言
在前面的章节中,我们已经学习了MapReduce程序的基本模式以及一些场景的组件,如Combiner、Partitioner等。本章将会编写一个流量统计的案例,尽量将前面学到的知识点串联起来。
需求
对手机上网日志数据进行处理,统计出每个手机号码上网流量信息,数据样本如下所示。
1 | 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 |
数据文件中的每行表示一个手机号码的上网记录,下面以第一行数据为例,说明各个字段的含义。
- 1363157985066,未使用
- 13726230503,手机号码
- 00-FD-07-A4-72-B8:CMCC,未使用
- 120.196.100.82,未使用
- i02.c.aliimg.com,未使用
- 24,未使用
- 27,未使用
- 2481,上行流量
- 24681,下行流量
- 200,未使用
数据字段有了之后,来看一下具体的需求说明。
- 计算每个手机号码的总上行流量
- 计算每个手机号码的总下行流量
- 计算每个手机号码的总流量
- 将手机号码为130-134开头的数据存放到单独的结果文件中
- 将手机号码为135-139开头的数据存放到单独的结果文件中
- 不符合规范的数据打印到日志中,并使用计数器统计
分析
根据需求,我们来分析如何实现。
需求1-3只需要提取对应字段的数据,发送到Reduce端进行聚合即可。
需求4-5要求将结果数据存放到不同的结果文件中,可以使用分区器加上多个Reducer任务来实现。
需求6在处理数据的过程中,发现不符合规范的数据后,使用计数器记录即可。
实现
首先定义一个Flow
类,用于封装数据传输中的字段信息。
1 | public class Flow implements Writable { |
定义FlowMapper
类,用于处理Map阶段的逻辑。
1 | public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> { |
定义FlowReducer
类,用于处理Reduce阶段的逻辑。
1 | public class FlowReducer extends Reducer<Text, Flow, Flow, NullWritable> { |
定义FlowPartitioner
类,用于对数据分区。
1 | public class FlowPartitioner extends Partitioner<Text, Flow> { |
定义FlowRunner
类,用于配置参数以及作为入口类执行。
1 | public class FlowRunner extends Configured implements Tool { |
小结
本章以一个流量统计的案例,将前面学过的MapReduce分区器、计数器、自定义Writable对象等知识串联起来,加强对MapReduce程序的理解。