SpringBoot整合Canal实现数据增量同步
写在前面
在实际工作中经常会遇到需要同步数据的场景,在业务和数据量比较小的情况下,我们会在项目中通过编写一些定时任务来同步数据。但是随着业务系统的迭代、数据量的增多以及多种复杂场景下的分库分表实现,使得数据同步变得越来越具挑战性。针对这种情况,笔者决定使用阿里开源的Canal这一中间件来解决数据同步问题。
Canal
Canal简说
Canal是阿里巴巴开源的一款基于MySQL数据库的增量日志解析中间件,提供增量数据的订阅和消费功能。由于它是基于MySQL的日志进行的增量解析,因此对原有的业务代码完全不侵入。
Canal的工作原理是解析MySQL的binlog日志,提供增量数据的订阅和消费功能,具体包括:数据库镜像、数据库实时备份、业务缓存刷新、带业务逻辑的增量数据处理以及索引构建和实时维护(拆分异构索引和倒排索引)。
Canal官方文档,点击 这里 进行查阅。
Canal如何传输数据
下图是Canal官方展示的Canal传输数据架构图,可以看到Canal支持RabbitMQ、RocketMQ或者ES等常用的消息中间件:
从上图中可以知道Canal分为服务端和客户端,一般阿里开源的程序都是这个规范。其中服务端负责解析MySQL的binlog日志,并传输增量数据给客户端或者消息中间件;而客户端负责解析服务端传递过来的数据,并处理自己的业务逻辑。
当然,数据同步除了本篇学习的Canal外,还有其他一些优秀的开源工具,它们均支持MySQL,但是也存在一些区别。下面笔者将从支持实时同步、增量同步、需要编写保存变更数据的业务代码以及社区活跃度这4个方面进行对比,如下表所示:
Canal | Debezium | DataX | Databus | Flinkx | Bifrost | |
---|---|---|---|---|---|---|
实时同步 | 支持 | 支持 | 不支持 | 支持 | 支持 | 支持 |
增量同步 | 支持 | 支持 | 不支持 | 支持 | 支持 | 支持 |
是否需要自己编写保存变更数据的代码 | 是 | 是 | 否 | 是 | 是 | 否 |
社区活跃度 | 高 | 高 | 高 | 不高 | 一般 | 一般 |
接下来将以Canal为例,来学习如何进行数据的同步。
搭建服务端
安装Canal服务端
点击 这里 下载Canal的服务端,即名称为canal.developer
前缀的包:
解压后的目录如下所示:
其中bin里面存放的是启动和停止脚本;conf目录存放的是配置文件;lib目录存放的是项目所依赖的包;logs目录存放的是项目运行的日志文件;plugin目录存放的是插件。其实这个canal服务端是一个SpringBoot项目,可以从其启动脚本中看出:
修改MySQL配置信息
第一步,找到MySQL安装目录下的my.ini
文件,在里面新增如下配置信息:
1 | #开启binlog |
第二步,设置Canal服务端中关于MySQL的相关配置信息,即告诉Canal应该去监听哪个数据库的哪个表的日志文件。
注意一个Canal服务端中可以监听多个MySQL实例,Canal默认自带了一个example实例(在其conf/example
目录下有一个名为instance.properties
的文件),本篇笔记就以example为例来进行学习。如果开发者需要增加实例,那么需要复制example文件夹中的内容到同级目录中,并在同级目录下的canal.properties
文件中指定所添加的实例名称。这里我们就使用默认的实例即可。
修改canal.deployer-1.1.5\conf\example\instance.properties
文件中的如下所示信息:
1 | # position info |
最后一行表示设置监听的数据库表信息,可以监听多个表,多个之间使用逗号进行分割,此处设置的是监听所有的表。
修改RabbitMQ配置信息
由于Canal服务端默认的传输方式为tcp,因此我们需要在canal的配置文件中设置RabbitMQ相关的信息。
第一步,修改canal.deployer-1.1.5\conf\canal.properties
文件中的如下所示信息:
1 | # tcp, kafka, rocketMQ, rabbitMQ |
第二步,修改canal.deployer-1.1.5\conf\example\instance.properties
文件中的如下所示信息:
1 | # mq config |
其实就是设置RabbitMQ路由的key,这样才能将信息准确路由到指定的队列里面。
第三步,在RabbitMQ中创建交换机和队列。接着我们需要在RabbitMQ中新建一个名为canal.exchange
的交换机,注意这个名字必须与canal.deployer-1.1.5\conf\canal.properties
文件中设置的rabbitmq.exchange
选项值保持一致。
第四步,新建一个名为canal.queue
的队列,这个队列的名称随意,不过为了区分实际业务,建议此队列还是命名为canal.queue
。
第五步,定义交换机和队列之间的绑定键,名称为canal.routing.key
,注意这个名字必须与canal.deployer-1.1.5\conf\example\instance.properties
文件中设置的canal.mq.topic
选项值保持一致:
启动各个服务
在完成上述准备工作后,接下来我们尝试启动MySQL,之后启动Canal服务端,只需直接双击bin目录下的startup.bat
文件。或者在Canal的bin目录下打开终端,然后输入start /min startup.bat
命令来启动Canal服务端:
当然了,这里有一个小坑,就是如果你的Java版本大于1.7,那么程序就会报错,因为此时永久代已经取消了,应该使用元空间。打开startup.bat
文件,将其中的如下信息:
1 | set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m |
修改为如下后的信息:
1 | JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:MetaspaceSize=128m |
之后再重新执行startup.bat
文件,进而启动Canal服务端。
测试一下数据
接下来使用如下命令往delaytask数据库中的order_master
数据表里面新增一条信息:
1 | INSERT INTO `delaytask`.`order_master`(`id`, `order_id`) VALUES (1, '2001'); |
然后查看一下RabbitMQ中的canal.queue
队列里面的数据,可以看到里面已经有对应数据了:
这其实就是一个JSON字符串,格式化一下信息如下所示:
1 | { |
这个JOSN字符串中将用户执行的插入操作记录的非常清楚,如表名称、方法、参数、参数类型、参数值等。
既然服务端已经通过解析MySQL的binlog日志,并传输增量数据给消息中间件,那么接下来就是客户端监听MQ来获取JSON信息,并将其解析出来为后续业务逻辑服务。
搭建客户端
客户端就比较简单,就是消费Canal服务端传递过来的信息,而它传递的消息都存在了MQ中,因此客户端就只需监听canal.queue
队列。
创建消息实体类
由于MQ传递来的是JSON信息,因此接下来需要创建一个实体类来完成与JSON信息的数据绑定工作。新建一个名为CanalMessage的类,由于JSON中的data可能存在多种形式,因此可以将CanalMessage类定义为一个泛型类:
1 | /** |
监听MQ消息
接下来就是监听RabbitMQ中的canal.queue
队列了,这样只要Canal服务端有数据推送过来就可以及时的消费掉。
简单起见,这里笔者就通过@RabbitListener
来完成RabbitMQ中交换机、队列的声明和绑定工作。新建一个名为CanalRabbitMQListener
的类,里面的代码如下所示:
1 | /** |
当然你还需要配置RabbitMQ的相关信息,可以在application.properties
配置文件中新增如下信息:
1 | spring.rabbitmq.addresses=127.0.0.1 |
项目测试
接下来我们启动项目,首先往order_master
数据表中新增一条记录,命令如下所示:
1 | INSERT INTO `delaytask`.`order_master`(`id`, `order_id`) VALUES (2, '2002'); |
可以看到此时项目控制台输出如下信息:
1 | Canal监听,delaytask数据库的order_master数据表发生变化,变化信息为: |
可以看到Canal客户端已经成功消费了MQ中的消息并接收到,后续开发者可根据实际业务进行后续逻辑开发。