写在前面

前面我们只是单纯的介绍了消息代理、AMQP以及RabbitMQ的基础知识和基本使用,接下来我们开始学习Spring Cloud Bus组件的配置,并一个Spring Cloud Bus与Spring Cloud Config结合的例子来实现配置信息的实时更新。

不过在此之前需要回忆一下前面的知识,在学习Spring Cloud Config的时候我们学习了如何实现配置信息的实时更新:通过使用POST方法来提交/refresh接口或者Git仓库的Web Hook来手动触发,进而实现Git仓库中的内容修改导致程序属性的更新。显然这种方式是不合理的,所有操作都是人工进行,随着系统的不断扩展,这势必会导致维护成本的增加,使用消息代理中间件可以很好的解决这个问题,因为它可以将消息路由到一个或者多个目的地。

由于Spring Cloud基于Spring Boot,而在前面我们已经进行了Spring Boot和RabbitMQ的整合,那么接下来就是在Spring Cloud Bus中使用RabbitMQ了。

请注意,这里不需要创建新的应用,而是使用到前面关于Spring Cloud Config的几个工程,简单介绍如下:

  • config-repo:这是一个定义在GitHub仓库的目录,其中存储了应用名为envy的多环境配置文件,且配置文件中有一个envythink参数。
  • config-server:配置了Github仓库,并注册到了Eureka服务注册中心。
  • config-client:通过Eureka发现Config Server的客户端,应用名为envy,用来访问配置服务器以获取配置信息。该应用中提供了一个/envythinkOne的接口,它会获取config-repo/envy-prod.properties配置文件中的envythink属性并且将值进行返回。

    整合Spring Cloud Bus

    接下来正式进入整合环节,首先需要扩展config-client应用,相应的步骤如下所示:
    第一步,修改config-client应用的pom.xml配置文件,在里面新增spring-cloud-starter-bus-amqpspring-boot-starter-actuator依赖,其中前者用来提供AMQP功能,后者用来提供刷新端点的功能:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    第二步,在config-client应用的application.properties配置文件中添加一些RabbitMQ的配置信息:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # RabbitMQ相关设置
    # 主机
    spring.rabbitmq.host=localhost
    # 端口
    spring.rabbitmq.port=5672
    # 用户名
    spring.rabbitmq.username=springcloud
    # 密码
    spring.rabbitmq.password=springcloud
    第三步,先启动config-server项目,然后修改config-client应用的application.properties配置文件,使之能运行两个实例(cc1和cc2),如下所示:
    1
    2
    3
    4
    5
    6
    7
    # 配置端口号4002
    server.port=4002
    eureka.instance.hostname=cc1

    # 配置端口号4003
    server.port=4003
    eureka.instance.hostname=cc2
    这里采用IDEA自带的方式,首先编辑Edit Configuration,然后不勾选右上角的仅仅使用单例模式,接着复制一个选项卡,然后每启动一个实例就将application.properties配置文件中对应的的端口号信息给注释掉:

第四步,修改config-server项目的application.properties配置文件,在里面新增暴露所有端点的代码,当然也可以只暴露bus-refresh端点:

1
2
# actuator配置
management.endpoints.web.exposure.include=*

第五步,开始访问两个config-client项目实例的/envythinkOne接口,它们均会返回当前config-repo/envy-prod.properties配置文件中的envythink属性的值:

第六步,修改config-repo/envy-prod.properties配置文件中的envythink属性的值,由prod-111.0修改为prod-1.0。并使用POSTMAN等测试工具发送POST请求到其中任意一个链接,如http://localhost:4002/actuator/bus-refresh这个,得到的结果如下所示:

第七步,分别去访问之前启动的两个config-client实例的/envythinkOne接口,此时两者都会返回config-repo/envy-prod.properties配置文件中envythink属性的最新值:

通过上述几步配置,我们能通过Spring Cloud Bus来实时更新总线上的属性配置了。

原理分析

Config Server入门示例流程

在进行原理分析之前,先复习一下前面在Config Server入门例子中使用的基础架构,如下图所示:

该架构图的工作流程如下所示:当我们将该架构图中的所有服务都启动后,图中的ServiceA和ServiceB实例就会请求Config Server以获取配置信息,而Config Server则会根据应用配置的规则通过使用git clone命令从Git仓库中将配置文件克隆一份到本地,然后再将信息返回给ServiceA和ServiceB实例。

