写在前面

本篇来学习HDFS更高级的知识,主要包括Hadoop序列化机制、SequenceFile和MapFile的使用。

Hadoop序列化机制

序列化与反序列化

序列化是将对象转化为字节流,便于在网络上传输或者写入磁盘进行永久存储;而反序列化就是将字节流转回对象。

序列化在分布式数据处理的“进程间通信”和“永久存储”这两个领域出现的概率较大。请注意,在Hadoop中多个节点进程之间的通信是通过远程过程调用来实现的,即(Remote Procedure Call),简称RPC。

Hadoop的序列化

请注意Hadoop没有使用Java的序列化,而是采用自己的序列化机制。原因在于Hadoop的序列化,允许用户复用对象,这样就减少了Java对象的分配和回收,提高了应用效率。

Hadoop提供了Writable接口,用于实现序列化:

1
2
3
4
5
6
7
8
@Public
@Stable
public interface Writable {
//将状态写入到DataOutput二进制流
void write(DataOutput var1) throws IOException;
//从DataOutput二进制流中读取状态
void readFields(DataInput var1) throws IOException;
}

但是该接口没有提供比较功能,因此将其和Java中的Comparable接口合并,就提供了WritableComparable接口:

1
2
3
4
@Public
@Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

在实际工作中用的多的就是这个WritableComparable接口。

Hadoop序列化实例

接下来笔者将通过一个需求来演示Hadoop的序列化与反序列化过程,需求非常简单,就是将一个Student对象进行序列化,之后将序列化的对象反序列化即可。

这里依旧使用之前创建的Maven项目,名称为envy-hdfs。接下来简单说一下这个实现流程:
(1)序列化对象类需要实现前面介绍的WritableComparable接口;
(2)对于对象的属性,可以采用Hadoop的类型,也可以使用Java的类型;
(3)需要重写write方法,它用于将对象转换为字节流并写入到输出流out中。请注意,如果属性是Hadoop的类型,那么使用name.write(out)方式;如果属性是Java的类型,那么使用out.write(name)方式,这一点需要引起注意;
(4)需要重写readFields方法,它用于从输入流in中读取字节流并反序列对象。请注意,如果属性是Hadoop的类型,那么使用name.readFields(in)方式;如果属性是Java的类型,那么使用in.readFields(name)方式,这一点需要引起注意;
(5)实现compareTo方法接口;
(6)setter方法,采用Java类型的属性,便于客户端操作;
(7)提供构造方法;
(8)实现toString、hasCode和equals方法。

完整的示例过程如下所示:

第一步,创建待序列化和反序列化Student类。新建pojo包,并在该包内创建Student类,请注意这个Student类需要实现前面介绍的WritableComparable接口:

1
2
3
public class Student implements WritableComparable<Student> {

}

第二步,定义属性,使用Hadoop的类型。在Student类中新增如下代码:

1
2
3
private Text name = new Text();
private IntWritable age = new IntWritable();
private Text sex = new Text();

第三步,提供构造方法、赋值set方法和实现toString、hasCode、equals和CompareTo方法。在Student类中新增如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//有参构造方法
public Student(Text name, IntWritable age, Text sex) {
this.name = name;
this.age = age;
this.sex = sex;
}

//无参构造方法
public Student() {
}

//调用Hadoop提供的set方法来赋值
public Student(String name, int age, String sex) {
this.name.set(name);
this.age.set(age);
this.sex.set(sex);
}

//调用Hadoop提供的set方法
public void set(String name,int age,String sex){
this.name.set(name);
this.age.set(age);
this.sex.set(sex);
}

//比较规则,姓名相同比较年龄,年龄相同比较性别
@Override
public int compareTo(Student student){
int result = 0;
int compare1 = name.compareTo(student.name);
if(compare1!=0){
return compare1;
}

int compare2 = age.compareTo(student.age);
if(compare2!=0){
return compare2;
}

int compare3 = sex.compareTo(student.sex);
if(compare3!=0){
return compare3;
}
return result;
}

