写在前面

前面学习的都是MapReduce较为基础的内容,接下来学习如何开发MapReduce应用,其中最重要的就是学习MapReduce的类型、输入格式、输出格式和Combiner的使用。

MapReduce的类型

其实这里主要学习的是MapReduce的输入和输出类型。前面也说过,使用Hadoop中的MapReduce编程模型非常简单,开发者只需实现map和reduce方法中的输入和输出key/value键值对的类型即可,下面来学习各种数据类型在MapReduce中是如何使用的。

通过查看WritableComparable接口,可以发现Hadoop提供了很多与Java数据类型相对应的数据类型,如下表所示:

Hadoop中数据类型 Java中的数据类型
IntWritable Integer
BooleanWritable Boolean
FloatWritable Float
LongWritable Long
DoubleWritable Double
ShortWritable Short
ByteWritable Byte

同时可以发现MapReduce中的map和reduce函数需要满足如下格式:

1
2
map:(k1,v1) -> list(l2,v2)
reduce:(k2,list(v2)) -> list(k3,v3)

从这个格式可以知道,reduce函数的输入类型必须与map函数的输出类型保持一致。

接下来学习MapReduce中常用的设置,了解这些设置对于灵活使用MapReduce非常有帮助:
(1)输入数据类型由输入格式(InputFormat)来设置。举个例子,如TextInputFormat的Key类型是LongWritable类型,Value类型是Text。

(2)map的输出的Key的类型由setMapOutputKeyClass来设置,Value的类型由setMapOutputValueClass来设置。

(3)reduce的输出的Key的类型由setOutputKeyClass来设置,Value的类型由setOutputValueClass来设置。

MapReduce的输入格式

一般情况下,MapReduce处理的数据文件都是存储在HDFS上的,且这些数据文件的格式任意,可以是基于行的日志文件、也可以是二进制的图片、视频等。这些文件一般比较大,可能几十GB,甚至更大,那么问题来了,此时MapReduce如何读取这些数据呢?

InputFormat接口

InputFormat接口非常重要,它用于指定输入文件如何被Hadoop分块。InputFormat可以从一个job中得到一个split集合,之后再给这个split集合配上一个合适的RecordReader来读取每个split集合中的数据。

查看一下这个InputFormat接口的源码:

1
2
3
4
5
6
7
@Public
@Stable
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf var1, int var2) throws IOException;

RecordReader<K, V> getRecordReader(InputSplit var1, JobConf var2, Reporter var3) throws IOException;
}

可以看到这个接口中有两个方法,再来看一下这个同名的抽象类的源码,可以看到除了无参的构造方法外,它也有两个方法,而且似乎和上面接口中的方法非常相似:

1
2
3
4
5
6
7
8
9
10
@Public
@Stable
public abstract class InputFormat<K, V> {
public InputFormat() {
}

public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

可以看到接口中的getSplits和getRecordReader方法与抽象类中的getSplits和createRecordReader的参数和返回值是不同的,但是两者本质上是没有区别的,因此这里就以抽象类为例来介绍这两个方法的作用。

(1)getSplits(JobContext var1)方法用于将一个大数据在逻辑上分成多片。

举个例子,假设现在的数据库有100条数据,它们按照主键ID升序存储。之后每20条分为一片,那么这个List的大小就是5,之后每个InputSplit记录两个参数,第一个是是这个分片的起始ID,第二个则是这个分片数据的大小,此处为20。通过这个例子,我们知道这个InputSplit并没有真正存储数据,只是提供了一个如何将数据分片的方法。

(2)createRecordReader(InputSplit var1, TaskAttemptContext var2)方法会根据InputSplit定义的方法,来返回一个可以读取分片记录的RecordReader对象。

前面说了getSplits(JobContext var1)方法用于获取由输入文件计算出来的InputSplit,而且在后面计算InputSplit时,会考虑输入文件是否可以分割、文件存储时分块的大小和文件大小等因素。而createRecordReader(InputSplit var1, TaskAttemptContext var2)方法则提供了前面提到的RecordReader的实现,将Key/Value键值对从InputSplit中正确读出,如LineRecordReader,它以Key为偏移值,Value为每行数据,这使得所有createRecordReader()方法返回LineRecordReader的InputFormat都是以偏移值为Key、每行数据为Value的形式读取输入分片的。

InputFormat接口的实现类

下面是InputFormat接口所有的实现类:

接下来学习几个工作中常用的InputFormat接口的实现类,如下所示:
(1)FileInputFormatFileInputFormat是所有使用文件作为数据源的InputFormat实现的基类,其主要作用是指定作业的输入文件所在的路径。由于作业的输入被设定为一组路径,因此这对于作业路径的指定是非常有利的。FileInputFormat类提供了2种静态方法用于指定作业的输入路径:

1
2
public static void addInputPaths(JobConf conf, String commaSeparatedPaths);
public static void addInputPath(JobConf conf, Path path);

(2)KeyValueTextInputFormatKeyValueTextInputFormat它每一行均为一条记录,被分隔符(缺省为Tab)分割为Key(Text类型)、Value(Text类型)。

MapReduce的输出格式

针对前面学习的输入格式,Hadoop都有其对应的输出格式。默认情况下只有一个Reduce,也就是一个输出文件,其文件名为part-r-00000。查看一下之前wordCount程序的输出文件,如下所示:

1
2
3
[root@master ~]# hadoop fs -ls -R /outputwc
-rw-r--r-- 1 root supergroup 0 2020-06-04 21:27 /outputwc/_SUCCESS
-rw-r--r-- 1 root supergroup 28 2020-06-04 21:27 /outputwc/part-r-00000

输出文件的个数与Reduce的个数一致,如果有两个Reduce,那么输出结果就有两个文件,第一个为part-r-00000,第二个为part-r-00001,以此类推。

OutputFormat接口

OutputFormat接口主要用于描述数据的格式,它能够将用户提供的Key/Value键值对写入特定格式额度文件中。通过OutputFormat接口,开发者也可以实现具体的输出格式,但是过程较为复杂,通常都是使用默认的格式进行输出。

OutputFormat接口实现类

Hadoop既然提供了大量的InputFormat接口的实现类,那么就有与之对应的OutputFormat接口实现类,进而完成数据的输入输出操作。

下面是OutputFormat接口所有的实现类:

请注意OutputFormat类是MapReduce输出的基类,所有MapReduce输出都实现了OutputFormat接口。

接下来学习几个工作中常用的OutputFormat接口的实现类,如下所示:
(1)TextOutputFormat(文本输出),这也是默认的输出格式,也就是说MapReduce默认将数据以文本的形式进行输出。TextOutputFormat将每条记录写为文本行,利用它的键和值可以实现Writable的任意类型,之后TextOutputFormat可以调用toString()方法将它们转换为字符串。

请注意每个Key/Value键值对由制表符进行分割,当然也可以通过设置mapreduce.output.textoutputformat.separator属性来修改默认的分隔符:

请注意,与FileOutputFormat相对应的输入格式是KeyValueTextInputFormat,同样它也是可以通过前面的mapreduce.output.textoutputformat.separator属性来设置分隔符进而将Key/Value键值对进行文本分割。

还可以使用NullWritable来省略输出的Key或者Value,甚至两者都省略,此时就相当于NullOutputFormat输出格式,这会导致数据无分割符进行输出,此时输出只适合用TextInputFormat来读取。
(2)SequenceFileOutputFormat(二进制输出),它用于将它的输出写为一个顺序文件。请注意,当输出需要作为后续MapReduce任务的输入时,此时推荐使用SequenceFileOutputFormat(二进制输出),因为它格式紧凑,便于压缩。

Combiner操作

概述

通过前面的学习,我们知道Hadoop框架使用Mapper将数据处理成一个Key/Value键值对,之后再在网络节点中对其进行整理(shuffle),接着再使用Reducer处理数据并进行最终输出。

假设现在有一个场景:现在有10亿个数据,Mapper会生成10亿个Key/Value键值对,这些键值对在网络间进行传输,但是现在我们仅仅是想对数据求最大值,那么此时Mapper只需输出它所知道的最大值即可,这样不仅能减轻网络压力,还可以提高程序的执行效率。

正是由于这种场景在实际工作中非常普通,因此MapReduce框架提供了Combiner来避免map任务与reduce任务之间无效的数据传输。Hadoop允许用户针对map任务的输出指定一个合并函数,用于减少输出到Reducer中的数据量。也就是说这个合并函数主要是为了削减Mapper的输出数量,进而减少网络带宽和Reducer上的负载。

开发者完全可以将Combiner操作,看成是在每个单独的节点上,先进行一次Reducer操作,只不过此时的输入和输出的参数和Reducer是一样的。

请注意开发者可以使用Combiner操作来进行求和、求最值,但是无法求平均数。

下图是之前WordCount程序的执行过程:

使用Combiner操作的执行过程:

使用

接下来以之前学习的WordCount程序为例,来学习如何在实际工作中使用Combiner操作。

在app包内新建一个WordCountCombinerApp类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public class WordCountCombinerApp {
public static class MyMapper extends Mapper<Object, Text,Text, IntWritable>{
//new一个int类型用于计数,并赋值为1,因为不管每个单词出现几次,都是直接输出1
//假设输入数据为nice to nice,one赋值为1,那么使用context.write(word,one)的输出结果就是nice 1,to 1,nice 1
//如果one赋值为2,那么使用context.write(word,one)的输出结果就是nice 2,to 2,nice 2
private static final IntWritable one = new IntWritable(1);
//new一个String类型
private Text word = new Text();


/**
* 重写map函数,其中的context参数是上下文,用于记录key和value的值
* */
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//字符串分解类,将一行单词分解为多个单词
StringTokenizer itr = new StringTokenizer(value.toString());
//循环条件表示返回是否还有分隔符
while (itr.hasMoreTokens()){
//使用itr.nextToken()来获取单词
//word.set()方法将Java数据类型转换为Hadoop数据类型,这样后续才能输出
word.set(itr.nextToken());
//按照输出格式来输出结果,如nice 1
context.write(word,one);
}
}
}

public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

/**
* 重写reduce函数,其中的context参数是上下文,用于记录key和result的值
* */
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定义一个Java数据类型的sum,用于记录相同单词的数量
int sum = 0;
//遍历得到的结果
for(IntWritable value:values){
//由于value是IntWritable类型,因此需要调用get()方法来返回Java类型的值
//请注意IntWritable.get()和IntWritable.set()都可以进行数据类型转换,不同的是set是将Java数据类型转换为Hadoop数据类型
//而get则是将Hadoop数据类型转换为Java数据类型

sum+=value.get();
}
//将Java数据类型转换为Hadoop数据类型
result.set(sum);
//将结果输出到HDFS中
context.write(key,result);
}
}

public static void main(String[] args) throws Exception {
//配置输入路径
String INPUT_PATH = "hdfs://master:9000/cominputwc";
//配置输出路径
String OUTPUT_PATH = "hdfs://master:9000/comoutputwc";

//读取Hadoop的配置文件
Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

//创建一个任务,第一个参数是配置对象,第二个参数是任务名称,这里是EnvyWordCountApp
Job job = Job.getInstance(configuration,"WordCountCombinerApp");

//运行Jar类
job.setJarByClass(WordCountCombinerApp.class);

//设置map
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//设置combiner
job.setCombinerClass(MyReducer.class);

//设置reduce
job.setReducerClass(MyReducer.class);
//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//设置输入格式
//设置输入文件路径
job.setInputFormatClass(TextInputFormat.class);
Path inputPath = new Path(INPUT_PATH);
FileInputFormat.addInputPath(job,inputPath);

//设置输出格式
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path(OUTPUT_PATH);
//设置输出文件路径
FileOutputFormat.setOutputPath(job, outputPath);

//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}