[Hadoop10]: Custom Writable Objects in MapReduce

Preface

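In an earlier chapter we introduced serialization in Hadoop, namely Writable. Hadoop ships with many built-in Writable implementations that cover most needs, but in some scenarios we have to build our own. A custom Writable type gives us full control over the binary representation and the sort order. We also build a custom implementation when a record has many fields and we want a single type to wrap them.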

Case 1

Suppose our data has the following format.

1,zhangsan,18,13888888888,Beijing,2017-02-10
2,lisi,20,13666666666,Shanghai,2017-02-11
3,wangwu,19,13112345678,Tianjin,2017-02-10
4,zhaoliu,21,13212345678,Shanghai,2017-02-11
5,xiaomao,17,13312345678,Beijing,2017-02-10

Each line is comma-separated. The fields are: ID, user name, age, phone number, region, and registration date.

Write a MapReduce program that finds all users in the Beijing and Shanghai regions and sorts them by age in ascending order.

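Let's think through the requirements:

  1. Finding the users in Beijing and Shanghai is straightforward: split each line on commas and check the field at the corresponding position.
  2. How do we sort by user age?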

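As described earlier in the MapReduce execution flow, a Map task sorts its output by key, and the Reduce side sorts again after fetching the output of each Map task. The key is therefore the crux: if we control how keys are ordered, we control the order of the result. So far we have used Text objects as keys, but Text sorts by its bytes and cannot express "order by age", so we need a custom Writable object.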
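Before involving Hadoop at all, the idea can be tried in plain Java. The following is a minimal sketch, not the job itself: `AgeSortSketch` and its nested `User` class are hypothetical names, and `Collections.sort` stands in for the sorting that the framework performs on keys. It filters by region and lets `compareTo` decide the order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class AgeSortSketch {

    // Hypothetical stand-in for the key: keeps only what the ordering needs.
    static final class User implements Comparable<User> {
        final String line;     // the original input line
        final int age;
        final String address;

        User(String line) {
            String[] fields = line.split(",");
            this.line = line;
            this.age = Integer.parseInt(fields[2]);
            this.address = fields[4];
        }

        // The ordering rule lives in the key type itself.
        @Override
        public int compareTo(User o) {
            return Integer.compare(this.age, o.age);
        }
    }

    // Keep Beijing and Shanghai users, then sort them by age ascending.
    static List<String> filterAndSort(List<String> lines) {
        List<User> kept = new ArrayList<>();
        for (String line : lines) {
            User user = new User(line);
            if ("Beijing".equals(user.address) || "Shanghai".equals(user.address)) {
                kept.add(user);
            }
        }
        Collections.sort(kept); // stand-in for the framework's sort on keys
        List<String> result = new ArrayList<>();
        for (User user : kept) {
            result.add(user.line);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "1,zhangsan,18,13888888888,Beijing,2017-02-10",
                "2,lisi,20,13666666666,Shanghai,2017-02-11",
                "3,wangwu,19,13112345678,Tianjin,2017-02-10",
                "4,zhaoliu,21,13212345678,Shanghai,2017-02-11",
                "5,xiaomao,17,13312345678,Beijing,2017-02-10");
        for (String line : filterAndSort(input)) {
            System.out.println(line);
        }
    }
}
```

The MapReduce version moves the same two pieces to different places: the filter goes into the mapper, and the comparison goes into the key class.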

First, define the custom UserWritable object.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom key object. Note that it implements the WritableComparable interface.
 */
public class UserWritable implements WritableComparable<UserWritable> {

    // Fields
    private int id;
    private String name;
    private int age;
    private String phone;
    private String address;
    private String regDate;

    // A no-argument constructor is required
    public UserWritable() {}

    public UserWritable(int id, String name, int age, String phone, String address, String regDate) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.phone = phone;
        this.address = address;
        this.regDate = regDate;
    }

    // The custom comparison logic goes here
    @Override
    public int compareTo(UserWritable o) {
        // Compare by age only
        return Integer.compare(this.age, o.age);
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        // Write the fields one after another
        out.writeInt(this.id);
        out.writeUTF(this.name);
        out.writeInt(this.age);
        out.writeUTF(this.phone);
        out.writeUTF(this.address);
        out.writeUTF(this.regDate);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        // The order must match the order used in write()
        this.id = in.readInt();
        this.name = in.readUTF();
        this.age = in.readInt();
        this.phone = in.readUTF();
        this.address = in.readUTF();
        this.regDate = in.readUTF();
    }

    // toString() is called when the Reduce output is written as text
    @Override
    public String toString() {
        return new StringBuilder().append(this.id).append(",")
                .append(this.name).append(",")
                .append(this.age).append(",")
                .append(this.phone).append(",")
                .append(this.address).append(",")
                .append(this.regDate)
                .toString();
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getRegDate() {
        return regDate;
    }

    public void setRegDate(String regDate) {
        this.regDate = regDate;
    }
}
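The write()/readFields() pair can be checked without a cluster by round-tripping an object through a byte array. The sketch below uses only `java.io`; `RoundTripSketch` and the trimmed-down `MiniUser` are hypothetical names that mimic the shape of UserWritable with three fields, and they do not implement the Hadoop interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RoundTripSketch {

    // Hypothetical record with the same write/readFields shape as UserWritable.
    static final class MiniUser {
        int id;
        String name;
        int age;

        void write(DataOutput out) throws IOException {
            out.writeInt(id);
            out.writeUTF(name);
            out.writeInt(age);
        }

        // Must read the fields in exactly the order write() emitted them.
        void readFields(DataInput in) throws IOException {
            id = in.readInt();
            name = in.readUTF();
            age = in.readInt();
        }
    }

    static byte[] serialize(MiniUser user) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            user.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static MiniUser deserialize(byte[] data) {
        // An empty instance is created first, then filled in; this is why a
        // no-argument constructor is needed.
        MiniUser user = new MiniUser();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            user.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return user;
    }

    public static void main(String[] args) {
        MiniUser original = new MiniUser();
        original.id = 1;
        original.name = "zhangsan";
        original.age = 18;

        byte[] data = serialize(original);
        MiniUser copy = deserialize(data);
        // 4 bytes (int) + 2 bytes (UTF length) + 8 bytes ("zhangsan") + 4 bytes (int)
        System.out.println(data.length + " bytes: " + copy.id + "," + copy.name + "," + copy.age);
    }
}
```

If readFields() read the two ints before the string, the second readInt() would consume the string's length prefix and part of its text, which is why the field order has to match on both sides.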

The UserMapper class.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UserMapper extends Mapper<LongWritable, Text, UserWritable, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line into fields
        String[] fields = value.toString().split(",");
        // Build the key object
        UserWritable user = new UserWritable(
                Integer.parseInt(fields[0]), // id
                fields[1],                   // name
                Integer.parseInt(fields[2]), // age
                fields[3],                   // phone
                fields[4],                   // address
                fields[5]                    // regDate
        );
        // Keep only users from Beijing or Shanghai
        if ("Beijing".equals(user.getAddress()) || "Shanghai".equals(user.getAddress())) {
            // Emit the user as the key; the value is NullWritable
            context.write(user, NullWritable.get());
        }
    }
}
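The mapper above assumes that every line has six comma-separated fields with a numeric ID and age; a blank or malformed line would throw and fail the task. One possible guard, sketched here in plain Java, validates a line before the key object is built. `LineParseSketch` and `parse` are hypothetical names, and skipping bad lines is an assumption about the desired behavior rather than something the original job does.

```java
import java.util.Optional;

class LineParseSketch {

    // Number of fields in a well-formed record: id, name, age, phone, address, regDate.
    static final int FIELD_COUNT = 6;

    // Returns the split fields for a well-formed line, or empty for anything else.
    static Optional<String[]> parse(String line) {
        // The -1 limit keeps trailing empty fields, so "1,a,18,,," still has 6 parts.
        String[] fields = line.split(",", -1);
        if (fields.length != FIELD_COUNT) {
            return Optional.empty();
        }
        try {
            Integer.parseInt(fields[0]); // id must be numeric
            Integer.parseInt(fields[2]); // age must be numeric
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
        return Optional.of(fields);
    }

    public static void main(String[] args) {
        System.out.println(parse("1,zhangsan,18,13888888888,Beijing,2017-02-10").isPresent());
        System.out.println(parse("not a record").isPresent());
    }
}
```

In a mapper, a line for which `parse` returns empty could simply be skipped by returning from map() before any key is written.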

The UserReducer class.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class UserReducer extends Reducer<UserWritable, NullWritable, UserWritable, NullWritable> {

    @Override
    protected void reduce(UserWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Simply write the key out
        context.write(key, NullWritable.get());
    }
}
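One caveat is worth spelling out. The sample ages are all distinct, but compareTo in UserWritable looks at age only, so two users of the same age compare as equal. By default the reduce side groups keys with that same comparison, which means both users would arrive in a single reduce() call, and a reducer that writes the key once would emit only one of them. A common remedy is a tie-breaker on a unique field. The sketch below illustrates the effect in plain Java, using a `TreeSet` as a stand-in for grouping because it also treats "compares as 0" as "same key"; `TieBreakSketch` and its members are hypothetical names.

```java
import java.util.Comparator;
import java.util.TreeSet;

class TieBreakSketch {

    static final class User {
        final int id;
        final int age;

        User(int id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    // Same rule as compareTo in UserWritable: age only.
    static final Comparator<User> BY_AGE = Comparator.comparingInt(u -> u.age);

    // Assumed alternative: age first, then id, so distinct users never compare as equal.
    static final Comparator<User> BY_AGE_THEN_ID = BY_AGE.thenComparingInt(u -> u.id);

    // Counts how many keys survive when "compares as 0" means "same key".
    static int distinctKeys(Comparator<User> order, User... users) {
        TreeSet<User> keys = new TreeSet<>(order);
        for (User user : users) {
            keys.add(user);
        }
        return keys.size();
    }

    public static void main(String[] args) {
        User a = new User(1, 18);
        User b = new User(6, 18); // hypothetical second user, also aged 18
        User c = new User(5, 17);

        System.out.println(distinctKeys(BY_AGE, a, b, c));         // prints 2
        System.out.println(distinctKeys(BY_AGE_THEN_ID, a, b, c)); // prints 3
    }
}
```

Applied to UserWritable, the equivalent change would be to return `Integer.compare(this.id, o.id)` from compareTo whenever the age comparison yields 0.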

The UserRunner driver class.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class UserRunner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: UserRunner <input path> <output path>");
            return 1;
        }

        // Initialize the job
        Job job = Job.getInstance(this.getConf());
        job.setJobName("UserWritableExample");
        job.setJarByClass(UserRunner.class);
        // Input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        // Map side: mapper class and its output types
        job.setMapperClass(UserMapper.class);
        job.setMapOutputKeyClass(UserWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce side: reducer class and the final output types
        job.setReducerClass(UserReducer.class);
        job.setOutputKeyClass(UserWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // Output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new UserRunner(), args);
        System.exit(code);
    }
}

For convenience, we can run and debug the job locally. The final result is as follows.

5,xiaomao,17,13312345678,Beijing,2017-02-10
1,zhangsan,18,13888888888,Beijing,2017-02-10
2,lisi,20,13666666666,Shanghai,2017-02-11
4,zhaoliu,21,13212345678,Shanghai,2017-02-11

Case 2

Sort the following data.

a 5
b 4
a 1
c 3
b 2
a 2
b 1
c 2

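Requirement: sort by the first column in lexicographic order; when the first column is equal, sort by the second column in descending order. For example, given <a, 2> and <a, 5>, <a, 5> should come before <a, 2> after sorting.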

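Again, let's think it through:

  1. Sorting by a special rule means defining that rule ourselves, that is, writing a custom Writable object.
  2. MapReduce sorts by key automatically, so the custom object should be emitted as the key. No value is needed, so NullWritable will do.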
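The two-level rule itself can be tried out in plain Java before it is placed in a key class. This is a minimal sketch: `PairSortSketch`, `Pair`, and `compare` are hypothetical names, and `List.sort` stands in for the framework's sort on keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PairSortSketch {

    static final class Pair {
        final String first;
        final int second;

        Pair(String first, int second) {
            this.first = first;
            this.second = second;
        }
    }

    // First column ascending; on a tie, second column descending.
    static int compare(Pair a, Pair b) {
        int result = a.first.compareTo(b.first);
        if (result == 0) {
            // Swapping the operands reverses the order of the second column.
            result = Integer.compare(b.second, a.second);
        }
        return result;
    }

    // Parses "first second" lines, sorts them, and formats them back.
    static List<String> sortLines(List<String> lines) {
        List<Pair> pairs = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(" ");
            pairs.add(new Pair(fields[0], Integer.parseInt(fields[1])));
        }
        pairs.sort(PairSortSketch::compare);
        List<String> sorted = new ArrayList<>();
        for (Pair pair : pairs) {
            sorted.add(pair.first + " " + pair.second);
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "a 5", "b 4", "a 1", "c 3", "b 2", "a 2", "b 1", "c 2");
        // Prints: [a 5, a 2, a 1, b 4, b 2, b 1, c 3, c 2]
        System.out.println(sortLines(input));
    }
}
```

Swapping the operands of `Integer.compare` and negating its result give the same ordering here; the swapped form is simply the more common way to write a descending comparison.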

First, the custom object.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairWritable implements WritableComparable<PairWritable> {

    // Fields
    private String first;
    private int second;

    // A no-argument constructor is required
    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(PairWritable o) {
        // Sort by the first field in ascending order
        int compare = this.first.compareTo(o.first);
        if (compare == 0) {
            // On a tie, sort by the second field in descending order
            compare = Integer.compare(o.second, this.second);
        }
        return compare;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.first);
        out.writeInt(this.second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Same order as in write()
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    @Override
    public String toString() {
        return this.first + "\t" + this.second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }
}

The PairMapper class.

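import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PairMapper extends Mapper<LongWritable, Text, PairWritable, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Wrap the two columns in a PairWritable and emit it as the key
        String[] fields = value.toString().split(" ");
        PairWritable pair = new PairWritable(fields[0], Integer.parseInt(fields[1]));
        context.write(pair, NullWritable.get());
    }
}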

The PairReducer class.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class PairReducer extends Reducer<PairWritable, NullWritable, PairWritable, NullWritable> {

    @Override
    protected void reduce(PairWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Simply write the key out
        context.write(key, NullWritable.get());
    }
}

The PairRunner class.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PairRunner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: PairRunner <input path> <output path>");
            return 1;
        }

        Job job = Job.getInstance(this.getConf(), "PairExample");
        job.setJarByClass(PairRunner.class);

        // Input
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        // Map side: the key is PairWritable, the value is NullWritable
        job.setMapperClass(PairMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reduce side
        job.setReducerClass(PairReducer.class);
        job.setOutputKeyClass(PairWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // Output
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new PairRunner(), args);
        System.exit(code);
    }
}

The final output is as follows.

a	5
a	2
a	1
b	4
b	2
b	1
c	3
c	2

Summary

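This chapter covered how to use custom Writable objects. When a record has multiple fields, or needs to be sorted by some specific rule, we build a custom Writable object to wrap the data and implement the ordering logic. A custom object used as the key of the Map output must implement the WritableComparable interface; otherwise implementing Writable is enough. The two cases in this chapter are fairly representative, and I encourage you to work through them yourself.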
