写在前面

在微服务的架构系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题,让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,因此我们称之为消息总线。在消息总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,如配置信息的变更或者其他一些管理操作。

在前面我们学习了分布式配置组件Spring Cloud Config,接下来开始学习另一个和它同等重要的组件:消息总线SpringCloud Bus,不过我们会从基础的消息代理入手,一步步的由浅入深学习Spring Cloud Bus这个消息总线组件。通过使用Spring Cloud Bus消息总线,我们可以非常容易的搭建起消息总线,同时实现了一些消息总线中的常用功能,如配合Spring Cloud Config实现微服务应用配置信息的动态更新等。

消息代理

消息代理(Message Broker)是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作。

一般来说,在企业中以下场景都是需要使用到消息代理:(1)将消息路由到一个或多个目的地;(2)消息转化为其他的表现方式;(3)执行消息的聚集和分解,并将结果发送到它们的目的地,然后重新组合响应返回给消息用户;(4)调用Web服务来检索数据;(5)响应事件或错误;(6)使用发布-订阅模式来提供内容或基于主题的消息路由。

常用的消息中间件有很多,如ActiveMQ、Kafka、RabbitMQ、RocketMQ等,但是目前Spring Cloud Bus支持RabbitMQ和Kafka这两款,因此本文和后续主要介绍这两款消息中间件如何与Spring Cloud Bus配合实现消息总线。

RabbitMQ

AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)是一个面向消息中间件的开放式标准应用层协议,同时也是一个线路层的协议规范,而不是API 规范(例如JMS)。由于AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像SMTP、HTTP等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过AMQP进行消息交互。像目前流行的StormMQ、RabbitMQ等都实现了AMQP协议。

AMQP定义了以下特性:(1)消息方向;(2)消息队列;(3)消息路由(包括点对点和发布-订阅模式);(4)可靠性;(5)安全性。

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。它支持多种操作系统,多种编程语言,几乎覆盖所有主流的企业级技术平台。RabbitMQ服务器是高可用性能、可伸缩而闻名的Erlang语言编写而成的,其集群和故障转移是构建在开放电信平台框架之上的。

在微服务架构消息中间件的选型中,RabbitMQ是一个优秀且适合的选择,因此Spring Cloud Bus中包含了对RabbitMQ的自动化配置。接下来将从RabbitMQ的基本概念、安装和使用等过程入手,循序渐进的学习RabbitMQ如何与Spring Cloud Bus进行整合以实现消息总线。

基本概念

接下来学习RabbitMQ中一些基本的概念,了解它们对于后续学习RabbitMQ有非常大的帮助,基本概念如下所示:

  • Broker:可以将其理解为是一个消息队列服务器的实体。它是一个中间件应用,负责接收消息生产者的消息,然后将消息发送给消息接收者或者其他的Broker。
  • Exchange:消息交换机。它是消息第一个到达的地方,消息通过它指定的路由规则分发到不同的消息队列中。
  • Queue:消息队列。消息通过发送和路由之后最终到达的地方,到达Queue的消息就进入逻辑上等待消费的状态。每个消息都会被发送到一个或多个队列。
  • Binding:绑定。它的作用就是将Exchange和Queue按照路由规则绑定起来,其实就是Exchange和Queue之间的虚拟链接。
  • Routing Key:路由关键字。Exchange根据这个关键字进行消息投递。
  • Virtual host:虚拟主机。它是对Broker的虚拟划分,将消费者、生产者和它们依赖的AMQP相关结构进行隔离,一般都是处于安全考量。开发者完全可以在一个Broker中设置多个虚拟主机,用于实现对不同用户进行权限的分离。
  • Connection:连接。连接代表了生产者、消费者、Broker之间进行通信的物理网络。
  • Channel:消息通道。它用于连接生产者和消费者的逻辑结构。在客户端的每个连接里,可建立多个Channel,每个Channel代表一个会话任务,通过Channel可以隔离同一连接中的不同交互内容。
  • Producer:消息生产者。一个用于生产并发送消息的程序。
  • Consumer:消息消费者。一个用于接收并处理消息的程序。

