写在前面

本篇来学习MapReduce,主要包括MapReduce简介、设计目标、特点、不适合场景、编程模型等内容,并在此基础上通过一个词频统计功能来加深对于MapReduce的理解。

MapReduce

简介

MapReduce是谷歌开源的一个用于大数据量的计算框架,对于大数据量的计算,通常采用并行计算方式来处理。但是如果让开发人员自己来完全实现一个并行计算程序,这是非常困难的,鉴于此种情况,MapReduce就孕育而生,它是一种简化并行计算的编程模型,允许没有并发计算经验的开发者也能开发出并行计算程序,这无疑大大降低了并行计算的实现门槛。

设计目标

从简介中可以知道MapReduce的设计目标就是便于开发人员在不熟悉分布式并发编程的情况下,也能将自己的程序运行在分布式系统上。

MapReduce采用的是“分而治之”的思想,即把对大规模数据集的操作,分发给一个主节点管理下的各个子节点,让它们共同来完成,之后整合各个子节点的中间结果,进而得到最终的计算结果。用一句话来概括MapReduce就是“分散任务,汇总结果”。

特点

(1)容易编程。MapReduce只需简单实现一些接口,就能完成一个个分布式程序,且这些分布式程序可以分布到大量廉价的PC机上运行。这使得开发者写一个分布式程序就像写一个简单的串行程序一样,非常简单,这也是MapReduce编程非常流行的原因。
(2)扩展性良好。当开发者的计算资源无法得到满足时,可以通过简单的增加机器来扩展它的计算能力。
(3)高容错性。MapReduce的设计初衷是要求程序能够部署在大量廉价的PC机上,这就要求MapReduce具备较高的容错性。我们知道程序在廉价PC机上出现故障的概率很大,这就要求当某台机器出现故障宕机了,可以将它上面的计算任务转移到另外一个节点上运行,而不至于导致整个任务运行失败,而且这个过程不需要人工干预,完全是由Hadoop内部完成的。
(4)能够对PB及以上级别的海量数据进行离线处理。也就是说MapReduce比较适合离线处理,而不适合实时处理,对延迟率要求苛刻的时候不建议使用这个。MapReduce很难做到毫秒级返回一个计算结果。

不适合场景

易于编程的MapReduce尽管具有很多优点,但是也有一些不太擅长的地方,注意这里的说法是太擅长,而不是无法做到,只是在某些场景下实现的效果较差,此时更适合使用其他的框架。这些不适合场景主要有以下几个:
(1)实时计算。前面说过MapReduce无法像MySQL那样在毫秒或者秒级内返回计算结果。
(2)流式计算。所谓的流式计算是指计算输入的数据是动态的,而MapReduce的输入数据集必须是静态的,不能动态发生变化,这个就是MapReduce自身设计特点所决定的。
(3)DAG(有向图)计算。多个应用程序之间存在依赖关系,前一个应用程序的输出,作为后一个应用程序的输入,在这种情况下,MapReduce并不是不能做,只是使用MapReduce之后,每个MapReduce作业的输出结果都会写入到磁盘,势必会造成大量的磁盘IO,进而降低使用性能。

MapReduce编程模型

概述

其实单从MapReduce这个名字就能知道,MapReduce由Map和Reduce这两个阶段组成。也就是说用户只需编写map()reduce()这两个函数,就能完成简单的分布式程序的设计。

map()函数以Key-Value键值对作为输入,之后产生另外一系列的Key-Value键值对作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照Key值进行聚集,且Key值相同的数据会被统一交给reduce()函数来处理。请注意这个聚集策略用户可以自定义,系统默认采用的是对Key值进行哈希取模。

reduce()函数以Key及对应的Value列表作为输入,经合并Key相同的Value值后,会产生另外一系列Key-Value键值对来作为最终输出,进而写入到HDFS中。

MapReduce将作业(其实就是一个MapReduce程序)的整个运行过程分为Map和Reduce这两个过程,这里以后续介绍的词频统计功能来介绍MapReduce编程模型:

(1)Map阶段由一定数量的Map Task组成:

  • 输入数据格式解析:InputFormat(将输入文件分片);
  • 输入数据处理:Mapper;
  • 数据分组:Partitioner。

(2)Reduce阶段由一定数量的Reduce Task组成:

  • 数据远程拷贝(从Map Task的输出拷贝部分数据);
  • 数据按照Key排序和分组(即Key相同的都放在一起,按照Key进行分组操作,每一组交由Reducer进行处理);
  • 数据处理:Reducer;
  • 数据输出格式:OutputFormat(输出文件格式、设置分隔符等)。

    MapReduce编程步骤

    从上述词频统计功能就能看出MapReduce编程步骤如下所示:
    (1)Input:输入一系列的(k1,v1)键值对;
    (2)Map和Reduce:Map:(k1,v1)->list(k2,v2),之后Reduce:(k2,list(v2))->list(k3,v3)。其中k2/v2是中间结果对。
    (3)Output:一系列(k3,v3)键值对。

    MapReduce词频统计实例

    接下来通过一个WordCount程序来详细解释MapReduce模型。一个最简单的MapReduce程序至少包含3个部分:一个Map函数、一个Reduce函数和一个main函数。