Bus整合Config入门示例流程

而本篇我们通过使用Spring Cloud Bus和Spring Cloud Config的整合,并以RabbitMQ为消息代理实现了应用配置信息的动态更新,接下来就通过分析其架构来对其工作原理进行分析,如下所示:

从上述架构图中可以看到,其中包括了Git仓库,Config Server以及几个微服务应用的实例,请注意这些微服务应用的实例中都引入了Spring Cloud Bus,所以它们都连接到了RabbitMQ的消息总线上。

当我们将该架构图中的所有服务都启动后,图中的“ServiceA”的三个实例就会请求Config Server以获取配置信息,而Config Server则会根据应用配置的规则从Git仓库中获取配置信息并返回。

现在有一个需求,就是需要修改“ServiceA”的属性,那么这个如何实现呢?首先,开发者通过Git管理工具去仓库中修改对应的属性值,但是这个修改并不会触发“ServiceA”实例的属性更新。之后开发者向“ServiceA”的实例3以POST方式向/actuator/bus-refresh接口发送请求,此时“ServiceA”的实例3就会将刷新请求发送到消息总线中,该消息事件就会被“ServiceA”的实例1和实例2从消息总线中获取得到,并重新从Config Server中获取它们的配置信息,进而实现配置信息的动态更新。

而从Git仓库中配置的修改到向/actuator/bus-refresh接口发送请求这一步,其实是可以通过Git仓库的Web Hook来自动触发。由于所有连接到消息总线上的应用都会接收到更新请求,因此在Web Hook中就不需要维护所有节点内容来进行更新,也就是说其实也解决了前面仅通过Web Hook来逐个刷新的问题。

指定刷新范围

在前面的例子中,我们通过向服务实例请求Spring Cloud Bus的/actuator/bus-refresh接口,进而触发总线上其他服务实例的/actuator/refresh接口。但是在一些特殊场景下,我们希望仅仅是刷新微服务中某个具体实例的配置,而不是将整个连接到消息总线的微服务应用的配置都给刷新。

对于这种场景,Spring Cloud Bus也提供了很好的支持,/actuator/bus-refresh接口提供了一个destination参数用来定位具体要刷新的应用程序。举个例子,假设我们想刷新服务名为cc1的微服务应用,此时可以使用请求/actuator/bus-refresh?destination=cc1:4002,这样消息总线上的各应用实例会根据destination的属性值来判断是否为自己的实例名,若符合则进行配置刷新,否则忽略该消息。

而关于应用的实例名称,在前面也进行了学习,它的默认名称是按照如下规则生成的:

1
${spring.cloud.client.hostname}:${spring.application.name}:${spring.application.instance_id}:${server.port}

destination参数除了可以定位具体的实例之外,还可以用来定位具体的服务。其定位服务的原理是通过使用Spring的PathMatecher(匹配路径)来实现的。举个例子,当请求/actuator/bus-refresh?destination=cc1:**的时候,该请求会使得cc1服务的所有实例都进行刷新。

架构优化

现在问题来了,Spring Cloud Bus的/actuator/bus-refresh接口既然也提供了针对服务和实例进行配置更新的参数,那么就可以对之前的架构进行优化。在之前的架构中,服务的配置更新需要通过向具体服务的某个实例发送请求,然后触发对整个服务集群的配置更新。这样尽管能实现功能,但是会有一个问题,就是我们指定的应用实例必须不同于集群中的其他应用实例,这无疑增加了集群内部的复杂度,大大提升了运维的难度。举个例子,当我们需要对服务实例进行迁移的时候,那么就得修改Web Hook中的配置,而这不是我们希望做的事情。因此我们尽可能的要让服务集群中的各个节点是对等的。

优化后的结构图如下所示:

从图中可以看到我们对该架构做了如下改动:
(1)在Config Server中也引入Spring Cloud Bus,并将配置服务端也加入到消息总线中。
(2)/actuator/bus-refresh请求不再发送到具体的服务实例上,而是发送给Config Server上,并通过destination参数来指定需要更新配置的服务或者实例。

