写在前面

MapReduce基础内容除了前面介绍的外,还有两个比较重要的内容:Partitioner操作和自定义RecordReader。

Partitioner操作

概述

在进行MapReduce计算时,有时候可能需要将最终的输出数据分到不同的文件中,这种情况是非常普遍的。

举个例子,现在需要将销售额按照省份来进行划分,那么就需要将同一省份的数据在一个文件中。

我们知道最终的输出数据都来自Reducer任务,因此想得到多个文件,这就意味着需要有相同数量的Reducer任务在运行。而Reducer任务的数量又来自于Mapper任务,也就是说任务的数量是由Mapper任务决定的。Mapper任务需要划分数据,将不同的数据分配给不同的Reducer任务,之后由它来生成对应的数据文件。我们称Mapper任务划分数据的过程为Partition,称负责划分数据的类为Partitioner。

请注意,MapReduce默认的Partitioner类为HashPartitioner。一般来说,Partitioner会先计算Key的哈希值(通常为md5值),然后通过Reducer个数来执行取模运算,即key.hashCode%(reducer个数)。这种方式不仅可以随机的将整个Key空间平均分配给每一个Reducer,同时也可以确保不同Mapper产生的相同Key能分配到同一个Reducer。

使用

接下来通过一个例子来学习如何使用Partitioner类。需求是统计每种类型手机的销售情况,要求每种类型手机的统计数据单独存放在一个结果中。

在app包内新建一个MobilePartitionerApp类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class MobilePartitionerApp {
public static class MyMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
/**
* 重写map函数,其中的context参数是上下文,用于记录key和value的值
* */
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] s = value.toString().split("\t");
context.write(new Text(s[0]),new IntWritable(Integer.parseInt(s[1])));
}
}

public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
/**
* 重写reduce函数,其中的context参数是上下文,用于记录key和sum的值
* */
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value:values){
sum+=value.get();
}
context.write(key,new IntWritable(sum));
}
}

public static class MyPartitioner extends Partitioner<Text,IntWritable> {
/**
* 重写getPartition函数,将任务转发给4个不同的Reducer
* */
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
if("xiaomi".equals(key.toString())){
return 0;
}else if("huawei".equals(key.toString())){
return 1;
}else if("iphone12".equals(key.toString())){
return 2;
}else {
return 3;
}
}
}

