写在前面

前面学习的都是MapReduce较为基本的操作,接下来学习一些MapReduce的高级应用,主要包括使用MapReduce完成join操作、排序操作、二次排序操作和小文件合并操作,这些应用在实际工作中非常有帮助。

MapReduce实现join操作

join操作概述

如果你之前熟悉数据库,那么肯定知道使用SQL语法实现join操作是非常简单的,但是在大数据场景下使用MapReduce编程模型来实现类似的join操作其实是比较困难的。通常在实际工作中,我们都是借助于Hive、Spark SQL等框架来实现join,既然这里是学习,那么对于MapReduce实现join操作的原理还是有必要学习一下,它对于理解join操作的底层是有非常大的帮助。接下来开始学习,如何使用MapReduce API来实现join操作。

需求分析

假设现在有如下两个txt文件,里面的内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
//emp.txt数据
519,smith,clerk,8920,1990-10-01,880.00,20
536,alien,salesman,8945,1992-09-11,1600.00,30
558,ward,salesman,8945,1992-09-22,1380.00,30
596,jones,manager,8968,1992-12-19,3682.00,20

//dept.txt数据
10,accounting,new york
20,research,dallas
30,sales,chicago
40,operations,boston

其中emp.txt中各列的含义为empno、ename、ejob、enumber、ebirthday、emoney、deptno。而dept.txt中各列的含义为deptno、dname和city。

将下来有如下一条SQL语句:

1
select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;

可以看到这个SQL语句中只使用了empno、ename、deptno、dname 等4个属性,因此可以定义一个Employee,然后再加一个flag属性,当它值为1标识dept,为0则是emp。
我们希望能使用MapReduce来实现上述SQL的功能。

Map端join的实现原理

接下来学习MapReduce Map端的join的实现原理,如下所示:
(1)Map端读取所有的文件,并在输出的内容中加上标识,用于表示数据是从哪个文件里来的。
(2)在Reduce处理函数中,按照(1)中的标识来对数据进行处理;
(3)之后根据Key,使用join来求出结果并进行输出。

Map端join的代码实现

(1)定义一个员工类。新建一个reducejoin包,并在该包内新建一个Employee类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class Employee implements WritableComparable {
private String empNo = "";
private String empName = "";
private String deptNo = "";
private String deptName = "";
//用于区分是员工还是部门
private int flag = 0;

public Employee() {
}

public Employee(String empNo, String empName, String deptNo, String deptName, int flag) {
this.empNo = empNo;
this.empName = empName;
this.deptNo = deptNo;
this.deptName = deptName;
this.flag = flag;
}

/**
* 这个方法后续使用非常方便
* */
public Employee(Employee employee){
this.empNo = employee.empNo;
this.empName = employee.empName;
this.deptNo = employee.deptNo;
this.deptName = employee.deptName;
this.flag = employee.flag;
}

public String getEmpNo() {
return empNo;
}

public void setEmpNo(String empNo) {
this.empNo = empNo;
}

public String getEmpName() {
return empName;
}

public void setEmpName(String empName) {
this.empName = empName;
}

public String getDeptNo() {
return deptNo;
}

public void setDeptNo(String deptNo) {
this.deptNo = deptNo;
}

public String getDeptName() {
return deptName;
}

public void setDeptName(String deptName) {
this.deptName = deptName;
}

public int getFlag() {
return flag;
}

public void setFlag(int flag) {
this.flag = flag;
}

@Override
public String toString() {
return "Employee{" +
"empNo='" + empNo + '\'' +
", empName='" + empName + '\'' +
", deptNo='" + deptNo + '\'' +
", deptName='" + deptName + '\'' +
", flag=" + flag +
'}';
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.empNo = dataInput.readUTF();
this.empName = dataInput.readUTF();
this.deptNo = dataInput.readUTF();
this.deptName = dataInput.readUTF();
this.flag = dataInput.readInt();
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.empNo);
dataOutput.writeUTF(this.empName);
dataOutput.writeUTF(this.deptNo);
dataOutput.writeUTF(this.deptName);
dataOutput.writeInt(this.flag);
}

