【Hadoop07】:MapReduce基础

前言

MapReduce是一种可用于数据处理的编程模型,用于解决大数据场景下数据处理的问题。该模型比较简单,但要想写出有用的程序却不太容易。Hadoop可以运行各种语言版本的MapReduce程序,不过我们这里以Java语言为主。

MapReduce的核心思想是”分而治之”,简单来说,就是将要处理的数据集,拆分成多个任务并行在多个节点上运行,然后将任务的运行结果收集起来,再进行全局的计算,最后将结果保存。

基本概念

MapReduce本质上是一种编程模型,该模型将任务划分为两个阶段:Map和Reduce。Map阶段将任务分解为若干个没有互相依赖的小任务进行并行计算;Reduce阶段负责将Map阶段的处理结果进行全局汇总。这两个阶段加起来,就是MapReduce的思想。

可能这么说有些抽象,我们举一个例子来说明。例如现在有100框苹果,每框的苹果个数不同,现在要你计算出这100框苹果的具体个数,你会怎么做?

第一种方式:你可以自己一个人慢慢数。

第二种方式:多叫一些人一起数,最后汇总每个人的数的数量。

哪种方式更好?

在第二种方式中,就体现了MapReduce的编程思想。将”数苹果”这个任务拆分成了多个独立的互不依赖的小任务,这些任务可以并行执行,最后将任务的结果汇总即可。

有了思想之后,我们需要将思想转换为具体的代码逻辑进行实现。虽然MapReduce的编程模型很简单,但是如何去划分任务,任务运行在哪儿,当某个任务运行时出现了错误怎么办,网络之间的数据如何传输?这些问题比我们纯粹编写业务逻辑代码要复杂的多,但是它又是必须的步骤。

幸好,Hadoop中的MapReduce框架以及帮我们把这些通用的处理细节实现了,我们无需考虑除了业务逻辑之外的工作如何运转,交给框架来做就好了。MapReduce框架可以将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。

MapReduce框架将做什么怎么做这两件事情分开了,我们只需要关心具体的处理逻辑即可,框架会自动完成怎么做这件事。

编程模型

既然MapReduce框架可以将我们编写的代码整合成为一个完整的分布式运算程序,那么我们就要按照一定的套路来编写程序,或者说,你要使用框架,就得遵循框架的设计规范。

通常情况下,我们编写一个MapReduce程序需要下列这些步骤:

  1. 指定数据来源,这个很好理解吧,你要处理数据,首先得指定数据在哪儿,是不?
  2. 编写Map函数,对输入的数据进行处理,并将处理后的结果进行输出
  3. 编写Reduce函数,处理来自Map函数的输出
  4. 将处理后的结果进行保存

是不是非常简单?使用MapReduce框架,就是这么简单。

实际上,这里隐藏了很多细节,例如分组、排序等,不过不要担心,在后续的阶段我们会一步步把这些步骤讲清楚。

第一个MapReduce程序:WordCount

有了MapReduce的概念后,也了解了编写MapReduce程序的编程模型,下面我们来编写第一个MapReduce程序:WordCount。(还记得我们在Hadoop搭建环境运行的自带示例程序吗?)

我们需要编写三个类:WordCountMapper、WordCountReducer和WordCountRunner。

WordCountMapper类实现了Map函数的逻辑,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* WordCount程序的Map函数实现
* 四个泛型参数表示:偏移量、当前行数据、输出key类型、输出value类型
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private static final Logger logger = LoggerFactory.getLogger(WordCountMapper.class);

/**
* map函数的具体实现
* @param key 表示当前数据在数据块中的偏移量
* @param value 当前行数据
* @param context 上下文
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 每一行数据
String line = value.toString();
// 按照空格切割,提取单词
String[] words = line.split(" ");
// 将获取到的单词发送到Reduce端
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}

WordCountReducer类实现了Reduce函数的逻辑,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* WordCount程序Reduce函数
* 四个泛型参数表示:输入key类型、输入value类型、输出key类型、输出value类型
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

/**
* reduce函数的具体实现
* @param key map端处理好的单词
* @param values 单词对应的个数,实际上数据结构为<key, [value1, value2, ...]>
* @param context 上下文
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 单词累加
long sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
// 输出
context.write(key, new LongWritable(sum));
}
}

WordCountRunner类作为程序的入口,设置任务的相关参数,如输入输出、数据类型等。代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* MapReduce WordCount程序
*/
public class WordCountRunner {

// 程序执行入口
public static void main(String[] args) throws Exception {
// 数据的路径由程序参数指定
if (args.length != 2) {
System.out.println("Usage: WordCountRunner <input path> <output path>");
System.exit(1);
}
// 创建任务对象,用于配置任务参数
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 给任务取个名称
job.setJobName("WordCountExample");
// 设置入口类
job.setJarByClass(WordCountRunner.class);

// 设置输入
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
// 设置Mapper任务类
job.setMapperClass(WordCountMapper.class);
// 设置Mapper任务的输出数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Reducer任务类
job.setReducerClass(WordCountReducer.class);
// 设置Reducer任务的输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));

// 等待任务执行结束
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}

}

代码编写完成后,我们就可以将程序打包,提交到集群上运行了。由于我们的项目采用Maven构建,可以直接使用Maven命令进行打包。

1
$ mvn package

或者直接在IDE中找到对应的Maven面板,点击打包即可。

程序打包后,需要上传到集群,并提交任务执行(注意,实现准备好数据)。

1
$ bin/hadoop jar mr/hadoop-example-1.0.jar org.victor.mapreduce.demo01.WordCountRunner /mr/demo01/input /mr/demo01/output

查看输出结果。

1
2
3
4
5
6
7
$ bin/hadoop fs -text /mr/demo01/output/*
hadoop 3
hdfs 3
mapreduce 2
spark 2
wordcount 1
yarn 3

从结果中可以看到,数据的顺序,是按照Key进行升序排列的。

OK,我们的第一个MapReduce程序WordCount就算是完成了,想必你有很多疑问,不用着急,接下来我们就会对这个程序进行详细剖析。

小结

在本章中,我们介绍了MapReduce的基本概念。MapReduce编程模型分为两个阶段:Map阶段和Reduce阶段。Map阶段用于并行执行没有依赖关系的任务,Reduce阶段对Map阶段的输出进行汇总处理。在Hadoop框架中,提供了MapReduce框架,可以将我们按照固定模式编写的代码封装为一个完整的分布式程序,我们无需关心网络传输、任务调度等工作,可以将开发人员的精力专注于具体的实现上。最后,我们编写了第一个MapReduce程序:WordCount,可用于统计HDFS文件中的单词个数。

如果您觉得不错,请赞赏一下!