MapReduce入门
写在前面
本篇来学习MapReduce,主要包括MapReduce简介、设计目标、特点、不适合场景、编程模型等内容,并在此基础上通过一个词频统计功能来加深对于MapReduce的理解。
MapReduce
简介
MapReduce是谷歌开源的一个用于大数据量的计算框架,对于大数据量的计算,通常采用并行计算方式来处理。但是如果让开发人员自己来完全实现一个并行计算程序,这是非常困难的,鉴于此种情况,MapReduce就孕育而生,它是一种简化并行计算的编程模型,允许没有并发计算经验的开发者也能开发出并行计算程序,这无疑大大降低了并行计算的实现门槛。
设计目标
从简介中可以知道MapReduce的设计目标就是便于开发人员在不熟悉分布式并发编程的情况下,也能将自己的程序运行在分布式系统上。
MapReduce采用的是“分而治之”的思想,即把对大规模数据集的操作,分发给一个主节点管理下的各个子节点,让它们共同来完成,之后整合各个子节点的中间结果,进而得到最终的计算结果。用一句话来概括MapReduce就是“分散任务,汇总结果”。
特点
(1)容易编程。MapReduce只需简单实现一些接口,就能完成一个个分布式程序,且这些分布式程序可以分布到大量廉价的PC机上运行。这使得开发者写一个分布式程序就像写一个简单的串行程序一样,非常简单,这也是MapReduce编程非常流行的原因。
(2)扩展性良好。当开发者的计算资源无法得到满足时,可以通过简单的增加机器来扩展它的计算能力。
(3)高容错性。MapReduce的设计初衷是要求程序能够部署在大量廉价的PC机上,这就要求MapReduce具备较高的容错性。我们知道程序在廉价PC机上出现故障的概率很大,这就要求当某台机器出现故障宕机了,可以将它上面的计算任务转移到另外一个节点上运行,而不至于导致整个任务运行失败,而且这个过程不需要人工干预,完全是由Hadoop内部完成的。
(4)能够对PB及以上级别的海量数据进行离线处理。也就是说MapReduce比较适合离线处理,而不适合实时处理,对延迟率要求苛刻的时候不建议使用这个。MapReduce很难做到毫秒级返回一个计算结果。
不适合场景
易于编程的MapReduce尽管具有很多优点,但是也有一些不太擅长的地方,注意这里的说法是太擅长,而不是无法做到,只是在某些场景下实现的效果较差,此时更适合使用其他的框架。这些不适合场景主要有以下几个:
(1)实时计算。前面说过MapReduce无法像MySQL那样在毫秒或者秒级内返回计算结果。
(2)流式计算。所谓的流式计算是指计算输入的数据是动态的,而MapReduce的输入数据集必须是静态的,不能动态发生变化,这个就是MapReduce自身设计特点所决定的。
(3)DAG(有向图)计算。多个应用程序之间存在依赖关系,前一个应用程序的输出,作为后一个应用程序的输入,在这种情况下,MapReduce并不是不能做,只是使用MapReduce之后,每个MapReduce作业的输出结果都会写入到磁盘,势必会造成大量的磁盘IO,进而降低使用性能。
MapReduce编程模型
概述
其实单从MapReduce这个名字就能知道,MapReduce由Map和Reduce这两个阶段组成。也就是说用户只需编写map()
和reduce()
这两个函数,就能完成简单的分布式程序的设计。
map()
函数以Key-Value键值对作为输入,之后产生另外一系列的Key-Value键值对作为中间输出写入本地磁盘。MapReduce框架会自动将这些中间数据按照Key值进行聚集,且Key值相同的数据会被统一交给reduce()
函数来处理。请注意这个聚集策略用户可以自定义,系统默认采用的是对Key值进行哈希取模。
reduce()
函数以Key及对应的Value列表作为输入,经合并Key相同的Value值后,会产生另外一系列Key-Value键值对来作为最终输出,进而写入到HDFS中。
MapReduce将作业(其实就是一个MapReduce程序)的整个运行过程分为Map和Reduce这两个过程,这里以后续介绍的词频统计功能来介绍MapReduce编程模型:
(1)Map阶段由一定数量的Map Task组成:
- 输入数据格式解析:InputFormat(将输入文件分片);
- 输入数据处理:Mapper;
- 数据分组:Partitioner。
(2)Reduce阶段由一定数量的Reduce Task组成:
- 数据远程拷贝(从Map Task的输出拷贝部分数据);
- 数据按照Key排序和分组(即Key相同的都放在一起,按照Key进行分组操作,每一组交由Reducer进行处理);
- 数据处理:Reducer;
- 数据输出格式:OutputFormat(输出文件格式、设置分隔符等)。
MapReduce编程步骤
从上述词频统计功能就能看出MapReduce编程步骤如下所示:
(1)Input:输入一系列的(k1,v1)键值对;
(2)Map和Reduce:Map:(k1,v1)->list(k2,v2),之后Reduce:(k2,list(v2))->list(k3,v3)。其中k2/v2是中间结果对。
(3)Output:一系列(k3,v3)键值对。MapReduce词频统计实例
接下来通过一个WordCount程序来详细解释MapReduce模型。一个最简单的MapReduce程序至少包含3个部分:一个Map函数、一个Reduce函数和一个main函数。
也就是说在运行一个MapReduce计算任务时,这个任务过程会被划分为Map阶段和Reduce阶段,每个阶段都采用Key-Value键值对作为输入(input)和输出(output),main函数则将作业控制和文件输入/输出结合起来。
WordCount程序需求:现在有大量的文件,每个文件中又有大量的单词,要求统计每个单词出现的词频。
MapReduce完整的执行流程如下所示:
(1)WordCount实现设计分析。
(a)Map过程:并行读取文本,对读取的单词进行map操作,每个词都以<Key,Value>形式生成。
读取第一行Deer Bear River,分割单词形成Map:
1 | <Deer,1> <Bear,1> <River,1> |
读取第二行Car Car River,分割单词形成Map:
1 | <Car,1> <Car,1> <River,1> |
读取第三行Deer Car Bear,分割单词形成Map:
1 | <Deer,1> <Car,1> <Bear,1> |
(b)Reduce过程:对Map的结果进行排序、合并,最后得到词频。
reduce将形成的Map根据相同的Key组合成Value数组,如下所示:
1 | <Bear,1,1> <Car,1,1,1> <Deer,1,1> <River,1,1> |
之后循环执行Reduce(K,V[])
,分别统计出每个单词出现的此处,结果如下所示:
1 | <Bear,2> <Car,3> <Deer,2> <River,2> |
(2)WordCount代码实现。创建一个Maven项目,名称为envy-mapReduce,之后在其pom.xml依赖文件中添加如下信息:
1 | <properties> |
接着在java包内新建com/envy/envymapreduce/app
包,并在app包内新建EnvyWordCountApp类,其中的代码如下所示:
1 | public class EnvyWordCountApp { |
请注意目前直接运行代码中的main方法是会抛异常的,正确的启动会在后面进行介绍。
WordCount代码详解
尽管笔者在代码中对部分语句进行了注释,但是这里还是有必要对 WordCount代码进行详解。
(1)对于map方法来说,其参数如下所示:
1 | public void map(Object key, Text value, Context context) |
请注意这个方法存在于MyMapper类中,它需要继承Mapper类,并实现map方法。这个map方法有三个参数,前面的Object key
和Text value
就是输入的key和value,第三个参数Context context
记录的则是整个上下文,因此开发者可以通过context将数据写出去。
(2)对于reduce方法来说,其参数如下所示:
1 | public void reduce(Text key, Iterable<IntWritable> values, Context context) |
请注意这个方法存在于MyReducer类中,它需要继承Reducer类,并实现reduce方法。这个reduce方法有三个参数,前面的Text key
和Iterable<IntWritable> values
也是输入的key/values形式,不过可以看到它的value是迭代器形式的Iterable<IntWritable> values
,也就是说reduce的输入是一个key对应一组value,reduce方法也有context参数,且作用和之前map方法中的作用一样。
(3)对于main方法来说,其分析如下所示:
(a)创建Configuration类,请注意在运行MapReduce程序前都需要初始化Configuration类,该类主要是读取MapReduce系统配置信息:
1 | Configuration configuration= new Configuration(); |
(b)创建Job类:
1 | ob job = Job.getInstance(configuration,"EnvyWordCountApp"); |
第一行代码用于构建一个Job对象,它有两个参数,一个是configuration,另一个是job名称。第二行则是用户自定义的MapReduce类。第三行和第四行则是分别设置map函数和reduce函数的实现类。
(c)设置输出的key/value类型:
1 | job.setMapOutputKeyClass(Text.class); |
上面设置的是中间输出key/value的类型,还需要设置最终存储在HDFS中的结果文件中的key/value的类型,如下所示:
1 | job.setOutputKeyClass(Text.class); |
(d)设置Job的输入和输出路径并提交到集群中运行:
1 | job.setInputFormatClass(TextInputFormat.class); |
可以看到上面一共有三段代码,其中第一段用于构建输入的数据文件,第二段用于构建输出的数据文件,第三段则表示如果job运行成功,则程序就会正常退出。
请注意,虽然我们编写MapReduce程序只需要实现map函数和reduce函数,但是在实际开发中往往需要实现三个方法,第三个方法是为了配置MapReduce如何运行map和reduce函数,说白了就是构建一个MapReduce能执行的job。
WordCount提交到集群运行
接下来介绍如何将之前的WordCount代码提交到集群中运行,由于这里是第一次接触到MapReduce,因此这里以伪分布式集群的方式来运行WordCount,实际上完全分布式集群的运行和这里的过程是完全一样的。
伪分布式集群方式运行WordCount程序的步骤如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests
命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar
包;
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib
目录下;
(3)在虚拟机/home/hadoop
目录中新建一个hello.text的测试文件,其中的内容如下所示:
1 | Deer Bear River |
之后在HDFS根目录下新建inputwc
目录,之后将这个hello.txt文件上传至该目录:
1 | [root@master sbin]# hadoop fs -mkdir /inputwc |
之后确认文件上传HDFS中:
1 | [root@master sbin]# hadoop fs -ls -R /inputwc |
(4)提交MapReduce作业到集群中运行:
1 | [root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-1.0-SNAPSHOT.jar com.envy.envymapreduce.app.EnvyWordCountApp |
执行流程如下所示:
(5)查看作业输出结果,如下所示:
1 | [root@master sbin]# hadoop fs -text /outputwc/part-* |
顺便看一下HDFS上/outputwc
目录信息:
1 | [root@master sbin]# hadoop fs -ls -R /outputwc |
这样WordCount作业就执行成功了,那么本篇关于MapReduce的入门学习就到此为止。