//不进行排序
@Override
public int compareTo(Object o) {
return 0;
}
}

(2)自定义一个Mapper类。在reducejoin包内新建一个MyMapper类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyMapper extends Mapper<LongWritable, Text,LongWritable,Employee> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] valArray = value.toString().split(",");
System.out.println("valArray.length="+valArray.length + " valArray[0]="+valArray[0]);

if(valArray.length<=3){
//根据每行依据逗号分隔后形成的数组的长度来判断是emp还是dept
//小于或者等于3为dept
Employee dept = new Employee();
dept.setDeptNo(valArray[0]);
dept.setDeptName(valArray[1]);
//dept的flag的值为1
dept.setFlag(1);
context.write(new LongWritable(Long.parseLong(valArray[0])),dept);
}else{
//反正则说明为emp
Employee employee = new Employee();
employee.setEmpNo(valArray[0]);
employee.setEmpName(valArray[1]);
employee.setDeptNo(valArray[6]);
//emp的flag的值为0
//emp的部门deptName需要通过deptNo从dept中获取
employee.setFlag(0);
context.write(new LongWritable(Long.parseLong(valArray[0])),employee);
}
}
}

(3)自定义一个Reducer类。在reducejoin包内新建一个MyReducer类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MyReducer extends Reducer<LongWritable,Employee, NullWritable, Text> {
@Override
protected void reduce(LongWritable key, Iterable<Employee> values, Context context) throws IOException, InterruptedException {
//员工与部门之间存在多对一的关系
Employee dept = null;
List<Employee> employeeList = new ArrayList<>();

for(Employee value:values){
if(value.getFlag()==0){
//emp的flag的值为0,故此为emp
Employee employee = new Employee(value);
employeeList.add(employee);
}else{
//dept的flag的值为1,故此为dept
dept = new Employee(value);
}
}

if(dept!=null){
for(Employee employee:employeeList){
employee.setDeptName(dept.getDeptName());
context.write(NullWritable.get(),new Text(employee.toString()));
}
}
}
}

(4)自定义一个驱动类。在reducejoin包内新建一个MyReduceJoinApp类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MyReduceJoinApp {
public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
String INPUT_PATH = "hdfs://master:9000/inputjoin";
String OUTPUT_PATH = "hdfs://master:9000/outputjoin";

Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

Job job = Job.getInstance(configuration,"MyReduceJoinApp");

//运行Jar类
job.setJarByClass(MyReduceJoinApp.class);

//设置自定义Mapper类和map函数输出数据的Key和Value的类型
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Employee.class);

//Shuffle把数据从Map端拷贝到Reduce端
//指定Reducer类和输出Key、Value的类型
job.setReducerClass(MyReducer.class);
//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Employee.class);


//设置输入/输出格式
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));


//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}

运行作业

接下来将提交该作业至集群中运行,操作过程如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-reducejoin-1.0-SNAPSHOT.jar
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib目录下;
(3)在虚拟机/home/hadoop目录中新建一个emp.txt的测试文件,其中的内容如下所示:

1
2
3
4
519,smith,clerk,8920,1990-10-01,880.00,20
536,alien,salesman,8945,1992-09-11,1600.00,30
558,ward,salesman,8945,1992-09-22,1380.00,30
596,jones,manager,8968,1992-12-19,3682.00,20

接着再新建另一个dept.txt的测试文件,其中的内容如下所示:

1
2
3
4
10,accounting,new york
20,research,dallas
30,sales,chicago
40,operations,boston