@Override
public int hashCode() {
final int prime =2020;
int result = 1;
result = prime*result +((name==null)? 0:name.hashCode());
result = prime*result +((age==null)? 0:age.hashCode());
result = prime*result +((sex==null)? 0:sex.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if(this==obj){
return true;
}
if(obj==null){
return false;
}
if(getClass()!=obj.getClass()){
return false;
}
Student student = (Student) obj;
if(name ==null){
if(student.name !=null){
return false;
}else if(!name.equals(student.name)){
return false;
}
}
if(age ==null){
if(student.age !=null){
return false;
}else if(!age.equals(student.age)){
return false;
}
}
if(sex ==null){
if(student.sex !=null){
return false;
}else if(!sex.equals(student.sex)){
return false;
}
}
return true;
}

@Override
public String toString() {
return "Student [name=]"+name+",age="+age+",sex="+sex+"]";
}

第四步,重写write和readFields方法。在Student类中新增如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void write(DataOutput dataOutput) throws IOException {
name.write(dataOutput);
age.write(dataOutput);
sex.write(dataOutput);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
name.readFields(dataInput);
age.readFields(dataInput);
sex.readFields(dataInput);
}

第五步,创建序列化和反序列化工具类。新建utils包,并在该包内创建HadoopSerializationUtil类,请注意这个HadoopSerializationUtil类里面需要提供序列化和反序列化方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HadoopSerializationUtil {
//序列化方法
public static byte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOut = new DataOutputStream(out);
writable.write(dataOut);
dataOut.close();
return out.toByteArray();
}

//反序列化方法
public static void deserialize(Writable writable,byte[] bytes) throws IOException {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputStream dataIn = new DataInputStream(in);
writable.readFields(dataIn);
dataIn.close();
}
}

第六步,创建Test类。新建Test类,请注意这个Test类用于测试对象的序列化和反序列化操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Test {
public static void main(String[] args) throws IOException {
//测试序列化
Student student = new Student("envyzhan",25,"man");
byte[] bytes = HadoopSerializationUtil.serialize(student);
System.out.println(bytes);

//测试反序列化
Student s = new Student();
HadoopSerializationUtil.deserialize(s,bytes);
System.out.println(s);
}
}

可以看到该main方法的执行结果如下所示:

1
2
[B@497470ed
Student [name=]envyzhan,age=25,sex=man]

这也就说明我们对于Student类的序列化和反序列化操作是成功的。

SequenceFile使用

SequenceFile简介

SequenceFile是Hadoop提供的一种对于二进制文件的支持,二进制文件直接将Key-Value键值对序列化到文件中。

在前面我们知道HDFS非常适合大文件的存储,而对小文件其实不太适合,小文件过多会导致NameNode的压力变大,这是因为每个文件都会有一条元数据信息存储在NameNode上,而小文件越多就意味着在NameNode上存储的元数据信息就非常多。

既然Hadoop对大文件的支持非常友好,因此可以通过SequenceFile来将小文件进行合并,进而获取更高效的存储和计算。SequenceFile中的Key和Value可以是任意类型的Writable或者自定义的Writable类型。

这里有一个需要注意的地方就是,假设现在需要存储1000G的数据,这个大小是固定的,那么如果采用SequenceFile来进行存储,则实际的占用空间是大于1000G的,因为SequenceFile为了便于后续查找,会在存储中添加一些额外信息,这样就造成了实际情况比固定数据大小要大的情况。

SequenceFile特点

SequenceFile有三个特点,分别是支持压缩、本地化任务支持和难度低。

(1)支持压缩。SequenceFile可以设置为基于Record(记录)和Block(块)的压缩。

首先学习无压缩类型,这是默认设置,即SequenceFile如果没有启动压缩,那么每个记录就由它的记录长度(字节数)、键的长度、键和值组成,长度字段为4字节。

下图展示了SequenceFile的内部结构:

简单点就是Record针对行压缩,且只压缩Value部分,不压缩Key;而Block则对Key和Value都进行压缩。

