如何开发MapReduce应用
写在前面
前面学习的都是MapReduce较为基础的内容,接下来学习如何开发MapReduce应用,其中最重要的就是学习MapReduce的类型、输入格式、输出格式和Combiner的使用。
MapReduce的类型
其实这里主要学习的是MapReduce的输入和输出类型。前面也说过,使用Hadoop中的MapReduce编程模型非常简单,开发者只需实现map和reduce方法中的输入和输出key/value键值对的类型即可,下面来学习各种数据类型在MapReduce中是如何使用的。
通过查看WritableComparable接口,可以发现Hadoop提供了很多与Java数据类型相对应的数据类型,如下表所示:
Hadoop中数据类型 | Java中的数据类型 |
---|---|
IntWritable | Integer |
BooleanWritable | Boolean |
FloatWritable | Float |
LongWritable | Long |
DoubleWritable | Double |
ShortWritable | Short |
ByteWritable | Byte |
同时可以发现MapReduce中的map和reduce函数需要满足如下格式:
1 | map:(k1,v1) -> list(l2,v2) |
从这个格式可以知道,reduce函数的输入类型必须与map函数的输出类型保持一致。
接下来学习MapReduce中常用的设置,了解这些设置对于灵活使用MapReduce非常有帮助:
(1)输入数据类型由输入格式(InputFormat)来设置。举个例子,如TextInputFormat的Key类型是LongWritable类型,Value类型是Text。
(2)map的输出的Key的类型由setMapOutputKeyClass来设置,Value的类型由setMapOutputValueClass来设置。
(3)reduce的输出的Key的类型由setOutputKeyClass来设置,Value的类型由setOutputValueClass来设置。
MapReduce的输入格式
一般情况下,MapReduce处理的数据文件都是存储在HDFS上的,且这些数据文件的格式任意,可以是基于行的日志文件、也可以是二进制的图片、视频等。这些文件一般比较大,可能几十GB,甚至更大,那么问题来了,此时MapReduce如何读取这些数据呢?
InputFormat接口
InputFormat接口非常重要,它用于指定输入文件如何被Hadoop分块。InputFormat可以从一个job中得到一个split集合,之后再给这个split集合配上一个合适的RecordReader来读取每个split集合中的数据。
查看一下这个InputFormat接口的源码:
1 | @Public |
可以看到这个接口中有两个方法,再来看一下这个同名的抽象类的源码,可以看到除了无参的构造方法外,它也有两个方法,而且似乎和上面接口中的方法非常相似:
1 | @Public |
可以看到接口中的getSplits和getRecordReader方法与抽象类中的getSplits和createRecordReader的参数和返回值是不同的,但是两者本质上是没有区别的,因此这里就以抽象类为例来介绍这两个方法的作用。
(1)getSplits(JobContext var1)
方法用于将一个大数据在逻辑上分成多片。
举个例子,假设现在的数据库有100条数据,它们按照主键ID升序存储。之后每20条分为一片,那么这个List的大小就是5,之后每个InputSplit记录两个参数,第一个是是这个分片的起始ID,第二个则是这个分片数据的大小,此处为20。通过这个例子,我们知道这个InputSplit并没有真正存储数据,只是提供了一个如何将数据分片的方法。
(2)createRecordReader(InputSplit var1, TaskAttemptContext var2)
方法会根据InputSplit定义的方法,来返回一个可以读取分片记录的RecordReader对象。
前面说了getSplits(JobContext var1)
方法用于获取由输入文件计算出来的InputSplit,而且在后面计算InputSplit时,会考虑输入文件是否可以分割、文件存储时分块的大小和文件大小等因素。而createRecordReader(InputSplit var1, TaskAttemptContext var2)
方法则提供了前面提到的RecordReader的实现,将Key/Value键值对从InputSplit中正确读出,如LineRecordReader,它以Key为偏移值,Value为每行数据,这使得所有createRecordReader()
方法返回LineRecordReader的InputFormat都是以偏移值为Key、每行数据为Value的形式读取输入分片的。
InputFormat接口的实现类
下面是InputFormat接口所有的实现类:
接下来学习几个工作中常用的InputFormat接口的实现类,如下所示:
(1)FileInputFormat
。FileInputFormat
是所有使用文件作为数据源的InputFormat
实现的基类,其主要作用是指定作业的输入文件所在的路径。由于作业的输入被设定为一组路径,因此这对于作业路径的指定是非常有利的。FileInputFormat
类提供了2种静态方法用于指定作业的输入路径:
1 | public static void addInputPaths(JobConf conf, String commaSeparatedPaths); |
(2)KeyValueTextInputFormat
。KeyValueTextInputFormat
它每一行均为一条记录,被分隔符(缺省为Tab)分割为Key(Text类型)、Value(Text类型)。
MapReduce的输出格式
针对前面学习的输入格式,Hadoop都有其对应的输出格式。默认情况下只有一个Reduce,也就是一个输出文件,其文件名为part-r-00000
。查看一下之前wordCount程序的输出文件,如下所示:
1 | [root@master ~]# hadoop fs -ls -R /outputwc |
输出文件的个数与Reduce的个数一致,如果有两个Reduce,那么输出结果就有两个文件,第一个为part-r-00000
,第二个为part-r-00001
,以此类推。
OutputFormat接口
OutputFormat接口主要用于描述数据的格式,它能够将用户提供的Key/Value键值对写入特定格式额度文件中。通过OutputFormat接口,开发者也可以实现具体的输出格式,但是过程较为复杂,通常都是使用默认的格式进行输出。
OutputFormat接口实现类
Hadoop既然提供了大量的InputFormat接口的实现类,那么就有与之对应的OutputFormat接口实现类,进而完成数据的输入输出操作。
下面是OutputFormat接口所有的实现类:
请注意OutputFormat类是MapReduce输出的基类,所有MapReduce输出都实现了OutputFormat接口。
接下来学习几个工作中常用的OutputFormat接口的实现类,如下所示:
(1)TextOutputFormat
(文本输出),这也是默认的输出格式,也就是说MapReduce默认将数据以文本的形式进行输出。TextOutputFormat
将每条记录写为文本行,利用它的键和值可以实现Writable的任意类型,之后TextOutputFormat
可以调用toString()
方法将它们转换为字符串。
请注意每个Key/Value键值对由制表符进行分割,当然也可以通过设置mapreduce.output.textoutputformat.separator
属性来修改默认的分隔符:
请注意,与FileOutputFormat
相对应的输入格式是KeyValueTextInputFormat
,同样它也是可以通过前面的mapreduce.output.textoutputformat.separator
属性来设置分隔符进而将Key/Value键值对进行文本分割。
还可以使用NullWritable来省略输出的Key或者Value,甚至两者都省略,此时就相当于NullOutputFormat输出格式,这会导致数据无分割符进行输出,此时输出只适合用TextInputFormat来读取。
(2)SequenceFileOutputFormat
(二进制输出),它用于将它的输出写为一个顺序文件。请注意,当输出需要作为后续MapReduce任务的输入时,此时推荐使用SequenceFileOutputFormat
(二进制输出),因为它格式紧凑,便于压缩。
Combiner操作
概述
通过前面的学习,我们知道Hadoop框架使用Mapper将数据处理成一个Key/Value键值对,之后再在网络节点中对其进行整理(shuffle),接着再使用Reducer处理数据并进行最终输出。
假设现在有一个场景:现在有10亿个数据,Mapper会生成10亿个Key/Value键值对,这些键值对在网络间进行传输,但是现在我们仅仅是想对数据求最大值,那么此时Mapper只需输出它所知道的最大值即可,这样不仅能减轻网络压力,还可以提高程序的执行效率。
正是由于这种场景在实际工作中非常普通,因此MapReduce框架提供了Combiner来避免map任务与reduce任务之间无效的数据传输。Hadoop允许用户针对map任务的输出指定一个合并函数,用于减少输出到Reducer中的数据量。也就是说这个合并函数主要是为了削减Mapper的输出数量,进而减少网络带宽和Reducer上的负载。
开发者完全可以将Combiner操作,看成是在每个单独的节点上,先进行一次Reducer操作,只不过此时的输入和输出的参数和Reducer是一样的。
请注意开发者可以使用Combiner操作来进行求和、求最值,但是无法求平均数。
下图是之前WordCount程序的执行过程:
使用Combiner操作的执行过程:
使用
接下来以之前学习的WordCount程序为例,来学习如何在实际工作中使用Combiner操作。
在app包内新建一个WordCountCombinerApp
类,其中的代码如下所示:
1 | public class WordCountCombinerApp { |