RabbitMQ的整体架构如下图所示:

消息投递到队列的整个过程如下所示:
(1)客户端连接到消息队列服务器,打开一个Channel;
(2)客户端声明一个Exchange,并设置相关属性;
(3)客户端声明一个Queue,并设置相关属性;
(4)客户端使用RoutingKey,在Exchange和Queue之间建立绑定关系;
(5)客户端投递消息到Exchange;
(6)Exchange接收到消息后,根据消息的Key和已经设置的Binding,进行消息路由,将消息投递到一个或者多个Queue中。


请注意Exchange消息交换机有以下三种类型,如下所示:
(1)Direct交换机:它完全根据Key进行投递。举个例子,当开发者绑定时设置Routing Key(路由关键字)为envy,那么客户端提交的消息,只有设置了Key为envy的才会被投递到消息队列中。

(2)Topic交换机:对Key进行模式匹配后才进行投递,可以使用#符号来匹配一个或者多个词,而*符号则只能匹配一个词。举个例子来说,使用envy.#可以匹配envy.hello.book.movie;而使用envy.*则只能匹配诸如envy.hello,两者的区别需要格外注意。

(3)Fanout交换机:不需要任何Key,它采取广播的模式,一个消息进来时,投递到与该交换机绑定的所有队列。


RabbitMQ支持消息的持久化,也就是将数据写入磁盘中,这是正常的操作,因为出于安全考虑,将数据持久化是正确的选择。消息队列持久化包括3个部分,分别是:
(1)Exchange持久化,在声明时指定durable=> 1。
(2)Queue持久化,在声明时指定durable=> 1。
(3)消息持久化,在投递时指定delivery_mode=>2(注意1是非持久化)。

需要说明的是:如果Exchange和Queue都是持久化的,那么它们之间的Binding也是持久化的;如果Exchange和Queue两者中有一个是持久化的,一个是非持久化的,那么两者不允许建立绑定。

关于RabbitMQ的详细使用,后续我会出一套相关的学习笔记,这里就学习到这里。

安装和使用

安装Erlang语言

由于RabbitMQ使用Erlang语言编写,因此需要先安装Erlang环境,这里以Windows系统为例来介绍RabbitMQ的安装。由于Erlang官网在国外,下载过程较慢,可以点击 这里 选择合适的系统进行下载,我这里选择了23的版本:

下载后就直接安装,点击下一步就可以,这个没什么好说的。接下来就是进行环境变量的配置,操作如下:此电脑–>鼠标右键“属性”–>高级系统设置–>环境变量–>“新建”系统环境变量:

在上面设置变量名为”ERLANG_HOME”,变量值为”D:\Application\Erlang\erl-23.0”,点击确定。接着双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中,如下所示:

最后,使用windows+R组合键,输入cmd进入DOS状态。然后输入erl -version命令,看到如下所示的版本信息则说明erlang已经安装成功了:

1
2
C:\Users\envy>erl -version
Erlang (SMP,ASYNC_THREADS) (BEAM) emulator version 11.0
安装RabbitMQ

首先点击 这里 下载RabbitMQ的安装包,当然如果觉得下载速度较慢,可以点击使用 华为镜像来进行下载。下载后就直接安装,点击下一步就可以,这个没什么好说的。

RabbitMQ安装好以后,接下来开始安装RabbitMQ-Plugins。打开终端,然后切换到RabbitMQ的sbin目录(我的是D:\Application\RabbitMQ\rabbitmq_server-3.8.0\sbin),然后执行rabbitmq-plugins enable rabbitmq_management命令来进行安装,如下所示:

接着继续执行rabbitmqctl status命令来查看Rabbitmq的状态,如下所示:

然后双击sbin目录下的rabbitmq-server.bat文件来启动rabbitmq-server,等10秒左右后,打开浏览器在其地址栏中访问http://localhost:15672链接,如下所示:

然后输入用户名和密码,其默认值都是guest,完成登录:

RabbitMQ安装完成后,默认会创建系统服务,将和Windows系统一起启动,因此可以查看系统服务,验证是否安装成功:

Rabbit管理

对于Rabbit的管理,有两种方式:访问配置文件和访问Web页面,接下来就分别介绍这两种管理方式。

Web页面管理

想使用Web页面管理,必须在Rabbit安装的sbin目录下执行rabbitmq-plugins enable rabbitmq_management命令来安装插件,开启Web管理软件的功能:

然后打开浏览器在其地址栏中访问http://localhost:15672链接,如下所示:

然后输入用户名和密码,其默认值都是guest,完成登录:

从这个页面中我们可以看到前面学习的基本概念,如Connections、Channels、Exchange、Queues等,可以多点点其他地方熟悉一下RabbitMQ Server的服务端。

接下来尝试添加一个名为springcloud的用户,可以点击Admin选项卡,如下所示:

请注意其中的Tags标签,它是RabbitMQ中的角色分类,一共有6种,接下来按照所拥有权限依次增大的顺序进行介绍,如下所示:
(1)None:表示不能访问Web管理页面;

(2)Management:表示用户可以通过AMQP做的任何事情外加如下功能:(a)列出自己可以通过AMQP登入的virtual hosts;(b)查看自己的virtual hosts中的queues、exchanges和bindings;(c)查看和关闭自己的channels和connections;(d)查看有关自己的virtual hosts的“全局”统计信息,包含其他用户在这些virtual hosts中的活动。

(3)Policymaker:表示Management可以做的任何事外加如下功能:查看、创建和删除自己的virtual hosts所属的policies和parameters。

(4)Monitoring:表示Management可以做的任何事外加如下功能:(a)列出所有的virtual hosts,包括它们不能登录的virtual hosts;(b)查看其他用户的channels和connections;(c)查看节点级别的数据,如clustering和memory的使用情况;(d)查看真正的关于所有virtual hosts的“全局”统计信息。

(5)Administrator:表示Policymaker和Monitoring可以做的任何事外加如下功能:(a)创建和删除virtual hosts;(b)查看、创建和删除users;(c)查看、创建和删除permissions;(d)关闭其他用户的connections。

注意还需要给上面的springcloud的用户设置Virtual Host,否则后续会报错,如果出错了可以点击 这里 寻找解决办法:

配置文件管理

在RabbitMQ安装的sbin目录下存在一些批处理脚本,通过使用它们可以管理RabbitMQ:

笔者这里以rabbitmqctl.bat脚本为例进行介绍,开发者可以使用如下命令来实现相应的功能:

1
2
3
4
5
6
7
8
9
10
# 查看状态
rabbitmqctl status
# 开启web插件
rabbitmq-plugins enable rabbitmq_management
# 添加一个用户名为envy,密码为123的用户
rabbitmqctl add_user envy 123
# 设置envy用户的角色为管理员
rabbitmqctl set_user_tags envy administrator
# 配置envy用户可以远程登录,并访问envy中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p / envy ".*" ".*" ".*"

快速入门

其实SpringBoot整合RabbitMQ这一场景,我在SpringBoot框架学习那一套笔记中有详细介绍,但是考虑到后续演示的需要,这里再次举一个例子,通过SpringBoot整合RabbitMQ来实现一个简单的发送、接收消息的例子来加深对于RabbitMQ的理解。

第一步,新建一个普通的SpringBoot工程,名称为rabbitmq-hello,注意使用默认的依赖。

第二步,添加项目依赖。Spring Boot工程创建完成后,修改pom.xml文件,添加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

第三步,在application.properties配置文件中配置关于RabbitMQ的连接和用户信息,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 项目名称
spring.application.name=rabbitmq-hello
# 项目端口号
server.port=4005

# RabbitMQ相关设置
# 主机
spring.rabbitmq.host=localhost
# 端口
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=springcloud
# 密码
spring.rabbitmq.password=password