(2)本地化任务支持。由于文件是可以被切分的,因此在运行MapReduce任务时数据的本地化情况是非常好的,就可以尽可能的发起Map Task来进行并行处理,进而提高作业的执行效率。

(3)难度低。由于使用Hadoop提供的API,因此业务逻辑一侧的修改其实是非常简单的,直接使用API即可。

SequenceFile文件写操作

SequenceFile文件写操作的流程如下所示:
(1)设置Configuration;
(2)获取FileSystem;
(3)设置文件输出路径;
(4)调用SequenceFile.createWriter()方法来创建SequenceFile.Write写入;
(5)调用SequenceFile.Write.append来追加写入;
(6)关闭流。

由于前面几步都已经学习过,因此这里就着重介绍(4)-(6)步,只需在app包内新建一个SequenceFileApp类,相应的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class SequenceFileWriter {
private static final String HDFS_PATH = "hdfs://master:9000";
private static String[] data = {"a,b,c,d,e,f,g","e,f,g,h,i,j,k","l,m,n,o,p,q,r,s","t,u,v,w,x,y,z"};

Configuration configuration = null;
FileSystem fileSystem = null;

@Before
public void setUp() throws Exception {
System.out.println("*********HDFSApp.setUp()**********");
configuration = new Configuration();
configuration.set("fs.defaultFS",HDFS_PATH);
fileSystem = FileSystem.get(new URI(HDFS_PATH),configuration);
}

@After
public void tearDown() throws Exception {
fileSystem = null;
configuration = null;
System.out.println("*********HDFSApp.tearDown()**********");
}

/**
* SequenceFile文件写操作
* */
@Test
public void SequenceFileWriteTest() throws IOException {
Path outputPath = new Path("EnvySequenceFile.seq");
IntWritable key = new IntWritable();
Text value = new Text();

SequenceFile.Writer writer = SequenceFile.createWriter(fileSystem,configuration,outputPath,IntWritable.class,Text.class);

for(int i=0;i<10;i++){
key.set(10-i);
value.set(data[i%data.length]);
writer.append(key,value);
}
IOUtils.closeStream(writer);
}
}

执行上述SequenceFileWriteTest方法,执行查看一下HDFS,请注意文件的默认HDFS写出路径为/user/<user>/,查看一下:

1
2
3
[root@master usr]# hadoop fs -ls -R /user
drwxr-xr-x - Administrator supergroup 0 2020-06-02 15:57 /user/Administrator
-rw-r--r-- 3 Administrator supergroup 472 2020-06-02 15:57 /user/Administrator/EnvySequenceFile.seq

可以看到确实在/user/Administrator/目录下生成了之前的EnvySequenceFile.seq文件,查看一下这个文件的信息:

1
2
3
4
5
6
7
8
9
10
11
12
[root@master usr]# hadoop fs -text /user/Administrator/EnvySequenceFile.seq
20/06/02 15:57:08 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
10 a,b,c,d,e,f,g
9 e,f,g,h,i,j,k
8 l,m,n,o,p,q,r,s
7 t,u,v,w,x,y,z
6 a,b,c,d,e,f,g
5 e,f,g,h,i,j,k
4 l,m,n,o,p,q,r,s
3 t,u,v,w,x,y,z
2 a,b,c,d,e,f,g
1 e,f,g,h,i,j,k

SequenceFile文件读操作

既然有写操作,那么自然就有读操作,接下来学习SequenceFile文件读操作的流程,如下所示:
(1)设置Configuration;
(2)获取FileSystem;
(3)设置文件输出路径;
(4)调用SequenceFile.Reader()方法来创建SequenceFile.Reader读取类;
(5)获取Key和Value的class;
(6)读取;
(7)关闭流。

前三步和写操作完全一致,因此可以复用,着重介绍(4)-(7)步,只需在SequenceFileApp类中新建SequenceFileReadTest方法,相应的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void SequenceFileReadTest() throws IOException {
Path inputPath = new Path("EnvySequenceFile.seq");

SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem,inputPath,configuration);