之后在HDFS根目录下新建inputjoin目录(这个目录就必须与你设置的INPUT_PATH保持一致,前面的hdfs://master:9000表示主机),之后将这两个文件上传至该目录:

1
2
3
[root@master sbin]# hadoop fs -mkdir /inputjoin
[root@master sbin]# hadoop fs -put /home/hadoop/emp.txt /inputjoin
[root@master sbin]# hadoop fs -put /home/hadoop/dept.txt /inputjoin

之后确认文件上传HDFS中:

1
2
3
[root@master sbin]# hadoop fs -ls -R /inputjoin
-rw-r--r-- 1 root supergroup 80 2020-06-13 21:58 /inputjoin/dept.txt
-rw-r--r-- 1 root supergroup 178 2020-06-13 21:56 /inputjoin/emp.txt

(4)提交MapReduce作业到集群中运行:

1
[root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-reducejoin-1.0-SNAPSHOT.jar com.envy.envymapreduce.reducejoin.MyReduceJoinApp

执行流程如下所示:

(5)查看作业输出结果,如下所示:

1
2
3
4
[root@master lib]# hadoop fs -ls /outputjoin
Found 5 items
-rw-r--r-- 1 root supergroup 0 2020-06-13 22:00 /outputjoin/_SUCCESS
-rw-r--r-- 1 root supergroup 0 2020-06-13 22:00 /outputjoin/part-r-00000

之后查看一下各个文件的内容,如下所示:

1
2
3
4
5
[root@master lib]# hadoop fs -text /outputjoin/*
Employee{empNo='519', empName='smith', deptNo='20', deptName='research', flag=0}
Employee{empNo='596', empName='jones', deptNo='20', deptName='research0', flag=0}
Employee{empNo='536', empName='alien', deptNo='30', deptName='sales', flag=0}
Employee{empNo='558', empName='ward', deptNo='30', deptName='sales', flag=0}

MapReduce实现排序操作

需求分析

现在要求对输入文件中的数据进行排序,其中输入文件中的每行内容均为一个数字,即一个数据。要求在每行输出两个间隔的数字,其中第一个数字代表原始数据在原始数据集中的排位,第二个数字则代表原始数据。

MapReduce排序的实现原理

MapReduce默认支持排序,如果Key为封装int的IntWritable类型,那么MapReduce将会按照数字大小来对Key进行排序。如果Key为封装String的Text类型,那么MapReduce将会按照字典顺序来对字符串进行排序。

因此开发者可以使用MapReduce中内置的排序功能来实现上述需求,但在此之前需要了解默认排序的规则,即按照Key值进行排序。

那么上例,我们就应该使用封装int的IntWritable类型,也就是在map中将读入的数据转化成IntWritable类型,之后将其作为Key值(此时的Value值随意);reduce在拿到<Key,Value-list>之后,将输入的Key作为Value进行输出,并根据Value-list中元素的个数来决定输出的次数。也就是说输出的Key其实是一个全局变量,它用于统计Key的当前位次信息。

MapReduce排序的代码实现

新建一个sort包,并在里面新建一个SortApp类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class SortApp {
/**
* 接口中泛型KEYIN, VALUEIN, KEYOUT, VALUEOUT
* */
public static class MyMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable>{

private static IntWritable data = new IntWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
data.set(Integer.parseInt(line));
context.write(data,new IntWritable(1));
}
}

/**
* 接口中泛型KEYIN, VALUEIN, KEYOUT, VALUEOUT
* */
public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable,IntWritable> {

private static IntWritable data = new IntWritable(1);

@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//此处的key其实就是mapper中的data,即原始数据
for(IntWritable value:values){
context.write(data,key);
data = new IntWritable(data.get()+1);
}
}
}

public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
String INPUT_PATH = "hdfs://master:9000/inputsort";
String OUTPUT_PATH = "hdfs://master:9000/outputsort";

Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

Job job = Job.getInstance(configuration,"SortApp");

//运行Jar类
job.setJarByClass(SortApp.class);

//设置自定义Mapper类和map函数输出数据的Key和Value的类型
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);

//Shuffle把数据从Map端拷贝到Reduce端
//指定Reducer类和输出Key、Value的类型
job.setReducerClass(MyReducer.class);
//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

//设置输入/输出格式
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}

运行作业

接下来将提交该作业至集群中运行,操作过程如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-sort-1.0-SNAPSHOT.jar
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib目录下;
(3)在虚拟机/home/hadoop目录中新建一个sort.txt的测试文件,其中的内容如下所示:

1
2
3
4
5
6
7
8
9
1
4
3
6
8
5
2
9
7