这里我们配置RabbitMQ的地址为localhost,端口为5672(web管理端端口是15672,而其服务本身是5672端口),用户名和密码则是之前创建的springcloud,当然也可以使用默认的guest。

第四步,创建消息生产者。接下来创建消息生产者Sender对象,在其内部通过注入AmqpTemplate接口的实例来实现消息的发送,AmqpTemplate接口内定义了一套针对AMQP协议的基础操作。而在SpringBoot中会根据配置来注入其具体实现。

在该生产者中,我们会产生一个字符串,并发送到名为hello的队列中。在rabbitmq-hello项目目录下新建component目录,然后在里面新建Sender类,相应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class Sender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send(){
String message= "hello, " + new Date();
System.out.print("Sender: "+message);
amqpTemplate.convertAndSend("hello",message);
}
}

第五步,创建消息消费者。接下来在component包内创建消息消费者Receiver对象,在该类中需要使用@RabbitListener注解来定义该类对hello队列的监听,并使用@RabbitHandler注解来指定对消息的处理方法。也就是说该消费者实现了对hello队列的消费,消费操作是以日志的形式输出消息的字符串内容:

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "hello")
public class Receiver {

@RabbitHandler
public void process(String message{
System.out.print("Receiver: "+message);
}
}

第六步,创建RabbitMQ配置类。接下来需要在rabbitmq-hello项目目录下新建一个config包,并在该包内新建一个RabbitConfig类,这是RabbitMQ的配置类,用来配置队列、交换机、路由等高级功能。考虑到这里是入门demo,因此先学习使用最基本的配置来完成一个最基本的生产和消费过程:

1
2
3
4
5
6
7
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}

第七步,创建单元测试类。接下来需要创建一个单元测试类,用于测试消息的发送,可以直接使用项目测试主类RabbitmqHelloApplicationTests,相应的代码如下所示:

1
2
3
4
5
6
7
8
9
10
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
class RabbitmqHelloApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
sender.send();
}
}

完成上述各步骤之后,接下来开始启动rabbitmq-hello项目了,可以看到在项目的控制台输出以下信息,这说明我们已经成功创建了一个访问127.0.0.1:5672的springcloud连接,如下所示:

1
Created new connection: rabbitConnectionFactory#241e8ea6:0/SimpleConnection@650eab8 [delegate=amqp://springcloud@127.0.0.1:5672/, localPort= 5150]

开发者此时也可以去Web管理页面查询:

接着去执行项目测试类的contextLoads方法(注意rabbitmq-hello项目此时还需处于运行状态),来发送一条消息,可以发现此时单元测试的控制台输出以下信息,这说明消息被发送到了RabbitMQ Server的hello队列中:

同时在项目的控制台中也可以看到如下类似的输出,这说明消费者对hello队列的监听程序执行了,并输出了接收到的信息:

通过上面的例子,我们发现用户只需在SpringBoot应用中引入spring-boot-starter-amqp依赖,然后进行简单的配置就完成了对RabbitMQ的消息生产和消费的设置内容。

上面只是一个入门级别的例子,之前提到的一些基本概念,如消息交换机、路由关键字、绑定、虚拟主机等都还没有介绍,关于这些笔者会在另一套学习RabbitMQ的系列笔记中进行介绍学习。在这里我们需要理解的是,在整个生产消费过程中,生产和消费是一个异步操作,这也是在分布式系统中使用消息代理的重要原因,它非常符合微服务的思想,这样我们可以使用通信来解耦业务逻辑。

为了证明它的确可以做到业务解耦,我们可以尝试在上面的例子中,不运行消费者,只运行生产者,然后观察RabbitMQ Server的管理页面,可以发现在Queues选项卡下面多了一些待处理的消息:

这时候我们再启动消费者,它就会处理这些消息,因此通过生产消费模式的异步操作,系统间调用就没有同步调用需要那么高的实时性要求,同时也更容易控制处理的吞吐量以保证系统的正常运行。