Writable keyClass = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(),configuration);
Writable valueClass = (Writable)ReflectionUtils.newInstance(reader.getValueClass(),configuration);
while (reader.next(keyClass,valueClass)){
System.out.println("key:"+keyClass);
System.out.println("value:"+valueClass);
System.out.println("position:"+reader.getPosition());
}
IOUtils.closeStream(reader);
}

执行该方法,可以看到控制台输出如下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
key:10
value:a,b,c,d,e,f,g
position:162
key:9
value:e,f,g,h,i,j,k
position:196
key:8
value:l,m,n,o,p,q,r,s
position:232
key:7
value:t,u,v,w,x,y,z
position:266
key:6
value:a,b,c,d,e,f,g
position:300
key:5
value:e,f,g,h,i,j,k
position:334
key:4
value:l,m,n,o,p,q,r,s
position:370
key:3
value:t,u,v,w,x,y,z
position:404
key:2
value:a,b,c,d,e,f,g
position:438
key:1
value:e,f,g,h,i,j,k
position:472
*********HDFSApp.tearDown()**********

这个之前我们使用Hadoop Shell命令查看的信息是完全一致的。

SequenceFile使用压缩来写操作

前面学习的写操作是没有使用压缩方式,接下来就学习SequenceFile使用压缩来进修写操作。请注意SequenceFile写操作的压缩也支持Record和Block两种,它在读取时可以自动解压。

还是以之前文件写操作为例来进行学习,在SequenceFileApp类中新建SequenceFileCompressionWriteTest方法,相应的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void SequenceFileCompressionWriteTest() throws IOException {
Path outputPath = new Path("EnvySequenceFileCompression.seq");
IntWritable key = new IntWritable();
Text value = new Text();

SequenceFile.Writer writer = SequenceFile.createWriter(
fileSystem,configuration,outputPath,IntWritable.class,Text.class,
SequenceFile.CompressionType.RECORD,new BZip2Codec()
);

for(int i=0;i<10;i++){
key.set(10-i);
value.set(data[i%data.length]);
writer.append(key,value);
}
IOUtils.closeStream(writer);
}

执行上述SequenceFileCompressionWriteTest方法,执行查看一下HDFS,请注意文件的默认HDFS写出路径为/user/<user>/,查看一下:

1
2
3
4
[root@master usr]# hadoop fs -ls -R /user
drwxr-xr-x - Administrator supergroup 0 2020-06-02 15:57 /user/Administrator
-rw-r--r-- 3 Administrator supergroup 472 2020-06-02 15:57 /user/Administrator/EnvySequenceFile.seq
-rw-r--r-- 3 Administrator supergroup 740 2020-06-02 17:06 /user/Administrator/EnvySequenceFileCompression.seq

其实这个只是在文件写入的时候指定了压缩算法,其余的和不压缩时的操作一样。由于SequenceFile在读取数据时可以自动解压,因此读取压缩与未压缩的文件代码完全一致,这里就忽略。

MapFile使用

MapFile简介

MapFile是排过序的SequenceFile,它由data和index这两部分组成。index是文件的数据索引,用于记录每个Record的Key值,以及该Record在文件中的偏移位置。在MapFile被访问的时候,索引文件会先被加载到内存中,通过index索引关系可以快速定位到指定Record所在的文件位置。

也就是说MapFile的检索效率比SequenceFile更高,缺点就是需要消耗一部分内存来存储index数据。

MapFile文件写操作

MapFile文件写操作的流程如下所示:
(1)设置Configuration;
(2)获取FileSystem;
(3)设置文件输出路径;
(4)调用MapFile.Writer()方法来创建MapFile.Write写入;
(5)调用MapFile.Write.append来追加写入;
(6)关闭流。

可以看到这个过程和SequenceFile文件写操作的流程几乎一样,所不同的是创建的对象和调用的方法不同。