之后在HDFS根目录下新建inputsort目录(这个目录就必须与你设置的INPUT_PATH保持一致,前面的hdfs://master:9000表示主机),之后将这个文件上传至该目录:

1
2
[root@master sbin]# hadoop fs -mkdir /inputsort
[root@master sbin]# hadoop fs -put /home/hadoop/sort.txt /inputsort

之后确认文件上传HDFS中:

1
2
[root@master sbin]# hadoop fs -ls -R /inputsort
-rw-r--r-- 1 root supergroup 80 2020-06-13 21:58 /inputsort/sort.txt

(4)提交MapReduce作业到集群中运行:

1
[root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-sort-1.0-SNAPSHOT.jar com.envy.envymapreduce.sort.SortApp

执行流程如下所示:

(5)查看作业输出结果,如下所示:

1
2
3
4
[root@master lib]# hadoop fs -ls /outputsort
Found 2 items
-rw-r--r-- 1 root supergroup 0 2020-06-16 16:19 /outputsort/_SUCCESS
-rw-r--r-- 1 root supergroup 36 2020-06-16 16:19 /outputsort/part-r-00000

之后查看一下各个文件的内容,如下所示:

1
2
3
4
5
6
7
8
9
10
[root@master hadoop]# hadoop fs -text /outputsort/*
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9

MapReduce实现二次排序操作

二次排序概述

默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候在对Key进行排序的同时,还需要对Value进行排序,这就是常说的二次排序。

需求分析

现在要求对输入文件中的数据进行二次排序,如下所示的每行两列,列与列之间采用逗号进行分割,要求输出结果先按照第一列的值进行升序排序,如果第一列的值相等,那么就按照第二列的值进行升序排序。下面就是经过二次排序后的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
30 10
30 20
30 30
30 40

40 5
40 10
40 20
40 30

50 10
50 20
50 50
50 60

MapReduce二次排序的实现原理

(1)Mapper任务会接收输入分片,然后不断调用map函数,进而对记录进行处理,处理完成后,将其转换为新的<key,value>输出。
(2)对map函数输出的<key,value>调用分区函数,将数据进行分区。不同分区的数据会被送到不同的Reducer任务中。
(3)对于不同分区的数据,它会按照Key进行排序,因此此处的Key必须实现WritableComparable接口,该接口继承了Comparable,因此可以进行比较排序。
(4)对于排序后的<key,value>,它会按照Key进行分组。如果Key相同,那么相同Key的<key,value>就被分到一个组中,这样使得最后每个分组都会调用一次reduce函数。
(5)排序、分组后的数据就会被送到Reducer节点中。

MapReduce二次排序的代码实现

新建一个secondsort包,并在里面新建一个IntPair类,该类就是定义每次(行读取)的两个整数。其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class IntPair implements WritableComparable<IntPair> {
private int first = 0;
private int second = 0;

public void set(int left,int right){
this.first = left;
this.second = right;
}

public int getFirst(){
return first;
}

public int getSecond(){
return second;
}

@Override
public int hashCode() {
return first+"".hashCode()+second+"".hashCode();
}

@Override
public boolean equals(Object right) {
if(right instanceof IntPair){
IntPair r = (IntPair)right;
return r.first == first && r.second == second;
}else{
return false;
}
}

@Override
public void readFields(DataInput dataInput) throws IOException {
first = dataInput.readInt();
second = dataInput.readInt();
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(first);
dataOutput.writeInt(second);
}

/**
* 自定义Key排序时的方法
* */
@Override
public int compareTo(IntPair o) {
if(first !=o.first){
return first-o.first;
}else if(second != o.second){
return second - o.second;
}else{
return 0;
}
}
}

接着新建一个SecondarySortApp类,该类为项目的启动类,里面的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public class SecondarySortApp {
public static class MyMapper extends Mapper<LongWritable, Text,IntPair, IntWritable>{
private final IntPair key = new IntPair();
private final IntWritable value = new IntWritable();

@Override
protected void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(inValue.toString());
int left = 0;
int right = 0;
if(itr.hasMoreTokens()){
left = Integer.parseInt(itr.nextToken());
if(itr.hasMoreTokens()){
right = Integer.parseInt(itr.nextToken());
}
key.set(left,right);
value.set(right);
context.write(key,value);
}
}

/**
* 在分组的时候,只比较原来的key,而不是组合key
* */
public static class GroupComparator implements RawComparator<IntPair>{
@Override
public int compare(byte[] b1, int s1, int i1, byte[] b2, int s2, int i2) {
return WritableComparator.compareBytes(b1,s1,Integer.SIZE/8,b2,s2,Integer.SIZE/8);
}

@Override
public int compare(IntPair o1, IntPair o2) {
return o1.getFirst() - o2.getFirst();
}
}
}

public static class MyReducer extends Reducer<IntPair,IntWritable,Text,IntWritable>{
private static final Text SEPARATOR = new Text("----------");
private final Text first = new Text();

@Override
protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
context.write(SEPARATOR,null);
first.set(Integer.toString(key.getFirst()));
for(IntWritable value:values){
context.write(first,value);
}
}
}


public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
String INPUT_PATH = "hdfs://master:9000/inputsecondsort";
String OUTPUT_PATH = "hdfs://master:9000/outputsecondsort";

Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

Job job = Job.getInstance(configuration,"SecondarySortApp");

//运行Jar类
job.setJarByClass(SecondarySortApp.class);

//设置自定义Mapper类和map函数输出数据的Key和Value的类型
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);

