写在前面

在实际工作中经常会遇到需要同步数据的场景,在业务和数据量比较小的情况下,我们会在项目中通过编写一些定时任务来同步数据。但是随着业务系统的迭代、数据量的增多以及多种复杂场景下的分库分表实现,使得数据同步变得越来越具挑战性。针对这种情况,笔者决定使用阿里开源的Canal这一中间件来解决数据同步问题。

Canal

Canal简说

Canal是阿里巴巴开源的一款基于MySQL数据库的增量日志解析中间件,提供增量数据的订阅和消费功能。由于它是基于MySQL的日志进行的增量解析,因此对原有的业务代码完全不侵入。

Canal的工作原理是解析MySQL的binlog日志,提供增量数据的订阅和消费功能,具体包括:数据库镜像、数据库实时备份、业务缓存刷新、带业务逻辑的增量数据处理以及索引构建和实时维护(拆分异构索引和倒排索引)。

Canal官方文档,点击 这里 进行查阅。

Canal如何传输数据

下图是Canal官方展示的Canal传输数据架构图,可以看到Canal支持RabbitMQ、RocketMQ或者ES等常用的消息中间件:

从上图中可以知道Canal分为服务端和客户端,一般阿里开源的程序都是这个规范。其中服务端负责解析MySQL的binlog日志,并传输增量数据给客户端或者消息中间件;而客户端负责解析服务端传递过来的数据,并处理自己的业务逻辑。

当然,数据同步除了本篇学习的Canal外,还有其他一些优秀的开源工具,它们均支持MySQL,但是也存在一些区别。下面笔者将从支持实时同步、增量同步、需要编写保存变更数据的业务代码以及社区活跃度这4个方面进行对比,如下表所示:

Canal Debezium DataX Databus Flinkx Bifrost
实时同步 支持 支持 不支持 支持 支持 支持
增量同步 支持 支持 不支持 支持 支持 支持
是否需要自己编写保存变更数据的代码
社区活跃度 不高 一般 一般

接下来将以Canal为例,来学习如何进行数据的同步。

搭建服务端

安装Canal服务端

点击 这里 下载Canal的服务端,即名称为canal.developer前缀的包:

解压后的目录如下所示:

其中bin里面存放的是启动和停止脚本;conf目录存放的是配置文件;lib目录存放的是项目所依赖的包;logs目录存放的是项目运行的日志文件;plugin目录存放的是插件。其实这个canal服务端是一个SpringBoot项目,可以从其启动脚本中看出:

修改MySQL配置信息

第一步,找到MySQL安装目录下的my.ini文件,在里面新增如下配置信息:

1
2
3
4
5
6
#开启binlog
log-bin=mysql-bin
#使用ROW模式
binlog-format=ROW
# 设置MySQL replication,注意不要和canal的slaveId重复就行
server_id=1

第二步,设置Canal服务端中关于MySQL的相关配置信息,即告诉Canal应该去监听哪个数据库的哪个表的日志文件。

注意一个Canal服务端中可以监听多个MySQL实例,Canal默认自带了一个example实例(在其conf/example目录下有一个名为instance.properties的文件),本篇笔记就以example为例来进行学习。如果开发者需要增加实例,那么需要复制example文件夹中的内容到同级目录中,并在同级目录下的canal.properties文件中指定所添加的实例名称。这里我们就使用默认的实例即可。

修改canal.deployer-1.1.5\conf\example\instance.properties文件中的如下所示信息:

1
2
3
4
5
6
7
8
9
# position info
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
# listener database name
canal.instance.defaultDatabaseName=delaytask
# table regex
canal.instance.filter.regex=.*\\..*

最后一行表示设置监听的数据库表信息,可以监听多个表,多个之间使用逗号进行分割,此处设置的是监听所有的表。

修改RabbitMQ配置信息

由于Canal服务端默认的传输方式为tcp,因此我们需要在canal的配置文件中设置RabbitMQ相关的信息。

第一步,修改canal.deployer-1.1.5\conf\canal.properties文件中的如下所示信息:

1
2
3
4
5
6
7
8
9
10
11
12
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = guest
rabbitmq.password = guest
# 是否持久化,2表示是
rabbitmq.deliveryMode = 2

第二步,修改canal.deployer-1.1.5\conf\example\instance.properties文件中的如下所示信息:

1
2
# mq config
canal.mq.topic=canal.routing.key

其实就是设置RabbitMQ路由的key,这样才能将信息准确路由到指定的队列里面。

第三步,在RabbitMQ中创建交换机和队列。接着我们需要在RabbitMQ中新建一个名为canal.exchange的交换机,注意这个名字必须与canal.deployer-1.1.5\conf\canal.properties文件中设置的rabbitmq.exchange选项值保持一致。

第四步,新建一个名为canal.queue的队列,这个队列的名称随意,不过为了区分实际业务,建议此队列还是命名为canal.queue

第五步,定义交换机和队列之间的绑定键,名称为canal.routing.key,注意这个名字必须与canal.deployer-1.1.5\conf\example\instance.properties文件中设置的canal.mq.topic选项值保持一致:

启动各个服务

在完成上述准备工作后,接下来我们尝试启动MySQL,之后启动Canal服务端,只需直接双击bin目录下的startup.bat文件。或者在Canal的bin目录下打开终端,然后输入start /min startup.bat命令来启动Canal服务端:

当然了,这里有一个小坑,就是如果你的Java版本大于1.7,那么程序就会报错,因为此时永久代已经取消了,应该使用元空间。打开startup.bat文件,将其中的如下信息:

1
set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m

修改为如下后的信息:

1
JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:MetaspaceSize=128m

之后再重新执行startup.bat文件,进而启动Canal服务端。

测试一下数据

接下来使用如下命令往delaytask数据库中的order_master数据表里面新增一条信息:

1
INSERT INTO `delaytask`.`order_master`(`id`, `order_id`) VALUES (1, '2001');

然后查看一下RabbitMQ中的canal.queue队列里面的数据,可以看到里面已经有对应数据了:

这其实就是一个JSON字符串,格式化一下信息如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
"data": [
{
"id": "1",
"order_id": "2001",
"create_time": "2022-06-06 14:10:07",
"update_time": "2022-06-06 14:10:07",
"retry_times": "0",
"order_status": "0"
}
],
"database": "delaytask",
"es": 1654495807000,
"id": 3,
"isDdl": false,
"mysqlType": {
"id": "bigint unsigned",
"order_id": "varchar(60)",
"create_time": "datetime",
"update_time": "datetime",
"retry_times": "tinyint",
"order_status": "tinyint"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"order_id": 12,
"create_time": 93,
"update_time": 93,
"retry_times": -6,
"order_status": -6
},
"table": "order_master",
"ts": 1654495807981,
"type": "INSERT"
}

这个JOSN字符串中将用户执行的插入操作记录的非常清楚,如表名称、方法、参数、参数类型、参数值等。

既然服务端已经通过解析MySQL的binlog日志,并传输增量数据给消息中间件,那么接下来就是客户端监听MQ来获取JSON信息,并将其解析出来为后续业务逻辑服务。

搭建客户端

客户端就比较简单,就是消费Canal服务端传递过来的信息,而它传递的消息都存在了MQ中,因此客户端就只需监听canal.queue队列。

创建消息实体类

由于MQ传递来的是JSON信息,因此接下来需要创建一个实体类来完成与JSON信息的数据绑定工作。新建一个名为CanalMessage的类,由于JSON中的data可能存在多种形式,因此可以将CanalMessage类定义为一个泛型类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Canal消息接收实体类
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalMessage<T> {
@JsonProperty("data")
private List<String> data;

@JsonProperty("database")
private String database;

@JsonProperty("es")
private String es;

@JsonProperty("id")
private Integer id;

@JsonProperty("isDdl")
private Boolean isDdl;

@JsonProperty("old")
private List<T> old;

@JsonProperty("pkNames")
private List<String> pkNames;

@JsonProperty("sql")
private String sql;

@JsonProperty("table")
private String table;

@JsonProperty("ts")
private String ts;

@JsonProperty("type")
private String type;
}

监听MQ消息

接下来就是监听RabbitMQ中的canal.queue队列了,这样只要Canal服务端有数据推送过来就可以及时的消费掉。

简单起见,这里笔者就通过@RabbitListener来完成RabbitMQ中交换机、队列的声明和绑定工作。新建一个名为CanalRabbitMQListener的类,里面的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 监听RabbitMQ中信息
*/
@Component
@Slf4j
public class CanalRabbitMQListener {

@RabbitListener(bindings={
@QueueBinding(
value = @Queue(value = "canal.queue",durable = "true"),
exchange = @Exchange(value = "canal.exchange"),
key = "canal.routing.key"
)
})
public void handlerCanalData(String message){
//message则是MQ中canal.queue中的消息
//将message消息转换为之前定义的CanalMessage
CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class, true);
String databaseName = canalMessage.getDatabase();
String tableName = canalMessage.getTable();
log.info("Canal监听,{}数据库的{}数据表发生变化,变化信息为{}",databaseName,tableName,message);
//其他后续的业务逻辑
}
}

当然你还需要配置RabbitMQ的相关信息,可以在application.properties配置文件中新增如下信息:

1
2
3
4
5
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

项目测试

接下来我们启动项目,首先往order_master数据表中新增一条记录,命令如下所示:

1
INSERT INTO `delaytask`.`order_master`(`id`, `order_id`) VALUES (2, '2002');

可以看到此时项目控制台输出如下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Canal监听,delaytask数据库的order_master数据表发生变化,变化信息为:
{
"data": [
{
"id": "2",
"order_id": "2002",
"create_time": "2022-06-06 17:24:43",
"update_time": "2022-06-06 17:24:43",
"retry_times": "0",
"order_status": "0"
}
],
"database": "delaytask",
"es": 1654507483000,
"id": 4,
"isDdl": false,
"mysqlType": {
"id": "bigint unsigned",
"order_id": "varchar(60)",
"create_time": "datetime",
"update_time": "datetime",
"retry_times": "tinyint",
"order_status": "tinyint"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": -5,
"order_id": 12,
"create_time": 93,
"update_time": 93,
"retry_times": -6,
"order_status": -6
},
"table": "order_master",
"ts": 1654507483543,
"type": "INSERT"
}

可以看到Canal客户端已经成功消费了MQ中的消息并接收到,后续开发者可根据实际业务进行后续逻辑开发。