写在前面 本篇来学习HDFS更高级的知识,主要包括Hadoop序列化机制、SequenceFile和MapFile的使用。
Hadoop序列化机制 序列化与反序列化 序列化是将对象转化为字节流,便于在网络上传输或者写入磁盘进行永久存储;而反序列化就是将字节流转回对象。
序列化在分布式数据处理的“进程间通信”和“永久存储”这两个领域出现的概率较大。请注意,在Hadoop中多个节点进程之间的通信是通过远程过程调用来实现的,即(Remote Procedure Call),简称RPC。
Hadoop的序列化 请注意Hadoop没有使用Java的序列化,而是采用自己的序列化机制。原因在于Hadoop的序列化,允许用户复用对象,这样就减少了Java对象的分配和回收,提高了应用效率。
Hadoop提供了Writable接口,用于实现序列化:
1 2 3 4 5 6 7 8 @Public @Stable public interface Writable { //将状态写入到DataOutput二进制流 void write(DataOutput var1) throws IOException; //从DataOutput二进制流中读取状态 void readFields(DataInput var1) throws IOException; }
但是该接口没有提供比较功能,因此将其和Java中的Comparable接口合并,就提供了WritableComparable接口:
1 2 3 4 @Public @Stable public interface WritableComparable<T> extends Writable, Comparable<T> { }
在实际工作中用的多的就是这个WritableComparable接口。
Hadoop序列化实例 接下来笔者将通过一个需求来演示Hadoop的序列化与反序列化过程,需求非常简单,就是将一个Student对象进行序列化,之后将序列化的对象反序列化即可。
这里依旧使用之前创建的Maven项目,名称为envy-hdfs。接下来简单说一下这个实现流程: (1)序列化对象类需要实现前面介绍的WritableComparable接口; (2)对于对象的属性,可以采用Hadoop的类型,也可以使用Java的类型; (3)需要重写write方法,它用于将对象转换为字节流并写入到输出流out中。请注意,如果属性是Hadoop的类型,那么使用name.write(out)
方式;如果属性是Java的类型,那么使用out.write(name)
方式,这一点需要引起注意; (4)需要重写readFields方法,它用于从输入流in中读取字节流并反序列对象。请注意,如果属性是Hadoop的类型,那么使用name.readFields(in)
方式;如果属性是Java的类型,那么使用in.readFields(name)
方式,这一点需要引起注意; (5)实现compareTo方法接口; (6)setter方法,采用Java类型的属性,便于客户端操作; (7)提供构造方法; (8)实现toString、hasCode和equals方法。
完整的示例过程如下所示:
第一步,创建待序列化和反序列化Student类 。新建pojo包,并在该包内创建Student类,请注意这个Student类需要实现前面介绍的WritableComparable接口:
1 2 3 public class Student implements WritableComparable<Student> { }
第二步,定义属性,使用Hadoop的类型 。在Student类中新增如下代码:
1 2 3 private Text name = new Text(); private IntWritable age = new IntWritable(); private Text sex = new Text();
第三步,提供构造方法、赋值set方法和实现toString、hasCode、equals和CompareTo方法 。在Student类中新增如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 //有参构造方法 public Student(Text name, IntWritable age, Text sex) { this.name = name; this.age = age; this.sex = sex; } //无参构造方法 public Student() { } //调用Hadoop提供的set方法来赋值 public Student(String name, int age, String sex) { this.name.set(name); this.age.set(age); this.sex.set(sex); } //调用Hadoop提供的set方法 public void set(String name,int age,String sex){ this.name.set(name); this.age.set(age); this.sex.set(sex); } //比较规则,姓名相同比较年龄,年龄相同比较性别 @Override public int compareTo(Student student){ int result = 0; int compare1 = name.compareTo(student.name); if(compare1!=0){ return compare1; } int compare2 = age.compareTo(student.age); if(compare2!=0){ return compare2; } int compare3 = sex.compareTo(student.sex); if(compare3!=0){ return compare3; } return result; } @Override public int hashCode() { final int prime =2020; int result = 1; result = prime*result +((name==null)? 0:name.hashCode()); result = prime*result +((age==null)? 0:age.hashCode()); result = prime*result +((sex==null)? 0:sex.hashCode()); return result; } @Override public boolean equals(Object obj) { if(this==obj){ return true; } if(obj==null){ return false; } if(getClass()!=obj.getClass()){ return false; } Student student = (Student) obj; if(name ==null){ if(student.name !=null){ return false; }else if(!name.equals(student.name)){ return false; } } if(age ==null){ if(student.age !=null){ return false; }else if(!age.equals(student.age)){ return false; } } if(sex ==null){ if(student.sex !=null){ return false; }else if(!sex.equals(student.sex)){ return false; } } return true; } @Override public String toString() { return "Student [name=]"+name+",age="+age+",sex="+sex+"]"; }
第四步,重写write和readFields方法 。在Student类中新增如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void write(DataOutput dataOutput) throws IOException { name.write(dataOutput); age.write(dataOutput); sex.write(dataOutput); } @Override public void readFields(DataInput dataInput) throws IOException { name.readFields(dataInput); age.readFields(dataInput); sex.readFields(dataInput); }
第五步,创建序列化和反序列化工具类 。新建utils包,并在该包内创建HadoopSerializationUtil类,请注意这个HadoopSerializationUtil类里面需要提供序列化和反序列化方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class HadoopSerializationUtil { //序列化方法 public static byte[] serialize(Writable writable) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(out); writable.write(dataOut); dataOut.close(); return out.toByteArray(); } //反序列化方法 public static void deserialize(Writable writable,byte[] bytes) throws IOException { ByteArrayInputStream in = new ByteArrayInputStream(bytes); DataInputStream dataIn = new DataInputStream(in); writable.readFields(dataIn); dataIn.close(); } }
第六步,创建Test类 。新建Test类,请注意这个Test类用于测试对象的序列化和反序列化操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Test { public static void main(String[] args) throws IOException { //测试序列化 Student student = new Student("envyzhan",25,"man"); byte[] bytes = HadoopSerializationUtil.serialize(student); System.out.println(bytes); //测试反序列化 Student s = new Student(); HadoopSerializationUtil.deserialize(s,bytes); System.out.println(s); } }
可以看到该main方法的执行结果如下所示:
1 2 [B@497470ed Student [name=]envyzhan,age=25,sex=man]
这也就说明我们对于Student类的序列化和反序列化操作是成功的。
SequenceFile使用 SequenceFile简介 SequenceFile是Hadoop提供的一种对于二进制文件的支持,二进制文件直接将Key-Value键值对序列化到文件中。
在前面我们知道HDFS非常适合大文件的存储,而对小文件其实不太适合,小文件过多会导致NameNode的压力变大,这是因为每个文件都会有一条元数据信息存储在NameNode上,而小文件越多就意味着在NameNode上存储的元数据信息就非常多。
既然Hadoop对大文件的支持非常友好,因此可以通过SequenceFile来将小文件进行合并,进而获取更高效的存储和计算。SequenceFile中的Key和Value可以是任意类型的Writable或者自定义的Writable类型。
这里有一个需要注意的地方就是,假设现在需要存储1000G的数据,这个大小是固定的,那么如果采用SequenceFile来进行存储,则实际的占用空间是大于1000G的,因为SequenceFile为了便于后续查找,会在存储中添加一些额外信息,这样就造成了实际情况比固定数据大小要大的情况。
SequenceFile特点 SequenceFile有三个特点,分别是支持压缩、本地化任务支持和难度低。
(1)支持压缩 。SequenceFile可以设置为基于Record(记录)和Block(块)的压缩。
首先学习无压缩类型,这是默认设置,即SequenceFile如果没有启动压缩,那么每个记录就由它的记录长度(字节数)、键的长度、键和值组成,长度字段为4字节。
下图展示了SequenceFile的内部结构:
简单点就是Record针对行压缩,且只压缩Value部分,不压缩Key;而Block则对Key和Value都进行压缩。
(2)本地化任务支持 。由于文件是可以被切分的,因此在运行MapReduce任务时数据的本地化情况是非常好的,就可以尽可能的发起Map Task来进行并行处理,进而提高作业的执行效率。
(3)难度低 。由于使用Hadoop提供的API,因此业务逻辑一侧的修改其实是非常简单的,直接使用API即可。
SequenceFile文件写操作 SequenceFile文件写操作的流程如下所示: (1)设置Configuration; (2)获取FileSystem; (3)设置文件输出路径; (4)调用SequenceFile.createWriter()
方法来创建SequenceFile.Write
写入; (5)调用SequenceFile.Write.append
来追加写入; (6)关闭流。
由于前面几步都已经学习过,因此这里就着重介绍(4)-(6)步,只需在app包内新建一个SequenceFileApp类,相应的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class SequenceFileWriter { private static final String HDFS_PATH = "hdfs://master:9000"; private static String[] data = {"a,b,c,d,e,f,g","e,f,g,h,i,j,k","l,m,n,o,p,q,r,s","t,u,v,w,x,y,z"}; Configuration configuration = null; FileSystem fileSystem = null; @Before public void setUp() throws Exception { System.out.println("*********HDFSApp.setUp()**********"); configuration = new Configuration(); configuration.set("fs.defaultFS",HDFS_PATH); fileSystem = FileSystem.get(new URI(HDFS_PATH),configuration); } @After public void tearDown() throws Exception { fileSystem = null; configuration = null; System.out.println("*********HDFSApp.tearDown()**********"); } /** * SequenceFile文件写操作 * */ @Test public void SequenceFileWriteTest() throws IOException { Path outputPath = new Path("EnvySequenceFile.seq"); IntWritable key = new IntWritable(); Text value = new Text(); SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem,configuration,outputPath,IntWritable.class,Text.class); for(int i=0;i<10;i++){ key.set(10-i); value.set(data[i%data.length]); writer.append(key,value); } IOUtils.closeStream(writer); } }
执行上述SequenceFileWriteTest方法,执行查看一下HDFS,请注意文件的默认HDFS写出路径为/user/<user>/
,查看一下:
1 2 3 [root@master usr]# hadoop fs -ls -R /user drwxr-xr-x - Administrator supergroup 0 2020-06-02 15:57 /user/Administrator -rw-r--r-- 3 Administrator supergroup 472 2020-06-02 15:57 /user/Administrator/EnvySequenceFile.seq
可以看到确实在/user/Administrator/
目录下生成了之前的EnvySequenceFile.seq
文件,查看一下这个文件的信息:
1 2 3 4 5 6 7 8 9 10 11 12 [root@master usr]# hadoop fs -text /user/Administrator/EnvySequenceFile.seq 20/06/02 15:57:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate] 10 a,b,c,d,e,f,g 9 e,f,g,h,i,j,k 8 l,m,n,o,p,q,r,s 7 t,u,v,w,x,y,z 6 a,b,c,d,e,f,g 5 e,f,g,h,i,j,k 4 l,m,n,o,p,q,r,s 3 t,u,v,w,x,y,z 2 a,b,c,d,e,f,g 1 e,f,g,h,i,j,k
SequenceFile文件读操作 既然有写操作,那么自然就有读操作,接下来学习SequenceFile文件读操作的流程,如下所示: (1)设置Configuration; (2)获取FileSystem; (3)设置文件输出路径; (4)调用SequenceFile.Reader()方法来创建SequenceFile.Reader
读取类; (5)获取Key和Value的class; (6)读取; (7)关闭流。
前三步和写操作完全一致,因此可以复用,着重介绍(4)-(7)步,只需在SequenceFileApp类中新建SequenceFileReadTest方法,相应的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test public void SequenceFileReadTest() throws IOException { Path inputPath = new Path("EnvySequenceFile.seq"); SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem,inputPath,configuration); Writable keyClass = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(),configuration); Writable valueClass = (Writable)ReflectionUtils.newInstance(reader.getValueClass(),configuration); while (reader.next(keyClass,valueClass)){ System.out.println("key:"+keyClass); System.out.println("value:"+valueClass); System.out.println("position:"+reader.getPosition()); } IOUtils.closeStream(reader); }
执行该方法,可以看到控制台输出如下信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 key:10 value:a,b,c,d,e,f,g position:162 key:9 value:e,f,g,h,i,j,k position:196 key:8 value:l,m,n,o,p,q,r,s position:232 key:7 value:t,u,v,w,x,y,z position:266 key:6 value:a,b,c,d,e,f,g position:300 key:5 value:e,f,g,h,i,j,k position:334 key:4 value:l,m,n,o,p,q,r,s position:370 key:3 value:t,u,v,w,x,y,z position:404 key:2 value:a,b,c,d,e,f,g position:438 key:1 value:e,f,g,h,i,j,k position:472 *********HDFSApp.tearDown()**********
这个之前我们使用Hadoop Shell命令查看的信息是完全一致的。
SequenceFile使用压缩来写操作 前面学习的写操作是没有使用压缩方式,接下来就学习SequenceFile使用压缩来进修写操作。请注意SequenceFile写操作的压缩也支持Record和Block两种,它在读取时可以自动解压。
还是以之前文件写操作为例来进行学习,在SequenceFileApp类中新建SequenceFileCompressionWriteTest
方法,相应的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Test public void SequenceFileCompressionWriteTest() throws IOException { Path outputPath = new Path("EnvySequenceFileCompression.seq"); IntWritable key = new IntWritable(); Text value = new Text(); SequenceFile.Writer writer = SequenceFile.createWriter( fileSystem,configuration,outputPath,IntWritable.class,Text.class, SequenceFile.CompressionType.RECORD,new BZip2Codec() ); for(int i=0;i<10;i++){ key.set(10-i); value.set(data[i%data.length]); writer.append(key,value); } IOUtils.closeStream(writer); }
执行上述SequenceFileCompressionWriteTest方法,执行查看一下HDFS,请注意文件的默认HDFS写出路径为/user/<user>/
,查看一下:
1 2 3 4 [root@master usr]# hadoop fs -ls -R /user drwxr-xr-x - Administrator supergroup 0 2020-06-02 15:57 /user/Administrator -rw-r--r-- 3 Administrator supergroup 472 2020-06-02 15:57 /user/Administrator/EnvySequenceFile.seq -rw-r--r-- 3 Administrator supergroup 740 2020-06-02 17:06 /user/Administrator/EnvySequenceFileCompression.seq
其实这个只是在文件写入的时候指定了压缩算法,其余的和不压缩时的操作一样。由于SequenceFile在读取数据时可以自动解压,因此读取压缩与未压缩的文件代码完全一致,这里就忽略。
MapFile使用 MapFile简介 MapFile是排过序的SequenceFile,它由data和index这两部分组成。index是文件的数据索引,用于记录每个Record的Key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会先被加载到内存中,通过index索引关系可以快速定位到指定Record所在的文件位置。
也就是说MapFile的检索效率比SequenceFile更高,缺点就是需要消耗一部分内存来存储index数据。
MapFile文件写操作 MapFile文件写操作的流程如下所示: (1)设置Configuration; (2)获取FileSystem; (3)设置文件输出路径; (4)调用MapFile.Writer()
方法来创建MapFile.Write
写入; (5)调用MapFile.Write.append
来追加写入; (6)关闭流。
可以看到这个过程和SequenceFile文件写操作的流程几乎一样,所不同的是创建的对象和调用的方法不同。
在app包内新建一个MapFileApp类,相应的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class MapFileApp { private static final String HDFS_PATH = "hdfs://master:9000"; Configuration configuration = null; FileSystem fileSystem = null; @Before public void setUp() throws Exception { System.out.println("*********HDFSApp.setUp()**********"); configuration = new Configuration(); configuration.set("fs.defaultFS",HDFS_PATH); fileSystem = FileSystem.get(new URI(HDFS_PATH),configuration); } @After public void tearDown() throws Exception { fileSystem = null; configuration = null; System.out.println("*********HDFSApp.tearDown()**********"); } /** * MapFile文件写操作 * */ @Test public void MapFileWriteTest() throws IOException { Path outputPath = new Path("EnvyMapFile.map"); Text key = new Text(); key.set("envymapKey"); Text value = new Text(); value.set("envymapvalue"); MapFile.Writer writer = new MapFile.Writer(configuration,fileSystem,outputPath.toString(),Text.class,Text.class); writer.append(key,value); IOUtils.closeStream(writer); } }
执行上述MapFileWriteTest
方法,执行查看一下HDFS,请注意文件的默认HDFS写出路径为/user/<user>/
,查看一下:
1 2 3 4 5 6 7 [root@master usr]# hadoop fs -ls -R /user drwxr-xr-x - Administrator supergroup 0 2020-06-02 17:28 /user/Administrator drwxr-xr-x - Administrator supergroup 0 2020-06-02 17:28 /user/Administrator/EnvyMapFile.map -rw-r--r-- 3 Administrator supergroup 161 2020-06-02 17:28 /user/Administrator/EnvyMapFile.map/data -rw-r--r-- 3 Administrator supergroup 203 2020-06-02 17:28 /user/Administrator/EnvyMapFile.map/index -rw-r--r-- 3 Administrator supergroup 472 2020-06-02 15:57 /user/Administrator/EnvySequenceFile.seq -rw-r--r-- 3 Administrator supergroup 740 2020-06-02 17:06 /user/Administrator/EnvySequenceFileCompression.seq
可以看到在/user/Administrator/
目录下已经存在EnvyMapFile.map
目录,且里面存在data和index这两个文件,后期将通过读取这个index中的key来获取data中的value信息。
MapFile文件读操作 MapFile文件写操作的流程如下所示: (1)设置Configuration; (2)获取FileSystem; (3)设置文件输入路径; (4)调用MapFile.Reader()
方法来创建MapFile.Reader
读取类; (5)获取Key和Value的class; (6)读取; (7)关闭流
可以看到这个过程和SequenceFile文件读操作的流程几乎一样,所不同的是创建的对象和调用的方法不同。
在MapFileApp类中新建MapFileReadTest方法,相应的代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Test public void MapFileReadTest() throws IOException { Path inputPath = new Path("EnvyMapFile.map"); MapFile.Reader reader = new MapFile.Reader(fileSystem,inputPath.toString(),configuration); Writable keyClass = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(),configuration); Writable valueClass = (Writable)ReflectionUtils.newInstance(reader.getValueClass(),configuration); while (reader.next((WritableComparable) keyClass,valueClass)){ System.out.println("key:"+keyClass); System.out.println("value:"+valueClass); } IOUtils.closeStream(reader); }
执行该方法,可以看到控制台输出如下信息:
1 2 3 key:envymapKey value:envymapvalue *********HDFSApp.tearDown()**********
这样我们就成功的获取到了data中的数据。
小结 其实在实际工作中,输入的数据往往都是小文件,即文件大小小于HDFS文件系统数据块的大小。但是每一个存储在HDFS中的文件、目录和块都会映射为一个对象,存储在NameNode服务器内存中,通常占用150字节。假设有1000万个文件,那么就需要消耗大约3GB的内存空间;假设有10亿个文件呢?那么就需要300GB的内存空间,很明显300GB在普通PC上是不可能实现的,因此就需要开发一个可以将小文件进行合并的程序,这样才能解决小文件问题。