//分组函数
job.setGroupingComparatorClass(MyMapper.GroupComparator.class);

//Shuffle把数据从Map端拷贝到Reduce端
//指定Reducer类和输出Key、Value的类型
job.setReducerClass(MyReducer.class);
//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);


//设置输入/输出路径
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

//设置输入/输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);


//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}

运行作业

接下来将提交该作业至集群中运行,操作过程如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-secondarysort-1.0-SNAPSHOT.jar
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib目录下;
(3)在虚拟机/home/hadoop目录中新建一个secondarysort.txt的测试文件,其中的内容如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
50  60
30 20
40 20
30 40

50 50
40 10
50 20
40 30

50 10
30 30
40 5
30 10

之后在HDFS根目录下新建inputsecondsort目录(这个目录就必须与你设置的INPUT_PATH保持一致,前面的hdfs://master:9000表示主机),之后将这两个文件上传至该目录:

1
2
[root@master sbin]# hadoop fs -mkdir /inputsecondsort
[root@master sbin]# hadoop fs -put /home/hadoop/secondarysort.txt /inputsecondsort

之后确认文件上传HDFS中:

1
2
[root@master sbin]# hadoop fs -ls -R /inputsecondsort
-rw-r--r-- 1 root supergroup 73 2020-06-16 15:57 /inputsecondsort/secondarysort.txt

(4)提交MapReduce作业到集群中运行:

1
[root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-secondarysort-1.0-SNAPSHOT.jar com.envy.envymapreduce.secondsort.SecondarySortApp

执行流程如下所示:

(5)查看作业输出结果,如下所示:

1
2
3
4
[root@master hadoop]# hadoop fs -ls /outputsecondsort
Found 2 items
-rw-r--r-- 1 root supergroup 0 2020-06-16 16:03 /outputsecondsort/_SUCCESS
-rw-r--r-- 1 root supergroup 104 2020-06-16 16:03 /outputsecondsort/part-r-00000

之后查看一下各个文件的内容,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@master hadoop]# hadoop fs -text /outputsecondsort/*
----------
30 10
30 20
30 30
30 40
----------
40 5
40 10
40 20
40 30
----------
50 10
50 20
50 50
50 60

使用MapReduce合并小文件

概述

前面说过Hadoop处理单个大文件比处理多个小文件更有效率,此外单个文件也非常占用HDFS的存储空间,因此将小文件合并起来进行处理是较为科学的方式。

需求

要求通过使用MapReduce API来对小文件进行合并,并输出为SequenceFile。

合并小文件的代码实现

新建一个merge包,并在里面新建一个WholeFileRecordReader类,这是开发者自定义的RecordReader类,注意它需要继承RecordReader,并重写其中的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

private FileSplit fileSplit;
private Configuration configuration;
private BytesWritable value = new BytesWritable();
private boolean processed = false;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) inputSplit;
this.configuration = taskAttemptContext.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(!processed){
byte[] contents = new byte[(int)fileSplit.getLength()];
Path path = fileSplit.getPath();
FileSystem fileSystem = path.getFileSystem(configuration);
FSDataInputStream inputStream = null;
try{
inputStream = fileSystem.open(path);
IOUtils.readFully(inputStream,contents,0,contents.length);
value.set(contents,0,contents.length);
}finally {
IOUtils.closeStream(inputStream);
}
processed = true;
return true;
}
return false;
}

