【Hadoop15】:MapReduce案例-流量统计

前言

在前面的章节中,我们已经学习了MapReduce程序的基本模式以及一些场景的组件,如Combiner、Partitioner等。本章将会编写一个流量统计的案例,尽量将前面学到的知识点串联起来。

需求

对手机上网日志数据进行处理,统计出每个手机号码上网流量信息,数据样本如下所示。

1
2
3
1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200

数据文件中的每行表示一个手机号码的上网记录,下面以第一行数据为例,说明各个字段的含义。

  • 1363157985066,未使用
  • 13726230503,手机号码
  • 00-FD-07-A4-72-B8:CMCC,未使用
  • 120.196.100.82,未使用
  • i02.c.aliimg.com,未使用
  • 24,未使用
  • 27,未使用
  • 2481,上行流量
  • 24681,下行流量
  • 200,未使用

数据字段有了之后,来看一下具体的需求说明。

  1. 计算每个手机号码的总上行流量
  2. 计算每个手机号码的总下行流量
  3. 计算每个手机号码的总流量
  4. 将手机号码为130-134开头的数据存放到单独的结果文件中
  5. 将手机号码为135-139开头的数据存放到单独的结果文件中
  6. 不符合规范的数据打印到日志中,并使用计数器统计

分析

根据需求,我们来分析如何实现。

需求1-3只需要提取对应字段的数据,发送到Reduce端进行聚合即可。

需求4-5要求将结果数据存放到不同的结果文件中,可以使用分区器加上多个Reducer任务来实现。

需求6在处理数据的过程中,发现不符合规范的数据后,使用计数器记录即可。

实现

首先定义一个Flow类,用于封装数据传输中的字段信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class Flow implements Writable {

private String phone;
private long up;
private long down;
private long total;

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.phone);
out.writeLong(this.up);
out.writeLong(this.down);
out.writeLong(this.total);
}

@Override
public void readFields(DataInput in) throws IOException {
this.phone = in.readUTF();
this.up = in.readLong();
this.down = in.readLong();
this.total = in.readLong();
}

public void set(String phone, long up, long down) {
this.phone = phone;
this.up = up;
this.down = down;
this.total = up + down;
}

@Override
public String toString() {
return this.phone + "\t" + this.up + "\t" + this.down + "\t" + this.total;
}

public String getPhone() {
return phone;
}

public void setPhone(String phone) {
this.phone = phone;
}

public long getUp() {
return up;
}

public void setUp(long up) {
this.up = up;
}

public long getDown() {
return down;
}

public void setDown(long down) {
this.down = down;
}

public long getTotal() {
return total;
}

public void setTotal(long total) {
this.total = total;
}
}

定义FlowMapper类,用于处理Map阶段的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {

private static final Log LOG = LogFactory.getLog(FlowMapper.class);
private static Text outKey = new Text();
private static Flow outValue = new Flow();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 切分
String[] fields = value.toString().split("\t");
if (fields.length != 11) {
// 数据有误
LOG.info("ErrorData:" + value.toString());
context.getCounter("MyCounter", "ErrorDataCounter").increment(1);
} else {
// 封装数据
outKey.set(fields[1]);
outValue.set(fields[1], Long.parseLong(fields[8]), Long.parseLong(fields[9]));
context.write(outKey, outValue);
}
}

}

定义FlowReducer类,用于处理Reduce阶段的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FlowReducer extends Reducer<Text, Flow, Flow, NullWritable> {

private static Flow outKey = new Flow();
private static NullWritable outValue = NullWritable.get();

@Override
protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
// 聚合操作
long totalUp = 0L;
long totalDown = 0L;
for (Flow flow : values) {
totalUp += flow.getUp();
totalDown += flow.getDown();
}
outKey.set(key.toString(), totalUp, totalDown);
context.write(outKey, outValue);
}
}

定义FlowPartitioner类,用于对数据分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class FlowPartitioner extends Partitioner<Text, Flow> {

private static Pattern pattern = Pattern.compile("13[0-4][0-9]{8}");

@Override
public int getPartition(Text text, Flow flow, int numPartitions) {
String phone = flow.getPhone();
if (pattern.matcher(phone).matches()) {
return 0;
} else {
return 1;
}
}
}

定义FlowRunner类,用于配置参数以及作为入口类执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class FlowRunner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Usage: FlowRunner <input path> <output path>");
return 1;
}

Job job = Job.getInstance(this.getConf(), "FlowExample");
job.setJarByClass(FlowRunner.class);

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));

job.setMapperClass(FlowMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);

job.setReducerClass(FlowReducer.class);
job.setOutputKeyClass(Flow.class);
job.setOutputValueClass(NullWritable.class);

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));

job.setPartitionerClass(FlowPartitioner.class);
job.setNumReduceTasks(2);

boolean success = job.waitForCompletion(true);
return success ? 0 : -1;
}

public static void main(String[] args) throws Exception {
int code = ToolRunner.run(new FlowRunner(), args);
System.exit(code);
}
}

小结

本章以一个流量统计的案例,将前面学过的MapReduce分区器、计数器、自定义Writable对象等知识串联起来,加强对MapReduce程序的理解。

如果您觉得不错,请赞赏一下!