也就是说在运行一个MapReduce计算任务时,这个任务过程会被划分为Map阶段和Reduce阶段,每个阶段都采用Key-Value键值对作为输入(input)和输出(output),main函数则将作业控制和文件输入/输出结合起来。

WordCount程序需求:现在有大量的文件,每个文件中又有大量的单词,要求统计每个单词出现的词频。

MapReduce完整的执行流程如下所示:

(1)WordCount实现设计分析
(a)Map过程:并行读取文本,对读取的单词进行map操作,每个词都以<Key,Value>形式生成。

读取第一行Deer Bear River,分割单词形成Map:

1
<Deer,1> <Bear,1> <River,1>

读取第二行Car Car River,分割单词形成Map:

1
<Car,1> <Car,1> <River,1>

读取第三行Deer Car Bear,分割单词形成Map:

1
<Deer,1> <Car,1> <Bear,1>

(b)Reduce过程:对Map的结果进行排序、合并,最后得到词频。

reduce将形成的Map根据相同的Key组合成Value数组,如下所示:

1
<Bear,1,1> <Car,1,1,1> <Deer,1,1> <River,1,1>

之后循环执行Reduce(K,V[]),分别统计出每个单词出现的此处,结果如下所示:

1
<Bear,2> <Car,3> <Deer,2> <River,2>

(2)WordCount代码实现。创建一个Maven项目,名称为envy-mapReduce,之后在其pom.xml依赖文件中添加如下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.6.0</hadoop.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
</dependency>
</dependencies>

接着在java包内新建com/envy/envymapreduce/app包,并在app包内新建EnvyWordCountApp类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class EnvyWordCountApp {
public static class MyMapper extends Mapper<Object, Text,Text, IntWritable> {
//new一个int类型用于计数,并赋值为1,因为不管每个单词出现几次,都是直接输出1
//假设输入数据为nice to nice,one赋值为1,那么使用context.write(word,one)的输出结果就是nice 1,to 1,nice 1
//如果one赋值为2,那么使用context.write(word,one)的输出结果就是nice 2,to 2,nice 2
private static final IntWritable one = new IntWritable(1);
//new一个String类型
private Text word = new Text();

/**
* 重写map函数,其中的context参数是上下文,用于记录key和value的值
* */
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//字符串分解类,将一行单词分解为多个单词
StringTokenizer itr = new StringTokenizer(value.toString());
//循环条件表示返回是否还有分隔符
while (itr.hasMoreTokens()){
//使用itr.nextToken()来获取单词
//word.set()方法将Java数据类型转换为Hadoop数据类型,这样后续才能输出
word.set(itr.nextToken());
//按照输出格式来输出结果,如nice 1
context.write(word,one);
}
}
}

public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();

/**
* 重写reduce函数,其中的context参数是上下文,用于记录key和result的值
* */
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定义一个Java数据类型的sum,用于记录相同单词的数量
int sum = 0;
//遍历得到的结果
for(IntWritable value:values){
//由于value是IntWritable类型,因此需要调用get()方法来返回Java类型的值
//请注意IntWritable.get()和IntWritable.set()都可以进行数据类型转换,不同的是set是将Java数据类型转换为Hadoop数据类型
//而get则是将Hadoop数据类型转换为Java数据类型

sum+=value.get();
}
//将Java数据类型转换为Hadoop数据类型
result.set(sum);
//将结果输出到HDFS中
context.write(key,result);
}
}