public static void main(String[] args) throws Exception {
String INPUT_PATH = "hdfs://master:9000/inpartitioner";
String OUTPUT_PATH = "hdfs://master:9000/outpartitioner";

Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

Job job = Job.getInstance(configuration,"MobilePartitionerApp");

//运行Jar类
job.setJarByClass(MobilePartitionerApp.class);

//设置map
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//设置reduce
job.setReducerClass(MyReducer.class);
//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//设置partitioner
job.setPartitionerClass(MyPartitioner.class);
//设置4个Reducer,每个分区一个
job.setNumReduceTasks(4);

//设置输入格式
//设置输入文件路径
job.setInputFormatClass(TextInputFormat.class);
Path inputPath = new Path(INPUT_PATH);
FileInputFormat.addInputPath(job,inputPath);

//设置输出格式
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path(OUTPUT_PATH);
//设置输出文件路径
FileOutputFormat.setOutputPath(job, outputPath);

//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}

运行作业

接下来将提交该作业至集群中运行,操作过程如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-partitioner-1.0-SNAPSHOT.jar
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib目录下;
(3)在虚拟机/home/hadoop目录中新建一个part_1.txt的测试文件,其中的内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
huawei,1
iphoneX,1
huawei,1
iphoneX,1
xiaomi,1
meizu,1
huawei,1
iphoneX,1
xiaomi,1
huawei,1
meizu,1
iphoneX,1
xiaomi,1
huawei,1
meizu,1

接着再新建另一个part_2.txt的测试文件,其中的内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
huawei,1
huawei,1
meizu,1
iphoneX,1
xiaomi,1
iphoneX,1
huawei,1
iphoneX,1
xiaomi,1
meizu,1
huawei,1
meizu,1
iphoneX,1
xiaomi,1
iphoneX,1
iphoneX,1
huawei,1
iphoneX,1
huawei,1
meizu,1

之后在HDFS根目录下新建inpartitioner目录(这个目录就必须与你设置的INPUT_PATH保持一致,前面的hdfs://master:9000表示主机),之后将这两个文件上传至该目录:

1
2
[root@master sbin]# hadoop fs -mkdir /inpartitioner
[root@master sbin]# hadoop fs -put /home/hadoop/part_1.txt part_2.txt /inpartitioner

之后确认文件上传HDFS中:

1
2
3
[root@master sbin]# hadoop fs -ls -R /inpartitioner
-rw-r--r-- 1 root supergroup 353 2020-06-11 22:24 /inpartitioner/part_1.txt
-rw-r--r-- 1 root supergroup 345 2020-06-11 22:24 /inpartitioner/part_2.txt

(4)提交MapReduce作业到集群中运行:

1
[root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-partitioner-1.0-SNAPSHOT.jar com.envy.envymapreduce.app.MobilePartitionerApp

执行流程如下所示:

(5)查看作业输出结果,如下所示:

1
2
3
4
5
6
7
[root@master lib]# hadoop fs -ls /outpartitioner
Found 5 items
-rw-r--r-- 1 root supergroup 0 2020-06-11 23:11 /outpartitioner/_SUCCESS
-rw-r--r-- 1 root supergroup 9 2020-06-11 23:11 /outpartitioner/part-r-00000
-rw-r--r-- 1 root supergroup 10 2020-06-11 23:11 /outpartitioner/part-r-00001
-rw-r--r-- 1 root supergroup 0 2020-06-11 23:11 /outpartitioner/part-r-00002
-rw-r--r-- 1 root supergroup 19 2020-06-11 23:11 /outpartitioner/part-r-00003

之后查看一下各个文件的内容,如下所示:

1
2
3
4
5
[root@master lib]# hadoop fs -text /outpartitioner/part-r-*
xiaomi 6
huawei 11
iphoneX 11
meizu 7

自定义RecordReader

概述

RecordReader表示从以怎样的方式从分片中读取一条记录,请注意每读取一条记录都会调用一次RecordReader类。

系统默认的RecordReader类是LineRecordReader,同时它也是TextInputFormat对应的RecordReader。而SequenceFileInputFormat对应的RecordReader类是SequenceFileRecordReader。也就是说LineRecordReader以每行的偏移量作为读入Map的Key,每行的内容作为读入Map的Value。

但是在很多情况下,Hadoop内置的RecordReader并不能满足需求。举个例子,开发者希望在读取记录的时候,Map读入的Key不是偏移量,而是行号或者文件名称,那么此时就需要自定义RecordReader类了。

步骤

开发者自定义RecordReader的步骤如下:
(1)定义一个类,使其继承抽象类RecordReader,即提供一个实现RecordReader的实例;
(2)实现自定义InputFormat类,重写InputFormat类中的CreateRecordReader()方法,注意该方法返回值是自定义的RecordReader实例;
(3)配置job.setInputFormatClass()方法为自定义的InputFormat实例。

使用

接下来通过一个例子来学习如何自定义RecordReader。需求是分别统计数据文件中奇数行和偶数行的和,测试文件已经给出。

新建一个recordreader包,接着在该包内新建一个MyPartitioner类,注意这个类需要继承Partitioner类,内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MyPartitioner extends Partitioner<LongWritable, Text> {
@Override
public int getPartition(LongWritable key, Text value, int numPartitions) {
//偶数放到第二个分区进行计算
if(key.get()%2==0){
//将输入到reduce中的key设置为1
key.set(1);
return 1;
}else{
//奇数放到第一个分区进行计算
//将输入到reduce中的key设置为0
key.set(0);
return 0;
}
}
}

接着在该包内新建一个MyInputFormat类,注意这个类需要继承FileInputFormat类,内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MyInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//返回自定义的RecordReader类
return new RecordReaderApp.MyRecordReader();
}

/**
* 为了使切分数据的时候行号不发生错乱,这里设置为不进行切分
* */
protected boolean isSplitable(FileSystem fileSystem, Path filename){
return false;
}
}

然后在该包内新建一个RecordReaderApp类,里面需要自定义一个RecordReader类并让其继承抽象类RecordReader,内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
public class RecordReaderApp {

/**
* 自定义RecordReader类
*/
public static class MyRecordReader extends RecordReader<LongWritable, Text>{
//相对于整个分片来说,起始位置
private long startPosition;

//相对于整个分片来说,结束位置
private long stopPostiton;

//相对于整个分片来说,当前位置
private long nowPosition;

//文件输入流
private FSDataInputStream fsDataInputStream = null;

//定义Key、Value类型
private LongWritable key = null;
private Text value = null;

//定义行阅读器,注意是hadoop.util包下的类
private LineReader reader = null;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//获取分片
FileSplit fileSplit = (FileSplit) inputSplit;
//获取起始位置
startPosition = fileSplit.getStart();
//获取结束位置
stopPostiton = fileSplit.getLength()+startPosition;
//创建配置
Configuration configuration = taskAttemptContext.getConfiguration();
//获取文件路径
Path path = fileSplit.getPath();
//根据路径获取文件系统
FileSystem fileSystem = path.getFileSystem(configuration);
//打开文件输入流
fsDataInputStream =fileSystem.open(path);
//找到开始位置开始读取
fsDataInputStream.seek(startPosition);
//创建阅读器
reader = new LineReader(fsDataInputStream);
//将当前位置设置为1
nowPosition = 1;
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(key ==null){
key = new LongWritable();
}
key.set(nowPosition);
if(value ==null){
value = new Text();
}
if(reader.readLine(value)==0){
return false;
}
nowPosition ++;
return true;
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}

@Override
public void close() throws IOException {
fsDataInputStream.close();
}
}

public static class MyMapper extends Mapper<LongWritable, Text,LongWritable, Text> {
/**
* 重写map函数
* */
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable,Text,LongWritable,Text>.Context context) throws IOException, InterruptedException {
//直接将读取的记录写出去
context.write(key,value);
}
}

