写在前面 MapReduce基础内容除了前面介绍的外,还有两个比较重要的内容:Partitioner操作和自定义RecordReader。
Partitioner操作 概述 在进行MapReduce计算时,有时候可能需要将最终的输出数据分到不同的文件中,这种情况是非常普遍的。
举个例子,现在需要将销售额按照省份来进行划分,那么就需要将同一省份的数据在一个文件中。
我们知道最终的输出数据都来自Reducer任务,因此想得到多个文件,这就意味着需要有相同数量的Reducer任务在运行。而Reducer任务的数量又来自于Mapper任务,也就是说任务的数量是由Mapper任务决定的。Mapper任务需要划分数据,将不同的数据分配给不同的Reducer任务,之后由它来生成对应的数据文件。我们称Mapper任务划分数据的过程为Partition,称负责划分数据的类为Partitioner。
请注意,MapReduce默认的Partitioner类为HashPartitioner。一般来说,Partitioner会先计算Key的哈希值(通常为md5值),然后通过Reducer个数来执行取模运算,即key.hashCode%(reducer个数)
。这种方式不仅可以随机的将整个Key空间平均分配给每一个Reducer,同时也可以确保不同Mapper产生的相同Key能分配到同一个Reducer。
使用 接下来通过一个例子来学习如何使用Partitioner类。需求是统计每种类型手机的销售情况,要求每种类型手机的统计数据单独存放在一个结果中。
在app包内新建一个MobilePartitionerApp
类,其中的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 public class MobilePartitionerApp { public static class MyMapper extends Mapper<LongWritable, Text,Text, IntWritable> { /** * 重写map函数,其中的context参数是上下文,用于记录key和value的值 * */ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] s = value.toString().split("\t"); context.write(new Text(s[0]),new IntWritable(Integer.parseInt(s[1]))); } } public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> { /** * 重写reduce函数,其中的context参数是上下文,用于记录key和sum的值 * */ @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable value:values){ sum+=value.get(); } context.write(key,new IntWritable(sum)); } } public static class MyPartitioner extends Partitioner<Text,IntWritable> { /** * 重写getPartition函数,将任务转发给4个不同的Reducer * */ @Override public int getPartition(Text key, IntWritable value, int numPartitions) { if("xiaomi".equals(key.toString())){ return 0; }else if("huawei".equals(key.toString())){ return 1; }else if("iphone12".equals(key.toString())){ return 2; }else { return 3; } } } public static void main(String[] args) throws Exception { String INPUT_PATH = "hdfs://master:9000/inpartitioner"; String OUTPUT_PATH = "hdfs://master:9000/outpartitioner"; Configuration configuration= new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); if(fileSystem.exists(new Path(OUTPUT_PATH))){ fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(configuration,"MobilePartitionerApp"); //运行Jar类 job.setJarByClass(MobilePartitionerApp.class); //设置map job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置reduce job.setReducerClass(MyReducer.class); //设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置partitioner job.setPartitionerClass(MyPartitioner.class); //设置4个Reducer,每个分区一个 job.setNumReduceTasks(4); //设置输入格式 //设置输入文件路径 job.setInputFormatClass(TextInputFormat.class); Path inputPath = new Path(INPUT_PATH); FileInputFormat.addInputPath(job,inputPath); //设置输出格式 job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(OUTPUT_PATH); //设置输出文件路径 FileOutputFormat.setOutputPath(job, outputPath); //提交job //如果job运行成功,那么程序就会正常退出 System.exit(job.waitForCompletion(true)?0:1); } }
运行作业 接下来将提交该作业至集群中运行,操作过程如下所示: (1)在本地Windows机器上使用mvn clean package -DskipTests
命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar
包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-partitioner-1.0-SNAPSHOT.jar
; (2)将(1)中的jar包上传至虚拟机/home/hadoop/lib
目录下; (3)在虚拟机/home/hadoop
目录中新建一个part_1.txt的测试文件,其中的内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 huawei,1 iphoneX,1 huawei,1 iphoneX,1 xiaomi,1 meizu,1 huawei,1 iphoneX,1 xiaomi,1 huawei,1 meizu,1 iphoneX,1 xiaomi,1 huawei,1 meizu,1
接着再新建另一个part_2.txt的测试文件,其中的内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 huawei,1 huawei,1 meizu,1 iphoneX,1 xiaomi,1 iphoneX,1 huawei,1 iphoneX,1 xiaomi,1 meizu,1 huawei,1 meizu,1 iphoneX,1 xiaomi,1 iphoneX,1 iphoneX,1 huawei,1 iphoneX,1 huawei,1 meizu,1
之后在HDFS根目录下新建inpartitioner
目录(这个目录就必须与你设置的INPUT_PATH
保持一致,前面的hdfs://master:9000
表示主机),之后将这两个文件上传至该目录:
1 2 [root@master sbin]# hadoop fs -mkdir /inpartitioner [root@master sbin]# hadoop fs -put /home/hadoop/part_1.txt part_2.txt /inpartitioner
之后确认文件上传HDFS中:
1 2 3 [root@master sbin]# hadoop fs -ls -R /inpartitioner -rw-r--r-- 1 root supergroup 353 2020-06-11 22:24 /inpartitioner/part_1.txt -rw-r--r-- 1 root supergroup 345 2020-06-11 22:24 /inpartitioner/part_2.txt
(4)提交MapReduce作业到集群中运行:
1 [root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-partitioner-1.0-SNAPSHOT.jar com.envy.envymapreduce.app.MobilePartitionerApp
执行流程如下所示:
(5)查看作业输出结果,如下所示:
1 2 3 4 5 6 7 [root@master lib]# hadoop fs -ls /outpartitioner Found 5 items -rw-r--r-- 1 root supergroup 0 2020-06-11 23:11 /outpartitioner/_SUCCESS -rw-r--r-- 1 root supergroup 9 2020-06-11 23:11 /outpartitioner/part-r-00000 -rw-r--r-- 1 root supergroup 10 2020-06-11 23:11 /outpartitioner/part-r-00001 -rw-r--r-- 1 root supergroup 0 2020-06-11 23:11 /outpartitioner/part-r-00002 -rw-r--r-- 1 root supergroup 19 2020-06-11 23:11 /outpartitioner/part-r-00003
之后查看一下各个文件的内容,如下所示:
1 2 3 4 5 [root@master lib]# hadoop fs -text /outpartitioner/part-r-* xiaomi 6 huawei 11 iphoneX 11 meizu 7
自定义RecordReader 概述 RecordReader
表示从以怎样的方式从分片中读取一条记录,请注意每读取一条记录都会调用一次RecordReader
类。
系统默认的RecordReader
类是LineRecordReader
,同时它也是TextInputFormat
对应的RecordReader
。而SequenceFileInputFormat
对应的RecordReader
类是SequenceFileRecordReader
。也就是说LineRecordReader
以每行的偏移量作为读入Map的Key,每行的内容作为读入Map的Value。
但是在很多情况下,Hadoop内置的RecordReader
并不能满足需求。举个例子,开发者希望在读取记录的时候,Map读入的Key不是偏移量,而是行号或者文件名称,那么此时就需要自定义RecordReader
类了。
步骤 开发者自定义RecordReader
的步骤如下: (1)定义一个类,使其继承抽象类RecordReader
,即提供一个实现RecordReader
的实例; (2)实现自定义InputFormat
类,重写InputFormat
类中的CreateRecordReader()
方法,注意该方法返回值是自定义的RecordReader
实例; (3)配置job.setInputFormatClass()
方法为自定义的InputFormat
实例。
使用 接下来通过一个例子来学习如何自定义RecordReader
。需求是分别统计数据文件中奇数行和偶数行的和,测试文件已经给出。
新建一个recordreader包,接着在该包内新建一个MyPartitioner
类,注意这个类需要继承Partitioner
类,内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class MyPartitioner extends Partitioner<LongWritable, Text> { @Override public int getPartition(LongWritable key, Text value, int numPartitions) { //偶数放到第二个分区进行计算 if(key.get()%2==0){ //将输入到reduce中的key设置为1 key.set(1); return 1; }else{ //奇数放到第一个分区进行计算 //将输入到reduce中的key设置为0 key.set(0); return 0; } } }
接着在该包内新建一个MyInputFormat
类,注意这个类需要继承FileInputFormat
类,内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class MyInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //返回自定义的RecordReader类 return new RecordReaderApp.MyRecordReader(); } /** * 为了使切分数据的时候行号不发生错乱,这里设置为不进行切分 * */ protected boolean isSplitable(FileSystem fileSystem, Path filename){ return false; } }
然后在该包内新建一个RecordReaderApp
类,里面需要自定义一个RecordReader
类并让其继承抽象类RecordReader
,内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 public class RecordReaderApp { /** * 自定义RecordReader类 */ public static class MyRecordReader extends RecordReader<LongWritable, Text>{ //相对于整个分片来说,起始位置 private long startPosition; //相对于整个分片来说,结束位置 private long stopPostiton; //相对于整个分片来说,当前位置 private long nowPosition; //文件输入流 private FSDataInputStream fsDataInputStream = null; //定义Key、Value类型 private LongWritable key = null; private Text value = null; //定义行阅读器,注意是hadoop.util包下的类 private LineReader reader = null; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //获取分片 FileSplit fileSplit = (FileSplit) inputSplit; //获取起始位置 startPosition = fileSplit.getStart(); //获取结束位置 stopPostiton = fileSplit.getLength()+startPosition; //创建配置 Configuration configuration = taskAttemptContext.getConfiguration(); //获取文件路径 Path path = fileSplit.getPath(); //根据路径获取文件系统 FileSystem fileSystem = path.getFileSystem(configuration); //打开文件输入流 fsDataInputStream =fileSystem.open(path); //找到开始位置开始读取 fsDataInputStream.seek(startPosition); //创建阅读器 reader = new LineReader(fsDataInputStream); //将当前位置设置为1 nowPosition = 1; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(key ==null){ key = new LongWritable(); } key.set(nowPosition); if(value ==null){ value = new Text(); } if(reader.readLine(value)==0){ return false; } nowPosition ++; return true; } @Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { fsDataInputStream.close(); } } public static class MyMapper extends Mapper<LongWritable, Text,LongWritable, Text> { /** * 重写map函数 * */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable,Text,LongWritable,Text>.Context context) throws IOException, InterruptedException { //直接将读取的记录写出去 context.write(key,value); } } public static class MyReducer extends Reducer<LongWritable,Text,Text,LongWritable> { //创建写出去的Key和Value private Text outKey = new Text(); private LongWritable outValue = new LongWritable(); /** * 重写reduce函数 * */ @Override public void reduce(LongWritable key, Iterable<Text> values, Reducer<LongWritable,Text,Text,LongWritable>.Context context) throws IOException, InterruptedException { System.out.println("奇数行还是偶数行:"+key); //定义求和的变量 long sum = 0; //遍历values进行求和 for(Text value:values){ //累加操作 sum += Long.parseLong(value.toString()); } //判断奇偶数 if(key.get()==0){ outKey.set("奇数之和为:"); }else{ outKey.set("偶数之和为:"); } //设置value outValue.set(sum); //将结果写出去 context.write(outKey,outValue); } } //定义一个driver public static void main(String[] args) throws Exception { String INPUT_PATH = "hdfs://master:9000/inrecordreader"; String OUTPUT_PATH = "hdfs://master:9000/outrecordreader"; Configuration configuration= new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration); if(fileSystem.exists(new Path(OUTPUT_PATH))){ fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(configuration,"RecordReaderApp"); //运行Jar类 job.setJarByClass(RecordReaderApp.class); //设置输入格式 //设置输入文件路径 FileInputFormat.setInputPaths(job,INPUT_PATH); job.setInputFormatClass(MyInputFormat.class); //设置自定义Mapper类和map函数输出数据的Key和Value的类型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); //设置分区和reduce数量(reduce的数量应与分区数量相对应) //设置partitioner job.setPartitionerClass(MyPartitioner.class); //设置4个Reducer,每个分区一个 job.setNumReduceTasks(4); //Shuffle把数据从Map端拷贝到Reduce端 //指定Reducer类和输出Key、Value的类型 job.setReducerClass(MyReducer.class); //设置输出结果的key/value的类型,也就是最终存储在HDFS上结果文件的key/value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //设置输出格式 //设置输出文件路径 Path outputPath = new Path(OUTPUT_PATH); FileOutputFormat.setOutputPath(job, outputPath); job.setOutputFormatClass(TextOutputFormat.class); //提交job //如果job运行成功,那么程序就会正常退出 System.exit(job.waitForCompletion(true)?0:1); } }
运行作业 接下来将提交该作业至集群中运行,操作过程如下所示: (1)在本地Windows机器上使用mvn clean package -DskipTests
命令或者直接在IDEA中使用maven插件进行清理打包,之后就可以生成envy-mapreduce-1.0-SNAPSHOT.jar
包,为了和之前的代码进行区分,这里将其重命名为envy-mapreduce-recordreader-1.0-SNAPSHOT.jar
; (2)将(1)中的jar包上传至虚拟机/home/hadoop/lib
目录下; (3)在虚拟机/home/hadoop
目录中新建一个recordreader.txt
的测试文件,其中的内容如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
之后在HDFS根目录下新建inrecordreader
目录(这个目录就必须与你设置的INPUT_PATH
保持一致,前面的hdfs://master:9000
表示主机),之后将这两个文件上传至该目录:
1 2 [root@master sbin]# hadoop fs -mkdir /inrecordreader [root@master sbin]# hadoop fs -put /home/hadoop/recordreader.txt /inrecordreader
之后确认文件上传HDFS中:
1 2 [root@master sbin]# hadoop fs -ls -R /inrecordreader -rw-r--r-- 1 root supergroup 39 2020-06-12 21:36 /inrecordreader/recordreader.txt
(4)提交MapReduce作业到集群中运行:
1 [root@master sbin]# hadoop jar /home/hadoop/lib/envy-mapreduce-recordreader-1.0-SNAPSHOT.jar com.envy.envymapreduce.recordreader.RecordReaderApp
执行流程如下所示:
(5)查看作业输出结果,如下所示:
1 2 3 4 5 6 7 [root@master lib]# hadoop fs -ls /outrecordreader Found 5 items -rw-r--r-- 1 root supergroup 0 2020-06-12 21:42 /outrecordreader/_SUCCESS -rw-r--r-- 1 root supergroup 22 2020-06-12 21:42 /outrecordreader/part-r-00000 -rw-r--r-- 1 root supergroup 22 2020-06-12 21:42 /outrecordreader/part-r-00001 -rw-r--r-- 1 root supergroup 0 2020-06-12 21:42 /outrecordreader/part-r-00002 -rw-r--r-- 1 root supergroup 0 2020-06-12 21:42 /outrecordreader/part-r-00003
之后查看一下各个文件的内容,如下所示:
1 2 3 [root@master lib]# hadoop fs -text /outrecordreader/part-r-* 奇数之和为: 64 偶数之和为: 72