public static void main(String[] args) throws Exception {
//配置输入路径
String INPUT_PATH = "hdfs://master:9000/inputwc";
//配置输出路径
String OUTPUT_PATH = "hdfs://master:9000/outputwc";

//读取Hadoop的配置文件
Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

//创建一个任务,第一个参数是配置对象,第二个参数是任务名称,这里是EnvyWordCountApp
Job job = Job.getInstance(configuration,"EnvyWordCountApp");

//运行Jar类
job.setJarByClass(EnvyWordCountApp.class);

//设置map
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//设置reduce
job.setReducerClass(MyReducer.class);
//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//设置输入格式
//设置输入文件路径
job.setInputFormatClass(TextInputFormat.class);
Path inputPath = new Path(INPUT_PATH);
FileInputFormat.addInputPath(job,inputPath);

//设置输出格式
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path(OUTPUT_PATH);
//设置输出文件路径
FileOutputFormat.setOutputPath(job, outputPath);

//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}

请注意目前直接运行代码中的main方法是会抛异常的,正确的启动会在后面进行介绍。

WordCount代码详解

尽管笔者在代码中对部分语句进行了注释,但是这里还是有必要对 WordCount代码进行详解。

(1)对于map方法来说,其参数如下所示:

1
public void map(Object key, Text value, Context context)

请注意这个方法存在于MyMapper类中,它需要继承Mapper类,并实现map方法。这个map方法有三个参数,前面的Object keyText value就是输入的key和value,第三个参数Context context记录的则是整个上下文,因此开发者可以通过context将数据写出去。

(2)对于reduce方法来说,其参数如下所示:

1
public void reduce(Text key, Iterable<IntWritable> values, Context context)

请注意这个方法存在于MyReducer类中,它需要继承Reducer类,并实现reduce方法。这个reduce方法有三个参数,前面的Text keyIterable<IntWritable> values也是输入的key/values形式,不过可以看到它的value是迭代器形式的Iterable<IntWritable> values,也就是说reduce的输入是一个key对应一组value,reduce方法也有context参数,且作用和之前map方法中的作用一样。

(3)对于main方法来说,其分析如下所示:
(a)创建Configuration类,请注意在运行MapReduce程序前都需要初始化Configuration类,该类主要是读取MapReduce系统配置信息:

1
Configuration configuration= new Configuration();

(b)创建Job类:

1
2
3
4
ob job = Job.getInstance(configuration,"EnvyWordCountApp");
job.setJarByClass(EnvyWordCountApp.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

第一行代码用于构建一个Job对象,它有两个参数,一个是configuration,另一个是job名称。第二行则是用户自定义的MapReduce类。第三行和第四行则是分别设置map函数和reduce函数的实现类。

(c)设置输出的key/value类型:

1
2
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

上面设置的是中间输出key/value的类型,还需要设置最终存储在HDFS中的结果文件中的key/value的类型,如下所示:

1
2
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

(d)设置Job的输入和输出路径并提交到集群中运行:

1
2
3
4
5
6
7
8
9
10
job.setInputFormatClass(TextInputFormat.class);
Path inputPath = new Path(INPUT_PATH);
FileInputFormat.addInputPath(job,inputPath);


job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path(OUTPUT_PATH);
FileOutputFormat.setOutputPath(job, outputPath);

System.exit(job.waitForCompletion(true)?0:1);

可以看到上面一共有三段代码,其中第一段用于构建输入的数据文件,第二段用于构建输出的数据文件,第三段则表示如果job运行成功,则程序就会正常退出。

请注意,虽然我们编写MapReduce程序只需要实现map函数和reduce函数,但是在实际开发中往往需要实现三个方法,第三个方法是为了配置MapReduce如何运行map和reduce函数,说白了就是构建一个MapReduce能执行的job。

WordCount提交到集群运行

接下来介绍如何将之前的WordCount代码提交到集群中运行,由于这里是第一次接触到MapReduce,因此这里以伪分布式集群的方式来运行WordCount,实际上完全分布式集群的运行和这里的过程是完全一样的。

伪分布式集群方式运行WordCount程序的步骤如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar包;
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib目录下;
(3)在虚拟机/home/hadoop目录中新建一个hello.text的测试文件,其中的内容如下所示:

1
2
3
Deer Bear River
Car Car River
Deer Car Bear

之后在HDFS根目录下新建inputwc目录,之后将这个hello.txt文件上传至该目录:

1
2
3
[root@master sbin]# hadoop fs -mkdir /inputwc
[root@master sbin]# hadoop fs -put
/home/hadoop/hello.txt /inputwc

之后确认文件上传HDFS中:

1
2
[root@master sbin]# hadoop fs -ls -R /inputwc
-rw-r--r-- 1 root supergroup 44 2020-06-04 21:23 /inputwc/hello.txt

(4)提交MapReduce作业到集群中运行:

1
[root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-1.0-SNAPSHOT.jar com.envy.envymapreduce.app.EnvyWordCountApp

执行流程如下所示:

(5)查看作业输出结果,如下所示:

1
2
3
4
5
[root@master sbin]# hadoop fs -text /outputwc/part-*
Bear 2
Car 3
Deer 2
River 2

顺便看一下HDFS上/outputwc目录信息:

1
2
3
[root@master sbin]# hadoop fs -ls -R /outputwc
-rw-r--r-- 1 root supergroup 0 2020-06-04 21:27 /outputwc/_SUCCESS
-rw-r--r-- 1 root supergroup 28 2020-06-04 21:27 /outputwc/part-r-00000

这样WordCount作业就执行成功了,那么本篇关于MapReduce的入门学习就到此为止。