Bus整合RabbitMQ
写在前面
前面我们只是单纯的介绍了消息代理、AMQP以及RabbitMQ的基础知识和基本使用,接下来我们开始学习Spring Cloud Bus组件的配置,并一个Spring Cloud Bus与Spring Cloud Config结合的例子来实现配置信息的实时更新。
不过在此之前需要回忆一下前面的知识,在学习Spring Cloud Config的时候我们学习了如何实现配置信息的实时更新:通过使用POST方法来提交/refresh接口或者Git仓库的Web Hook来手动触发,进而实现Git仓库中的内容修改导致程序属性的更新。显然这种方式是不合理的,所有操作都是人工进行,随着系统的不断扩展,这势必会导致维护成本的增加,使用消息代理中间件可以很好的解决这个问题,因为它可以将消息路由到一个或者多个目的地。
由于Spring Cloud基于Spring Boot,而在前面我们已经进行了Spring Boot和RabbitMQ的整合,那么接下来就是在Spring Cloud Bus中使用RabbitMQ了。
请注意,这里不需要创建新的应用,而是使用到前面关于Spring Cloud Config的几个工程,简单介绍如下:
- config-repo:这是一个定义在GitHub仓库的目录,其中存储了应用名为envy的多环境配置文件,且配置文件中有一个envythink参数。
- config-server:配置了Github仓库,并注册到了Eureka服务注册中心。
- config-client:通过Eureka发现Config Server的客户端,应用名为envy,用来访问配置服务器以获取配置信息。该应用中提供了一个/envythinkOne的接口,它会获取config-repo/envy-prod.properties配置文件中的envythink属性并且将值进行返回。
整合Spring Cloud Bus
接下来正式进入整合环节,首先需要扩展config-client应用,相应的步骤如下所示:
第一步,修改config-client应用的pom.xml配置文件,在里面新增spring-cloud-starter-bus-amqp
和spring-boot-starter-actuator
依赖,其中前者用来提供AMQP功能,后者用来提供刷新端点的功能:第二步,在config-client应用的application.properties配置文件中添加一些RabbitMQ的配置信息:1
2
3
4
5
6
7
8
9<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>第三步,先启动config-server项目,然后修改config-client应用的application.properties配置文件,使之能运行两个实例(cc1和cc2),如下所示:1
2
3
4
5
6
7
8
9# RabbitMQ相关设置
# 主机
spring.rabbitmq.host=localhost
# 端口
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=springcloud
# 密码
spring.rabbitmq.password=springcloud这里采用IDEA自带的方式,首先编辑Edit Configuration,然后不勾选右上角的仅仅使用单例模式,接着复制一个选项卡,然后每启动一个实例就将application.properties配置文件中对应的的端口号信息给注释掉:1
2
3
4
5
6
7# 配置端口号4002
server.port=4002
eureka.instance.hostname=cc1
# 配置端口号4003
server.port=4003
eureka.instance.hostname=cc2
第四步,修改config-server项目的application.properties配置文件,在里面新增暴露所有端点的代码,当然也可以只暴露bus-refresh端点:
1 | # actuator配置 |
第五步,开始访问两个config-client项目实例的/envythinkOne
接口,它们均会返回当前config-repo/envy-prod.properties配置文件中的envythink属性的值:
第六步,修改config-repo/envy-prod.properties配置文件中的envythink属性的值,由prod-111.0修改为prod-1.0。并使用POSTMAN等测试工具发送POST请求到其中任意一个链接,如http://localhost:4002/actuator/bus-refresh
这个,得到的结果如下所示:
第七步,分别去访问之前启动的两个config-client实例的/envythinkOne接口,此时两者都会返回config-repo/envy-prod.properties配置文件中envythink属性的最新值:
通过上述几步配置,我们能通过Spring Cloud Bus来实时更新总线上的属性配置了。
原理分析
Config Server入门示例流程
在进行原理分析之前,先复习一下前面在Config Server入门例子中使用的基础架构,如下图所示:
该架构图的工作流程如下所示:当我们将该架构图中的所有服务都启动后,图中的ServiceA和ServiceB实例就会请求Config Server以获取配置信息,而Config Server则会根据应用配置的规则通过使用git clone命令从Git仓库中将配置文件克隆一份到本地,然后再将信息返回给ServiceA和ServiceB实例。
Bus整合Config入门示例流程
而本篇我们通过使用Spring Cloud Bus和Spring Cloud Config的整合,并以RabbitMQ为消息代理实现了应用配置信息的动态更新,接下来就通过分析其架构来对其工作原理进行分析,如下所示:
从上述架构图中可以看到,其中包括了Git仓库,Config Server以及几个微服务应用的实例,请注意这些微服务应用的实例中都引入了Spring Cloud Bus,所以它们都连接到了RabbitMQ的消息总线上。
当我们将该架构图中的所有服务都启动后,图中的“ServiceA”的三个实例就会请求Config Server以获取配置信息,而Config Server则会根据应用配置的规则从Git仓库中获取配置信息并返回。
现在有一个需求,就是需要修改“ServiceA”的属性,那么这个如何实现呢?首先,开发者通过Git管理工具去仓库中修改对应的属性值,但是这个修改并不会触发“ServiceA”实例的属性更新。之后开发者向“ServiceA”的实例3以POST方式向/actuator/bus-refresh
接口发送请求,此时“ServiceA”的实例3就会将刷新请求发送到消息总线中,该消息事件就会被“ServiceA”的实例1和实例2从消息总线中获取得到,并重新从Config Server中获取它们的配置信息,进而实现配置信息的动态更新。
而从Git仓库中配置的修改到向/actuator/bus-refresh
接口发送请求这一步,其实是可以通过Git仓库的Web Hook来自动触发。由于所有连接到消息总线上的应用都会接收到更新请求,因此在Web Hook中就不需要维护所有节点内容来进行更新,也就是说其实也解决了前面仅通过Web Hook来逐个刷新的问题。
指定刷新范围
在前面的例子中,我们通过向服务实例请求Spring Cloud Bus的/actuator/bus-refresh
接口,进而触发总线上其他服务实例的/actuator/refresh
接口。但是在一些特殊场景下,我们希望仅仅是刷新微服务中某个具体实例的配置,而不是将整个连接到消息总线的微服务应用的配置都给刷新。
对于这种场景,Spring Cloud Bus也提供了很好的支持,/actuator/bus-refresh
接口提供了一个destination参数用来定位具体要刷新的应用程序。举个例子,假设我们想刷新服务名为cc1的微服务应用,此时可以使用请求/actuator/bus-refresh?destination=cc1:4002
,这样消息总线上的各应用实例会根据destination的属性值来判断是否为自己的实例名,若符合则进行配置刷新,否则忽略该消息。
而关于应用的实例名称,在前面也进行了学习,它的默认名称是按照如下规则生成的:
1 | ${spring.cloud.client.hostname}:${spring.application.name}:${spring.application.instance_id}:${server.port} |
destination参数除了可以定位具体的实例之外,还可以用来定位具体的服务。其定位服务的原理是通过使用Spring的PathMatecher(匹配路径)来实现的。举个例子,当请求/actuator/bus-refresh?destination=cc1:**
的时候,该请求会使得cc1服务的所有实例都进行刷新。
架构优化
现在问题来了,Spring Cloud Bus的/actuator/bus-refresh
接口既然也提供了针对服务和实例进行配置更新的参数,那么就可以对之前的架构进行优化。在之前的架构中,服务的配置更新需要通过向具体服务的某个实例发送请求,然后触发对整个服务集群的配置更新。这样尽管能实现功能,但是会有一个问题,就是我们指定的应用实例必须不同于集群中的其他应用实例,这无疑增加了集群内部的复杂度,大大提升了运维的难度。举个例子,当我们需要对服务实例进行迁移的时候,那么就得修改Web Hook中的配置,而这不是我们希望做的事情。因此我们尽可能的要让服务集群中的各个节点是对等的。
优化后的结构图如下所示:
从图中可以看到我们对该架构做了如下改动:
(1)在Config Server中也引入Spring Cloud Bus,并将配置服务端也加入到消息总线中。
(2)/actuator/bus-refresh
请求不再发送到具体的服务实例上,而是发送给Config Server上,并通过destination参数来指定需要更新配置的服务或者实例。
通过上面的改动,我们的服务实例不再承担触发配置更新的职责,同时对于Git的触发等配置,都只需要针对Config Server即可,这样极大的简化了集群的维护工作。
RabbitMQ配置
Spring Cloud Bus中的RabbitMQ整合使用了Spring Boot的ConnectionFactory,因此在Spring Cloud Bus中支持使用以spring.rabbit.mq
为前缀的Spring Boot配置属性,具体的配置属性、说明以及默认值如下表所示:
属性名称 | 说明 | 默认值 |
---|---|---|
spring.rabbitmq.address | 客户端连接的地址,有多个的时候可以使用逗号进行分隔,该地址可以是IP和Port的结合 | |
spring.rabbitmq.cache.channel.checkout-timeout | 当缓存已满时,获取Channel的等待时间,单位为毫秒 | |
spring.rabbitmq.cache.channel.size | 缓存中保存的Channel数量 | |
spring.rabbitmq.cache.connection.mode | 连接缓存的模式 | CHANNEL |
spring.rabbitmq.cache.connection.size | 缓存的连接数 | |
spring.rabbitmq.connection-timeout | 连接超时参数,单位为毫秒;设置为“0”,代表无穷大 | |
spring.rabbitmq.dynamic | 默认创建一个AmqpAdmin的Bean | true |
spring.rabbitmq.host | RabbitMQ的主机地址 | localhost |
spring.rabbitmq.listener.acknowledge-mode | 容器的acknowledge模式 | |
spring.rabbitmq.listener.auto-startup | 启动时自动启动容器 | true |
spring.rabbitmq.listener.concurrency | 消费者的最小数量 | |
spring.rabbitmq.listener.default-requeue-rejected | 投递失败时是否重新排队 | true |
spring.rabbitmq.listener.max-concurrency | 消费者的最大数量 | |
spring.rabbitmq.listener.prefetch | 在单个请求中处理的消息个数,它应该大于或者等于事务数量 | |
spring.rabbitmq.listener.retry.enabled | 不论是不是重试都发布 | false |
spring.rabbitmq.listener.retry.max-attempts | 尝试投递消息的最大数量 | 3 |
spring.rabbitmq.listener.retry.max-interval | 两次尝试的最大时间间隔 | 10000 |
spring.rabbitmq.listener.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
spring.rabbitmq.listener.retry.stateless | 不论重试是有状态的还是无状态的 | true |
spring.rabbitmq.listener.transaction-size | 在一个事务中处理的消息数量。为了获得最佳效果,该值应该设置为小于或等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值 | |
spring.rabbitmq.password | 登录到RabbitMQ的密码 | |
spring.rabbitmq.port | RabbitMQ的端口号 | 5672 |
spring.rabbitmq.publisher-confirms | 开启Publisher Confirm机制 | false |
spring.rabbitmq.publisher-returns | 开启Publisher Return机制 | false |
spring.rabbitmq.requested-heartbeat | 请求心跳超时时间,单位为秒 | |
spring.rabbitmq.ssl.enabled | 启用SSL支持 | false |
spring.rabbitmq.ssl.key-store | 保存SSL证书的地址 | |
spring.rabbitmq.ssl.key-store-password | 访问SSL证书的地址时使用的密码 | |
spring.rabbitmq.ssl.trust-store | SSL的可信地址 | |
spring.rabbitmq.ssl.trust-store-password | 访问SSL的可信地址时的密码 | |
spring.rabbitmq.ssl.algorithm | SSL算法,默认使用Rabbit的客户端算法库 | |
spring.rabbitmq.template.mandatory | 启用强制消息 | false |
spring.rabbitmq.template.receive-timeout | receive()方法的超时时间 | 0 |
spring.rabbitmq.template.reply-timeout | sendAndReceive()方法的超时时间 | 5000 |
spring.rabbitmq.template.retry.enabled | 设置为true的时候RabbitTemplate能够实现重试 | false |
spring.rabbitmq.template.retry.initial-interval | 第一次与第二次发布消息的时间间隔 | 1000 |
spring.rabbitmq.template.retry.max-attempts | 尝试发布消息的最大数量 | 3 |
spring.rabbitmq.template.retry.max-interval | 尝试发布消息的最大时间间隔 | 10000 |
spring.rabbitmq.template.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
spring.rabbitmq.username | 登录到RabbitMQ的用户名 | |
spring.rabbitmq.virtual-host | 连接到RabbitMQ的虚拟主机 |
这样关于Spring Cloud Bus消息总线整合RabbitMQ的学习就到此为止,后续开始学习Spring Cloud Bus消息总线整合Kafka的相关内容。