通过上面的改动,我们的服务实例不再承担触发配置更新的职责,同时对于Git的触发等配置,都只需要针对Config Server即可,这样极大的简化了集群的维护工作。

RabbitMQ配置

Spring Cloud Bus中的RabbitMQ整合使用了Spring Boot的ConnectionFactory,因此在Spring Cloud Bus中支持使用以spring.rabbit.mq为前缀的Spring Boot配置属性,具体的配置属性、说明以及默认值如下表所示:

属性名称 说明 默认值
spring.rabbitmq.address 客户端连接的地址,有多个的时候可以使用逗号进行分隔,该地址可以是IP和Port的结合
spring.rabbitmq.cache.channel.checkout-timeout 当缓存已满时,获取Channel的等待时间,单位为毫秒
spring.rabbitmq.cache.channel.size 缓存中保存的Channel数量
spring.rabbitmq.cache.connection.mode 连接缓存的模式 CHANNEL
spring.rabbitmq.cache.connection.size 缓存的连接数
spring.rabbitmq.connection-timeout 连接超时参数,单位为毫秒;设置为“0”,代表无穷大
spring.rabbitmq.dynamic 默认创建一个AmqpAdmin的Bean true
spring.rabbitmq.host RabbitMQ的主机地址 localhost
spring.rabbitmq.listener.acknowledge-mode 容器的acknowledge模式
spring.rabbitmq.listener.auto-startup 启动时自动启动容器 true
spring.rabbitmq.listener.concurrency 消费者的最小数量
spring.rabbitmq.listener.default-requeue-rejected 投递失败时是否重新排队 true
spring.rabbitmq.listener.max-concurrency 消费者的最大数量
spring.rabbitmq.listener.prefetch 在单个请求中处理的消息个数,它应该大于或者等于事务数量
spring.rabbitmq.listener.retry.enabled 不论是不是重试都发布 false
spring.rabbitmq.listener.retry.max-attempts 尝试投递消息的最大数量 3
spring.rabbitmq.listener.retry.max-interval 两次尝试的最大时间间隔 10000
spring.rabbitmq.listener.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.listener.retry.stateless 不论重试是有状态的还是无状态的 true
spring.rabbitmq.listener.transaction-size 在一个事务中处理的消息数量。为了获得最佳效果,该值应该设置为小于或等于每个请求中处理的消息个数,即spring.rabbitmq.listener.prefetch的值
spring.rabbitmq.password 登录到RabbitMQ的密码
spring.rabbitmq.port RabbitMQ的端口号 5672
spring.rabbitmq.publisher-confirms 开启Publisher Confirm机制 false
spring.rabbitmq.publisher-returns 开启Publisher Return机制 false
spring.rabbitmq.requested-heartbeat 请求心跳超时时间,单位为秒
spring.rabbitmq.ssl.enabled 启用SSL支持 false
spring.rabbitmq.ssl.key-store 保存SSL证书的地址
spring.rabbitmq.ssl.key-store-password 访问SSL证书的地址时使用的密码
spring.rabbitmq.ssl.trust-store SSL的可信地址
spring.rabbitmq.ssl.trust-store-password 访问SSL的可信地址时的密码
spring.rabbitmq.ssl.algorithm SSL算法,默认使用Rabbit的客户端算法库
spring.rabbitmq.template.mandatory 启用强制消息 false
spring.rabbitmq.template.receive-timeout receive()方法的超时时间 0
spring.rabbitmq.template.reply-timeout sendAndReceive()方法的超时时间 5000
spring.rabbitmq.template.retry.enabled 设置为true的时候RabbitTemplate能够实现重试 false
spring.rabbitmq.template.retry.initial-interval 第一次与第二次发布消息的时间间隔 1000
spring.rabbitmq.template.retry.max-attempts 尝试发布消息的最大数量 3
spring.rabbitmq.template.retry.max-interval 尝试发布消息的最大时间间隔 10000
spring.rabbitmq.template.retry.multiplier 上一次尝试时间间隔的乘数 1.0
spring.rabbitmq.username 登录到RabbitMQ的用户名
spring.rabbitmq.virtual-host 连接到RabbitMQ的虚拟主机

这样关于Spring Cloud Bus消息总线整合RabbitMQ的学习就到此为止,后续开始学习Spring Cloud Bus消息总线整合Kafka的相关内容。