写在前面 前面学习的都是MapReduce较为基本的操作,接下来学习一些MapReduce的高级应用,主要包括使用MapReduce完成join操作、排序操作、二次排序操作和小文件合并操作,这些应用在实际工作中非常有帮助。
MapReduce实现join操作 join操作概述 如果你之前熟悉数据库,那么肯定知道使用SQL语法实现join操作是非常简单的,但是在大数据场景下使用MapReduce编程模型来实现类似的join操作其实是比较困难的。通常在实际工作中,我们都是借助于Hive、Spark SQL等框架来实现join,既然这里是学习,那么对于MapReduce实现join操作的原理还是有必要学习一下,它对于理解join操作的底层是有非常大的帮助。接下来开始学习,如何使用MapReduce API来实现join操作。
需求分析 假设现在有如下两个txt文件,里面的内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 //emp.txt数据 519,smith,clerk,8920,1990-10-01,880.00,20 536,alien,salesman,8945,1992-09-11,1600.00,30 558,ward,salesman,8945,1992-09-22,1380.00,30 596,jones,manager,8968,1992-12-19,3682.00,20 //dept.txt数据 10,accounting,new york 20,research,dallas 30,sales,chicago 40,operations,boston
其中emp.txt中各列的含义为empno、ename、ejob、enumber、ebirthday、emoney、deptno。而dept.txt中各列的含义为deptno、dname和city。
将下来有如下一条SQL语句:
1 select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
可以看到这个SQL语句中只使用了empno、ename、deptno、dname 等4个属性,因此可以定义一个Employee,然后再加一个flag属性,当它值为1标识dept,为0则是emp。 我们希望能使用MapReduce来实现上述SQL的功能。
Map端join的实现原理 接下来学习MapReduce Map端的join的实现原理,如下所示: (1)Map端读取所有的文件,并在输出的内容中加上标识,用于表示数据是从哪个文件里来的。 (2)在Reduce处理函数中,按照(1)中的标识来对数据进行处理; (3)之后根据Key,使用join来求出结果并进行输出。
Map端join的代码实现 (1)定义一个员工类。新建一个reducejoin包,并在该包内新建一个Employee
类,其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 public class Employee implements WritableComparable { private String empNo = ""; private String empName = ""; private String deptNo = ""; private String deptName = ""; //用于区分是员工还是部门 private int flag = 0; public Employee() { } public Employee(String empNo, String empName, String deptNo, String deptName, int flag) { this.empNo = empNo; this.empName = empName; this.deptNo = deptNo; this.deptName = deptName; this.flag = flag; } /** * 这个方法后续使用非常方便 * */ public Employee(Employee employee){ this.empNo = employee.empNo; this.empName = employee.empName; this.deptNo = employee.deptNo; this.deptName = employee.deptName; this.flag = employee.flag; } public String getEmpNo() { return empNo; } public void setEmpNo(String empNo) { this.empNo = empNo; } public String getEmpName() { return empName; } public void setEmpName(String empName) { this.empName = empName; } public String getDeptNo() { return deptNo; } public void setDeptNo(String deptNo) { this.deptNo = deptNo; } public String getDeptName() { return deptName; } public void setDeptName(String deptName) { this.deptName = deptName; } public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } @Override public String toString() { return "Employee{" + "empNo='" + empNo + '\'' + ", empName='" + empName + '\'' + ", deptNo='" + deptNo + '\'' + ", deptName='" + deptName + '\'' + ", flag=" + flag + '}'; } @Override public void readFields(DataInput dataInput) throws IOException { this.empNo = dataInput.readUTF(); this.empName = dataInput.readUTF(); this.deptNo = dataInput.readUTF(); this.deptName = dataInput.readUTF(); this.flag = dataInput.readInt(); } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.empNo); dataOutput.writeUTF(this.empName); dataOutput.writeUTF(this.deptNo); dataOutput.writeUTF(this.deptName); dataOutput.writeInt(this.flag); } //不进行排序 @Override public int compareTo(Object o) { return 0; } }
(2)自定义一个Mapper类。在reducejoin包内新建一个MyMapper
类,其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class MyMapper extends Mapper<LongWritable, Text,LongWritable,Employee> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] valArray = value.toString().split(","); System.out.println("valArray.length="+valArray.length + " valArray[0]="+valArray[0]); if(valArray.length<=3){ //根据每行依据逗号分隔后形成的数组的长度来判断是emp还是dept //小于或者等于3为dept Employee dept = new Employee(); dept.setDeptNo(valArray[0]); dept.setDeptName(valArray[1]); //dept的flag的值为1 dept.setFlag(1); context.write(new LongWritable(Long.parseLong(valArray[0])),dept); }else{ //反正则说明为emp Employee employee = new Employee(); employee.setEmpNo(valArray[0]); employee.setEmpName(valArray[1]); employee.setDeptNo(valArray[6]); //emp的flag的值为0 //emp的部门deptName需要通过deptNo从dept中获取 employee.setFlag(0); context.write(new LongWritable(Long.parseLong(valArray[0])),employee); } } }
(3)自定义一个Reducer类。在reducejoin包内新建一个MyReducer
类,其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class MyReducer extends Reducer<LongWritable,Employee, NullWritable, Text> { @Override protected void reduce(LongWritable key, Iterable<Employee> values, Context context) throws IOException, InterruptedException { //员工与部门之间存在多对一的关系 Employee dept = null; List<Employee> employeeList = new ArrayList<>(); for(Employee value:values){ if(value.getFlag()==0){ //emp的flag的值为0,故此为emp Employee employee = new Employee(value); employeeList.add(employee); }else{ //dept的flag的值为1,故此为dept dept = new Employee(value); } } if(dept!=null){ for(Employee employee:employeeList){ employee.setDeptName(dept.getDeptName()); context.write(NullWritable.get(),new Text(employee.toString())); } } } }
(4)自定义一个驱动类。在reducejoin包内新建一个MyReduceJoinApp
类,其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class MyReduceJoinApp { public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException { String INPUT_PATH = "hdfs://master:9000/inputjoin"; String OUTPUT_PATH = "hdfs://master:9000/outputjoin"; Configuration configuration= new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); if(fileSystem.exists(new Path(OUTPUT_PATH))){ fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(configuration,"MyReduceJoinApp"); //运行Jar类 job.setJarByClass(MyReduceJoinApp.class); //设置自定义Mapper类和map函数输出数据的Key和Value的类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Employee.class); //Shuffle把数据从Map端拷贝到Reduce端 //指定Reducer类和输出Key、Value的类型 job.setReducerClass(MyReducer.class); //设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型 job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Employee.class); //设置输入/输出格式 FileInputFormat.addInputPath(job,new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //提交job //如果job运行成功,那么程序就会正常退出 System.exit(job.waitForCompletion(true)?0:1); } }
运行作业 接下来将提交该作业至集群中运行,操作过程如下所示: (1)在本地Windows机器上使用mvn clean package -DskipTests
命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar
包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-reducejoin-1.0-SNAPSHOT.jar
; (2)将(1)中的jar包上传至虚拟机/home/hadoop/lib
目录下; (3)在虚拟机/home/hadoop
目录中新建一个emp.txt的测试文件,其中的内容如下所示:
1 2 3 4 519,smith,clerk,8920,1990-10-01,880.00,20 536,alien,salesman,8945,1992-09-11,1600.00,30 558,ward,salesman,8945,1992-09-22,1380.00,30 596,jones,manager,8968,1992-12-19,3682.00,20
接着再新建另一个dept.txt的测试文件,其中的内容如下所示:
1 2 3 4 10,accounting,new york 20,research,dallas 30,sales,chicago 40,operations,boston
之后在HDFS根目录下新建inputjoin
目录(这个目录就必须与你设置的INPUT_PATH
保持一致,前面的hdfs://master:9000
表示主机),之后将这两个文件上传至该目录:
1 2 3 [root@master sbin]# hadoop fs -mkdir /inputjoin [root@master sbin]# hadoop fs -put /home/hadoop/emp.txt /inputjoin [root@master sbin]# hadoop fs -put /home/hadoop/dept.txt /inputjoin
之后确认文件上传HDFS中:
1 2 3 [root@master sbin]# hadoop fs -ls -R /inputjoin -rw-r--r-- 1 root supergroup 80 2020-06-13 21:58 /inputjoin/dept.txt -rw-r--r-- 1 root supergroup 178 2020-06-13 21:56 /inputjoin/emp.txt
(4)提交MapReduce作业到集群中运行:
1 [root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-reducejoin-1.0-SNAPSHOT.jar com.envy.envymapreduce.reducejoin.MyReduceJoinApp
执行流程如下所示:
(5)查看作业输出结果,如下所示:
1 2 3 4 [root@master lib]# hadoop fs -ls /outputjoin Found 5 items -rw-r--r-- 1 root supergroup 0 2020-06-13 22:00 /outputjoin/_SUCCESS -rw-r--r-- 1 root supergroup 0 2020-06-13 22:00 /outputjoin/part-r-00000
之后查看一下各个文件的内容,如下所示:
1 2 3 4 5 [root@master lib]# hadoop fs -text /outputjoin/* Employee{empNo='519', empName='smith', deptNo='20', deptName='research', flag=0} Employee{empNo='596', empName='jones', deptNo='20', deptName='research0', flag=0} Employee{empNo='536', empName='alien', deptNo='30', deptName='sales', flag=0} Employee{empNo='558', empName='ward', deptNo='30', deptName='sales', flag=0}
MapReduce实现排序操作 需求分析 现在要求对输入文件中的数据进行排序,其中输入文件中的每行内容均为一个数字,即一个数据。要求在每行输出两个间隔的数字,其中第一个数字代表原始数据在原始数据集中的排位,第二个数字则代表原始数据。
MapReduce排序的实现原理 MapReduce默认支持排序,如果Key为封装int的IntWritable类型,那么MapReduce将会按照数字大小来对Key进行排序。如果Key为封装String的Text类型,那么MapReduce将会按照字典顺序来对字符串进行排序。
因此开发者可以使用MapReduce中内置的排序功能来实现上述需求,但在此之前需要了解默认排序的规则,即按照Key值进行排序。
那么上例,我们就应该使用封装int的IntWritable类型,也就是在map中将读入的数据转化成IntWritable类型,之后将其作为Key值(此时的Value值随意);reduce在拿到<Key,Value-list>
之后,将输入的Key作为Value进行输出,并根据Value-list
中元素的个数来决定输出的次数。也就是说输出的Key其实是一个全局变量,它用于统计Key的当前位次信息。
MapReduce排序的代码实现 新建一个sort包,并在里面新建一个SortApp
类,其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public class SortApp { /** * 接口中泛型KEYIN, VALUEIN, KEYOUT, VALUEOUT * */ public static class MyMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable>{ private static IntWritable data = new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); data.set(Integer.parseInt(line)); context.write(data,new IntWritable(1)); } } /** * 接口中泛型KEYIN, VALUEIN, KEYOUT, VALUEOUT * */ public static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable,IntWritable> { private static IntWritable data = new IntWritable(1); @Override protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //此处的key其实就是mapper中的data,即原始数据 for(IntWritable value:values){ context.write(data,key); data = new IntWritable(data.get()+1); } } } public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException { String INPUT_PATH = "hdfs://master:9000/inputsort"; String OUTPUT_PATH = "hdfs://master:9000/outputsort"; Configuration configuration= new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); if(fileSystem.exists(new Path(OUTPUT_PATH))){ fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(configuration,"SortApp"); //运行Jar类 job.setJarByClass(SortApp.class); //设置自定义Mapper类和map函数输出数据的Key和Value的类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); //Shuffle把数据从Map端拷贝到Reduce端 //指定Reducer类和输出Key、Value的类型 job.setReducerClass(MyReducer.class); //设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //设置输入/输出格式 FileInputFormat.addInputPath(job,new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //提交job //如果job运行成功,那么程序就会正常退出 System.exit(job.waitForCompletion(true)?0:1); } }
运行作业 接下来将提交该作业至集群中运行,操作过程如下所示: (1)在本地Windows机器上使用mvn clean package -DskipTests
命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar
包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-sort-1.0-SNAPSHOT.jar
; (2)将(1)中的jar包上传至虚拟机/home/hadoop/lib
目录下; (3)在虚拟机/home/hadoop
目录中新建一个sort.txt的测试文件,其中的内容如下所示:
之后在HDFS根目录下新建inputsort
目录(这个目录就必须与你设置的INPUT_PATH
保持一致,前面的hdfs://master:9000
表示主机),之后将这个文件上传至该目录:
1 2 [root@master sbin]# hadoop fs -mkdir /inputsort [root@master sbin]# hadoop fs -put /home/hadoop/sort.txt /inputsort
之后确认文件上传HDFS中:
1 2 [root@master sbin]# hadoop fs -ls -R /inputsort -rw-r--r-- 1 root supergroup 80 2020-06-13 21:58 /inputsort/sort.txt
(4)提交MapReduce作业到集群中运行:
1 [root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-sort-1.0-SNAPSHOT.jar com.envy.envymapreduce.sort.SortApp
执行流程如下所示:
(5)查看作业输出结果,如下所示:
1 2 3 4 [root@master lib]# hadoop fs -ls /outputsort Found 2 items -rw-r--r-- 1 root supergroup 0 2020-06-16 16:19 /outputsort/_SUCCESS -rw-r--r-- 1 root supergroup 36 2020-06-16 16:19 /outputsort/part-r-00000
之后查看一下各个文件的内容,如下所示:
1 2 3 4 5 6 7 8 9 10 [root@master hadoop]# hadoop fs -text /outputsort/* 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
MapReduce实现二次排序操作 二次排序概述 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候在对Key进行排序的同时,还需要对Value进行排序,这就是常说的二次排序。
需求分析 现在要求对输入文件中的数据进行二次排序,如下所示的每行两列,列与列之间采用逗号进行分割,要求输出结果先按照第一列的值进行升序排序,如果第一列的值相等,那么就按照第二列的值进行升序排序。下面就是经过二次排序后的内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 30 10 30 20 30 30 30 40 40 5 40 10 40 20 40 30 50 10 50 20 50 50 50 60
MapReduce二次排序的实现原理 (1)Mapper任务会接收输入分片,然后不断调用map函数,进而对记录进行处理,处理完成后,将其转换为新的<key,value>
输出。 (2)对map函数输出的<key,value>
调用分区函数,将数据进行分区。不同分区的数据会被送到不同的Reducer任务中。 (3)对于不同分区的数据,它会按照Key进行排序,因此此处的Key必须实现WritableComparable
接口,该接口继承了Comparable
,因此可以进行比较排序。 (4)对于排序后的<key,value>
,它会按照Key进行分组。如果Key相同,那么相同Key的<key,value>
就被分到一个组中,这样使得最后每个分组都会调用一次reduce函数。 (5)排序、分组后的数据就会被送到Reducer节点中。
MapReduce二次排序的代码实现 新建一个secondsort包,并在里面新建一个IntPair
类,该类就是定义每次(行读取)的两个整数。其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; public void set(int left,int right){ this.first = left; this.second = right; } public int getFirst(){ return first; } public int getSecond(){ return second; } @Override public int hashCode() { return first+"".hashCode()+second+"".hashCode(); } @Override public boolean equals(Object right) { if(right instanceof IntPair){ IntPair r = (IntPair)right; return r.first == first && r.second == second; }else{ return false; } } @Override public void readFields(DataInput dataInput) throws IOException { first = dataInput.readInt(); second = dataInput.readInt(); } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(first); dataOutput.writeInt(second); } /** * 自定义Key排序时的方法 * */ @Override public int compareTo(IntPair o) { if(first !=o.first){ return first-o.first; }else if(second != o.second){ return second - o.second; }else{ return 0; } } }
接着新建一个SecondarySortApp
类,该类为项目的启动类,里面的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 public class SecondarySortApp { public static class MyMapper extends Mapper<LongWritable, Text,IntPair, IntWritable>{ private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override protected void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if(itr.hasMoreTokens()){ left = Integer.parseInt(itr.nextToken()); if(itr.hasMoreTokens()){ right = Integer.parseInt(itr.nextToken()); } key.set(left,right); value.set(right); context.write(key,value); } } /** * 在分组的时候,只比较原来的key,而不是组合key * */ public static class GroupComparator implements RawComparator<IntPair>{ @Override public int compare(byte[] b1, int s1, int i1, byte[] b2, int s2, int i2) { return WritableComparator.compareBytes(b1,s1,Integer.SIZE/8,b2,s2,Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { return o1.getFirst() - o2.getFirst(); } } } public static class MyReducer extends Reducer<IntPair,IntWritable,Text,IntWritable>{ private static final Text SEPARATOR = new Text("----------"); private final Text first = new Text(); @Override protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { context.write(SEPARATOR,null); first.set(Integer.toString(key.getFirst())); for(IntWritable value:values){ context.write(first,value); } } } public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException { String INPUT_PATH = "hdfs://master:9000/inputsecondsort"; String OUTPUT_PATH = "hdfs://master:9000/outputsecondsort"; Configuration configuration= new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); if(fileSystem.exists(new Path(OUTPUT_PATH))){ fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(configuration,"SecondarySortApp"); //运行Jar类 job.setJarByClass(SecondarySortApp.class); //设置自定义Mapper类和map函数输出数据的Key和Value的类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); //分组函数 job.setGroupingComparatorClass(MyMapper.GroupComparator.class); //Shuffle把数据从Map端拷贝到Reduce端 //指定Reducer类和输出Key、Value的类型 job.setReducerClass(MyReducer.class); //设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置输入/输出路径 FileInputFormat.addInputPath(job,new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //设置输入/输出格式 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //提交job //如果job运行成功,那么程序就会正常退出 System.exit(job.waitForCompletion(true)?0:1); } }
运行作业 接下来将提交该作业至集群中运行,操作过程如下所示: (1)在本地Windows机器上使用mvn clean package -DskipTests
命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar
包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-secondarysort-1.0-SNAPSHOT.jar
; (2)将(1)中的jar包上传至虚拟机/home/hadoop/lib
目录下; (3)在虚拟机/home/hadoop
目录中新建一个secondarysort.txt的测试文件,其中的内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 50 60 30 20 40 20 30 40 50 50 40 10 50 20 40 30 50 10 30 30 40 5 30 10
之后在HDFS根目录下新建inputsecondsort
目录(这个目录就必须与你设置的INPUT_PATH
保持一致,前面的hdfs://master:9000
表示主机),之后将这两个文件上传至该目录:
1 2 [root@master sbin]# hadoop fs -mkdir /inputsecondsort [root@master sbin]# hadoop fs -put /home/hadoop/secondarysort.txt /inputsecondsort
之后确认文件上传HDFS中:
1 2 [root@master sbin]# hadoop fs -ls -R /inputsecondsort -rw-r--r-- 1 root supergroup 73 2020-06-16 15:57 /inputsecondsort/secondarysort.txt
(4)提交MapReduce作业到集群中运行:
1 [root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-secondarysort-1.0-SNAPSHOT.jar com.envy.envymapreduce.secondsort.SecondarySortApp
执行流程如下所示:
(5)查看作业输出结果,如下所示:
1 2 3 4 [root@master hadoop]# hadoop fs -ls /outputsecondsort Found 2 items -rw-r--r-- 1 root supergroup 0 2020-06-16 16:03 /outputsecondsort/_SUCCESS -rw-r--r-- 1 root supergroup 104 2020-06-16 16:03 /outputsecondsort/part-r-00000
之后查看一下各个文件的内容,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 [root@master hadoop]# hadoop fs -text /outputsecondsort/* ---------- 30 10 30 20 30 30 30 40 ---------- 40 5 40 10 40 20 40 30 ---------- 50 10 50 20 50 50 50 60
使用MapReduce合并小文件 概述 前面说过Hadoop处理单个大文件比处理多个小文件更有效率,此外单个文件也非常占用HDFS的存储空间,因此将小文件合并起来进行处理是较为科学的方式。
需求 要求通过使用MapReduce API来对小文件进行合并,并输出为SequenceFile。
合并小文件的代码实现 新建一个merge包,并在里面新建一个WholeFileRecordReader
类,这是开发者自定义的RecordReader
类,注意它需要继承RecordReader,并重写其中的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit fileSplit; private Configuration configuration; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { this.fileSplit = (FileSplit) inputSplit; this.configuration = taskAttemptContext.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(!processed){ byte[] contents = new byte[(int)fileSplit.getLength()]; Path path = fileSplit.getPath(); FileSystem fileSystem = path.getFileSystem(configuration); FSDataInputStream inputStream = null; try{ inputStream = fileSystem.open(path); IOUtils.readFully(inputStream,contents,0,contents.length); value.set(contents,0,contents.length); }finally { IOUtils.closeStream(inputStream); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? 1.0f: 0.0f; } @Override public void close() throws IOException { //do nothing } }
接着新建一个WholeFileInputFormat
类,该类用于将整个文件作为一条记录处理的InputFormat
类,其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> { //设置每个小文件不可分片,保证一个小文件生成一个key-value @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { WholeFileRecordReader wholeFileRecordReader = new WholeFileRecordReader(); wholeFileRecordReader.initialize(inputSplit,taskAttemptContext); return wholeFileRecordReader; } }
最后新建一个启动类WholeMergeApp
,里面的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class WholeMergeApp { /** * 将小文件打包成SequenceFile */ static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable>{ private Text filenameKey; @Override protected void setup(Context context) throws IOException, InterruptedException { InputSplit split = context.getInputSplit(); Path path =( (FileSplit)split).getPath(); filenameKey = new Text(path.toString()); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(filenameKey,value); } } public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException { String INPUT_PATH = "hdfs://master:9000/inputmerge"; String OUTPUT_PATH = "hdfs://master:9000/outputmerge"; Configuration configuration= new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); if(fileSystem.exists(new Path(OUTPUT_PATH))){ fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(configuration,"WholeMergeApp"); //运行Jar类 job.setJarByClass(WholeMergeApp.class); //设置输入/输出格式 job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); //设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setMapperClass(SequenceFileMapper.class); //设置输入/输出目录 FileInputFormat.addInputPath(job,new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //提交job //如果job运行成功,那么程序就会正常退出 System.exit(job.waitForCompletion(true)?0:1); } }
运行作业 接下来将提交该作业至集群中运行,操作过程如下所示: (1)在本地Windows机器上使用mvn clean package -DskipTests
命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar
包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-merge-1.0-SNAPSHOT.jar
; (2)将(1)中的jar包上传至虚拟机/home/hadoop/lib
目录下; (3)在虚拟机/home/hadoop
目录中新建一个merge.txt的测试文件,其中的内容如下所示:
之后在HDFS根目录下新建inputmerge
目录(这个目录就必须与你设置的INPUT_PATH
保持一致,前面的hdfs://master:9000
表示主机),之后将这两个文件上传至该目录:
1 2 [root@master sbin]# hadoop fs -mkdir /inputmerge [root@master sbin]# hadoop fs -put /home/hadoop/merge.txt /inputmerge
之后确认文件上传HDFS中:
1 2 [root@master sbin]# hadoop fs -ls -R /inputmerge -rw-r--r-- 1 root supergroup 28 2020-06-16 22:19 /inputmerge/merge.txt
(4)提交MapReduce作业到集群中运行:
1 [root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-merge-1.0-SNAPSHOT.jar com.envy.envymapreduce.merge.WholeMergeApp
执行流程如下所示:
(5)查看作业输出结果,如下所示:
1 2 3 4 [root@master hadoop]# hadoop fs -ls /outputmerge Found 2 items -rw-r--r-- 1 root supergroup 0 2020-06-16 22:21 /outputmerge/_SUCCESS -rw-r--r-- 1 root supergroup 167 2020-06-16 22:21 /outputmerge/part-r-00000
之后查看一下各个文件的内容,如下所示:
1 2 [root@master hadoop]# hadoop fs -text /outputmerge/* hdfs://master:9000/inputmerge/merge.txt 35 30 20 20 36 30 0a 33 30 20 20 32 30 0a 34 30 20 20 32 30 0a 33 30 20 20 34 30 0a
这样本篇关于MapReduce的高级应用相关内容就到此为止,后续学习其他内容。