@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}

@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f: 0.0f;
}

@Override
public void close() throws IOException {
//do nothing
}
}

接着新建一个WholeFileInputFormat类,该类用于将整个文件作为一条记录处理的InputFormat类,其中的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
//设置每个小文件不可分片,保证一个小文件生成一个key-value
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}

@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
WholeFileRecordReader wholeFileRecordReader = new WholeFileRecordReader();
wholeFileRecordReader.initialize(inputSplit,taskAttemptContext);
return wholeFileRecordReader;
}
}

最后新建一个启动类WholeMergeApp,里面的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class WholeMergeApp {
/**
* 将小文件打包成SequenceFile
*/
static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable>{
private Text filenameKey;

@Override
protected void setup(Context context) throws IOException, InterruptedException {
InputSplit split = context.getInputSplit();
Path path =( (FileSplit)split).getPath();
filenameKey = new Text(path.toString());
}

@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(filenameKey,value);
}
}


public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
String INPUT_PATH = "hdfs://master:9000/inputmerge";
String OUTPUT_PATH = "hdfs://master:9000/outputmerge";

Configuration configuration= new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUTPUT_PATH))){
fileSystem.delete(new Path(OUTPUT_PATH),true);
}

Job job = Job.getInstance(configuration,"WholeMergeApp");

//运行Jar类
job.setJarByClass(WholeMergeApp.class);

//设置输入/输出格式
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);


//设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setMapperClass(SequenceFileMapper.class);

//设置输入/输出目录
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

//提交job
//如果job运行成功,那么程序就会正常退出
System.exit(job.waitForCompletion(true)?0:1);
}
}

运行作业

接下来将提交该作业至集群中运行,操作过程如下所示:
(1)在本地Windows机器上使用mvn clean package -DskipTests命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-merge-1.0-SNAPSHOT.jar
(2)将(1)中的jar包上传至虚拟机/home/hadoop/lib目录下;
(3)在虚拟机/home/hadoop目录中新建一个merge.txt的测试文件,其中的内容如下所示:

1
2
3
4
50  60
30 20
40 20
30 40

之后在HDFS根目录下新建inputmerge目录(这个目录就必须与你设置的INPUT_PATH保持一致,前面的hdfs://master:9000表示主机),之后将这两个文件上传至该目录:

1
2
[root@master sbin]# hadoop fs -mkdir /inputmerge
[root@master sbin]# hadoop fs -put /home/hadoop/merge.txt /inputmerge

之后确认文件上传HDFS中:

1
2
[root@master sbin]# hadoop fs -ls -R /inputmerge
-rw-r--r-- 1 root supergroup 28 2020-06-16 22:19 /inputmerge/merge.txt

(4)提交MapReduce作业到集群中运行:

1
[root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-merge-1.0-SNAPSHOT.jar com.envy.envymapreduce.merge.WholeMergeApp

执行流程如下所示:

(5)查看作业输出结果,如下所示:

1
2
3
4
[root@master hadoop]# hadoop fs -ls /outputmerge
Found 2 items
-rw-r--r-- 1 root supergroup 0 2020-06-16 22:21 /outputmerge/_SUCCESS
-rw-r--r-- 1 root supergroup 167 2020-06-16 22:21 /outputmerge/part-r-00000

之后查看一下各个文件的内容,如下所示:

1
2
[root@master hadoop]# hadoop fs -text /outputmerge/*
hdfs://master:9000/inputmerge/merge.txt 35 30 20 20 36 30 0a 33 30 20 20 32 30 0a 34 30 20 20 32 30 0a 33 30 20 20 34 30 0a

这样本篇关于MapReduce的高级应用相关内容就到此为止,后续学习其他内容。