在app包内新建一个MapFileApp类,相应的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MapFileApp {
private static final String HDFS_PATH = "hdfs://master:9000";

Configuration configuration = null;
FileSystem fileSystem = null;

@Before
public void setUp() throws Exception {
System.out.println("*********HDFSApp.setUp()**********");
configuration = new Configuration();
configuration.set("fs.defaultFS",HDFS_PATH);
fileSystem = FileSystem.get(new URI(HDFS_PATH),configuration);
}

@After
public void tearDown() throws Exception {
fileSystem = null;
configuration = null;
System.out.println("*********HDFSApp.tearDown()**********");
}

/**
* MapFile文件写操作
* */
@Test
public void MapFileWriteTest() throws IOException {
Path outputPath = new Path("EnvyMapFile.map");

Text key = new Text();
key.set("envymapKey");
Text value = new Text();
value.set("envymapvalue");

MapFile.Writer writer = new MapFile.Writer(configuration,fileSystem,outputPath.toString(),Text.class,Text.class);
writer.append(key,value);

IOUtils.closeStream(writer);
}
}

执行上述MapFileWriteTest方法,执行查看一下HDFS,请注意文件的默认HDFS写出路径为/user/<user>/,查看一下:

1
2
3
4
5
6
7
[root@master usr]# hadoop fs -ls -R /user
drwxr-xr-x - Administrator supergroup 0 2020-06-02 17:28 /user/Administrator
drwxr-xr-x - Administrator supergroup 0 2020-06-02 17:28 /user/Administrator/EnvyMapFile.map
-rw-r--r-- 3 Administrator supergroup 161 2020-06-02 17:28 /user/Administrator/EnvyMapFile.map/data
-rw-r--r-- 3 Administrator supergroup 203 2020-06-02 17:28 /user/Administrator/EnvyMapFile.map/index
-rw-r--r-- 3 Administrator supergroup 472 2020-06-02 15:57 /user/Administrator/EnvySequenceFile.seq
-rw-r--r-- 3 Administrator supergroup 740 2020-06-02 17:06 /user/Administrator/EnvySequenceFileCompression.seq

可以看到在/user/Administrator/目录下已经存在EnvyMapFile.map目录,且里面存在data和index这两个文件,后期将通过读取这个index中的key来获取data中的value信息。

MapFile文件读操作

MapFile文件写操作的流程如下所示:
(1)设置Configuration;
(2)获取FileSystem;
(3)设置文件输入路径;
(4)调用MapFile.Reader()方法来创建MapFile.Reader读取类;
(5)获取Key和Value的class;
(6)读取;
(7)关闭流

可以看到这个过程和SequenceFile文件读操作的流程几乎一样,所不同的是创建的对象和调用的方法不同。

在MapFileApp类中新建MapFileReadTest方法,相应的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void MapFileReadTest() throws IOException {
Path inputPath = new Path("EnvyMapFile.map");

MapFile.Reader reader = new MapFile.Reader(fileSystem,inputPath.toString(),configuration);
Writable keyClass = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(),configuration);
Writable valueClass = (Writable)ReflectionUtils.newInstance(reader.getValueClass(),configuration);
while (reader.next((WritableComparable) keyClass,valueClass)){
System.out.println("key:"+keyClass);
System.out.println("value:"+valueClass);
}
IOUtils.closeStream(reader);
}

执行该方法,可以看到控制台输出如下信息:

1
2
3
key:envymapKey
value:envymapvalue
*********HDFSApp.tearDown()**********

这样我们就成功的获取到了data中的数据。

小结

其实在实际工作中,输入的数据往往都是小文件,即文件大小小于HDFS文件系统数据块的大小。但是每一个存储在HDFS中的文件、目录和块都会映射为一个对象,存储在NameNode服务器内存中,通常占用150字节。假设有1000万个文件,那么就需要消耗大约3GB的内存空间;假设有10亿个文件呢?那么就需要300GB的内存空间,很明显300GB在普通PC上是不可能实现的,因此就需要开发一个可以将小文件进行合并的程序,这样才能解决小文件问题。