public static class MyReducer extends Reducer<LongWritable,Text,Text,LongWritable> {
//创建写出去的Key和Value
private Text outKey = new Text();
private LongWritable outValue = new LongWritable();

/**
* 重写reduce函数
* */
@Override
public void reduce(LongWritable key, Iterable<Text> values, Reducer<LongWritable,Text,Text,LongWritable>.Context context) throws IOException, InterruptedException {
System.out.println("奇数行还是偶数行:"+key);

//定义求和的变量
long sum = 0;
//遍历values进行求和
for(Text value:values){
//累加操作
sum += Long.parseLong(value.toString());
}
//判断奇偶数
if(key.get()==0){
outKey.set("奇数之和为:");
}else{
outKey.set("偶数之和为:");
}
//设置value
outValue.set(sum);
//将结果写出去
context.write(outKey,outValue);
}
}

//定义一个driver
public static void main(String[] args) throws Exception {
String INPUT_PATH = "hdfs://master:9000/inrecordreader";
String OUTPUT_PATH = "hdfs://master:9000/outrecordreader";

Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

Job job = Job.getInstance(configuration,"RecordReaderApp");

//运行Jar类
job.setJarByClass(RecordReaderApp.class);

//设置输入格式
//设置输入文件路径
FileInputFormat.setInputPaths(job,INPUT_PATH);
job.setInputFormatClass(MyInputFormat.class);

//设置自定义Mapper类和map函数输出数据的Key和Value的类型
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);

//设置分区和reduce数量(reduce的数量应与分区数量相对应)
//设置partitioner
job.setPartitionerClass(MyPartitioner.class);
//设置4个Reducer,每个分区一个
job.setNumReduceTasks(4);

//Shuffle把数据从Map端拷贝到Reduce端
//指定Reducer类和输出Key、Value的类型
job.setReducerClass(MyReducer.class);
//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);


//设置输出格式
//设置输出文件路径
Path outputPath = new Path(OUTPUT_PATH);
FileOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(TextOutputFormat.class);


//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}

运行作业

接下来将提交该作业至集群中运行,操作过程如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-recordreader-1.0-SNAPSHOT.jar
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib目录下;
(3)在虚拟机/home/hadoop目录中新建一个recordreader.txt的测试文件,其中的内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

之后在HDFS根目录下新建inrecordreader目录(这个目录就必须与你设置的INPUT_PATH保持一致,前面的hdfs://master:9000表示主机),之后将这两个文件上传至该目录:

1
2
[root@master sbin]# hadoop fs -mkdir /inrecordreader
[root@master sbin]# hadoop fs -put /home/hadoop/recordreader.txt /inrecordreader

之后确认文件上传HDFS中:

1
2
[root@master sbin]# hadoop fs -ls -R /inrecordreader
-rw-r--r-- 1 root supergroup 39 2020-06-12 21:36 /inrecordreader/recordreader.txt

(4)提交MapReduce作业到集群中运行:

1
[root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-recordreader-1.0-SNAPSHOT.jar com.envy.envymapreduce.recordreader.RecordReaderApp

执行流程如下所示:

(5)查看作业输出结果,如下所示:

1
2
3
4
5
6
7
[root@master lib]# hadoop fs -ls /outrecordreader
Found 5 items
-rw-r--r-- 1 root supergroup 0 2020-06-12 21:42 /outrecordreader/_SUCCESS
-rw-r--r-- 1 root supergroup 22 2020-06-12 21:42 /outrecordreader/part-r-00000
-rw-r--r-- 1 root supergroup 22 2020-06-12 21:42 /outrecordreader/part-r-00001
-rw-r--r-- 1 root supergroup 0 2020-06-12 21:42 /outrecordreader/part-r-00002
-rw-r--r-- 1 root supergroup 0 2020-06-12 21:42 /outrecordreader/part-r-00003

之后查看一下各个文件的内容,如下所示:

1
2
3
[root@master lib]# hadoop fs -text /outrecordreader/part-r-*
奇数之和为: 64
偶数之和为: 72