本篇来学习消息服务相关的知识,消息服务在一些大型的项目架构中才会使用,但是对于小项目也是可以使用的。

消息队列(Message Queue)是一种进程间或者线程间的异步通信方式,使用消息队列,消息生产者在产生消息后,会将消息保存在消息队列中 ,直到消息消费者来取走它,即消息的发送者和接收者不需要同时与消息队列交互。使用消息队列可以有效实现服务的解耦,并提供系统的可靠性以及扩展性。目前开源的消息队列服务非常多,如Apache ActiveMQ、RabbitMQ等,这些产品也就是常说的消息中间件。

JMS

JMS简介

JMS(Java Message Service)即Java消息服务,它通过统一Java API层面的标准,使得多个客户端可以通过JMS进行交互,大部分的消息中间件提供商都对JMS提供支持。JMS和ActiveMQ的关系就像是JDBC和JDBC驱动一样的关系。JMS包括两种消息模型:点对点和发布者/订阅者,同时JMS仅仅支持Java平台。

SpringBoot整合JMS

由于JMS只是一套标准,因此SpringBoot整合JMS必然是整合JMS的某一个实现,本例子以ActiveMQ为例介绍SpringBoot如何整合JMS。

ActiveMQ简介

Apache ActiveMQ是一个开源的消息中间件,它不仅完全支持JMS1.1规范,而且支持多种编程语言,例如C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby等,也支持多种协议,例如OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP。Apache ActiveMQ 提供了对Spring 框架的支持,可以非常容易地嵌入Spring中,同时它也提供了集群支持。

ActiveMQ安装

通常来说,ActiveMQ都是安装在Linux上的,因此本例子以安装在CentOS7为例,ActiveMQ的版本为5.15.12(注意要运行ActiveMQ的话,CentOS7上必须安装Java运行环境),相应的安装步骤为:

第一步,检查是否安装java。使用java -version命令来检测本地是否安装了java,可以看到确实安装,如果没有安装只需要使用yum -y install java安装即可:

1
2
3
4
[root@iZthsx5glu6ohyZ ~]# java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)

第二步,下载ActiveMQ安装包。使用如下命令来下载ActiveMQ安装包:

1
wget https://mirrors.tuna.tsinghua.edu.cn/apache/activemq/5.15.12/apache-activemq-5.15.12-bin.tar.gz

第三步,解压下载的ActiveMQ安装包。使用如下命令来解压下载的ActiveMQ安装包:

1
tar -zxvf apache-activemq-5.15.12-bin.tar.gz

第四步,启动ActiveMQ。依次执行以下命令来启动ActiveMQ:

1
2
cd apache-activemq-5.15.12/bin/
./activemq start

第五步,检测ActiveMQ是否启动即访问。开发者可以使用ps -ef|grep activemq命令来检测ActiveMQ是否启动成功,若成功则可以进行访问。首先关闭CentOS7的防火墙(如果ActiveMQ是运行在云服务器上,那么还需要开放8161端口的访问权限),然后在浏览器地址栏中输入http://47.100.175.12:8161,(47.100.175.12)是云服务器的公网地址,而8161则是ActiveMQ的默认端口号,如果能看到如下页面,则表示ActiveMQ已经成功启动了:

ActiveMQ启动成功后,单击Manage ActiveMQ borker超链接进入管理员控制台,默认用户和密码都是admin,如下图所示:

SpringBoot整合ActiveMQ

SpringBoot为ActiveMQ配置提供了相应的启动器,因此整合起来非常方便。

第一步,新建项目并添加依赖。使用spring Initializr构建工具构建一个SpringBoot的Web应用,名称为activemqspringboot,然后在pom.xml文件中添加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!--添加activemq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--添加lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

第二步,修改配置文件。接着在application.properties配置文件中添加ActiveMQ的连接信息:

1
2
3
4
5
6
7
8
# 设置ActiveMQ的广播地址
spring.activemq.broker-url=tcp://47.100.175.12:61616
# 设置ActiveMQ信任所有的包,用于支持发送对象消息
spring.activemq.packages.trust-all=true
# 设置ActiveMQ的用户名
spring.activemq.user=admin
# 设置ActiveMQ的用户密码
spring.activemq.password=admin

第三步,添加消息队列Bean。接下来在项目启动类中提供一个消息队列Bean,该Bean的实例就由ActiveMQ提供,相应的代码如下:

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
public class ActivemqspringbootApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqspringbootApplication.class, args);
}
@Bean
Queue queue(){
return new ActiveMQQueue("amq");
}
}

请注意上面的Queue是来自于javax.jms.Queue包的,不要导入错误的包。

第四步,新建Message实体类。新建一个pojo包,然后在里面创建一个Message实体类,且需要实现Serializable 接口,相应的代码为:

1
2
3
4
5
@Data
public class Message implements Serializable {
private String content;
private Date date;
}

第五步,新建JMS组件类。新建一个component包,接着在里面创建一个JMS组件类用来完成消息的发送和接收,也就是JmsComponent类,其中的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class JmsComponent {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Queue queue;

public void send(Message msg){
jmsMessagingTemplate.convertAndSend(this.queue,msg);
}

@JmsListener(destination = "amq")
public void receive(Message msg){
System.out.println("receive:"+msg);
}
}

是不是觉得这个和之前Redis的操作非常类似,不过这里使用的是JmsMessagingTemplate而不是RedisTemplateJmsMessagingTemplate是由Spring提供的一个JMS消息发送模板,可以用来方便地进行消息的发送,消息发送方法convertAndSend,该方法的第一个参数就是消息队列,第二个参数是消息的内容。本例子用于演示一个对象消息。@JmsListener注解表示相应的方法是一个消息消费者,消息消费者订阅的消息destination为amq。经过以上配置就可以在SpringBoot中使用ActiveMQ了。

*第六步,测试。编写一个测试类,用于对消息发送进行测试(其实就是对之前定义的JmsComponent进行测试),相应的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsComponentTest {
@Autowired
JmsComponent jmsComponent;
@Test
public void contextLoads(){
Message msg = new Message();
msg.setContent("hello,jms!");
msg.setDate(new Date());
jmsComponent.send(msg);
}
}

既然是对之前定义的JmsComponent进行测试,那么必然需要导入这个JmsComponent组件,然后调用该组件内的send方法,来发送一个Message对象。

确认ActiveMQ已经启动后,再执行该测试方法,可以看到SpringBoot项目的启动日志中出现了一些信息,这表明消息队列已经收到了生产者发送的信息:

1
receive:Message(content=hello,jms!, date=Wed Nov 27 11:52:33 CST 2019)

这样就完成了在SpringBoot中整合ActiveMQ的相关功能。

AMQP

AMQP简介

AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)是一个线路层的协议规范,而不是API 规范(例如JMS)。由于AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像SMTP、HTTP等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过AMQP进行消息交互。像目前流行的StormMQ、RabbitMQ等都实现了AMQP协议。

SpringBoot整合AMQP

和JMS一样,使用AMQP也是使用AMQP的某个实现,本例子以RabbitMQ为例,介绍AMQP的使用。

RabbitMQ简介

R且bbitMQ是一个实现了AMQP的开源消息中间件,使用高性能的Erlang编写。RabbitMQ具有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。

RabbitMQ的安装

由于RabbitMQ使用Erlang语言编写,因此需要先安装Erlang环境,同样也是在CentOS7系统上安装Erlang21.0,相应的步骤如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 下载安装包
wget http://erlang.org/download/otp_src_21.0.tar.gz
# 解压文件
tar -zxvf otp_src_21.0.tar.gz
cd otp_src_21.0
# 编译
./otp_build autoconf
./configure
make
# 安装
make install
# 检验
erl

最后一步是检验,如果看到如图所示的页面,则显示Erlang已经安装成功了:

Erlang安装成功后,接下来开始安装RabbitMQ。由于yum仓库中默认的Erlang版本较低,因此需要先将最新的Erlang包添加到yum源中,执行如下命令

1
vi /etc/yum.repos.d/rabbtimq-erlang.repo

然后在其中新增以下内容:

1
2
3
4
5
6
7
[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq/rpm/erlang/21/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1

添加成功后,清除原有缓存并创建新缓存,使用的命令如下:

1
2
yum clean all
yum makecache

准备工作完成后,接下来就可以正式安装RabbitMQ了。首先下砸安装包,使用的命令为:

1
wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm

下载完成后,使用如下命令进行安装:

1
yum install rabbitmq-server-3.7.7-1.el7.noarch.rpm

安装过程中若提示缺少socat依赖,可以使用如下命令安装该依赖yum -y install socat。之后继续执行上述命令,如果还抛出以下错误:

1
2
3
4
5
6
Error: Package: rabbitmq-server-3.7.7-1.el7.noarch (/rabbitmq-server-3.7.7-1.el7.noarch)
Requires: erlang >= 19.3
Available: erlang-R16B-03.18.el7.x86_64 (epel)
erlang = R16B-03.18.el7
You could try using --skip-broken to work around the problem
You could try running: rpm -Va --nofiles --nodigest

尝试使用这个命令rpm -ivh --nodeps rabbitmq-server-3.7.7-1.el7.noarch.rpm进行安装。等安装成功后,接下来就可以启动RabbitMQ,并进行用户管理,使用的命令如下:

1
2
# 启动RabbitMQ
service rabbitmq-server start

若无法启动且出现下面所示的错误:

请删除/var/lib/rabbitmq/mnesia目录下存在的:rabbit@localhost.pid、rabbit@localhost和rabbit@localhost-plugins-expand三个文件,然后再使用service rabbitmq-server start命令就能正常启动了。如果还出现问题,请点击这里进行查阅。

接着继续执行以下命令:

1
2
3
4
5
6
7
8
9
10
11
12
# 查看状态
rabbitmqctl status
# 开启web插件
rabbitmq-plugins enable rabbitmq_management
# 重启
service rabbitmq-server restart
# 添加一个用户名为envy,密码为123的用户
rabbitmqctl add_user envy 123
# 设置envy用户的角色为管理员
rabbitmqctl set_user_tags envy administrator
# 配置envy用户可以远程登录,并访问envy中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p / envy ".*" ".*" ".*"

注意,如果是云服务器,还需要配置防火墙,且在安全组内开放5672和15672端口的访问权限:

RabbitMQ启动成功后,默认有一个guest用户,但是该用户只能在本地登录,无法远程登录,因此本案例中添加了一个新的用户envy,它也具有管理员身份,同时可以远程登录。当RabbitMQ启动成功后,在本地浏览器地址栏中输入http://47.100.175.12:15672/(服务器用公网IP),如果出现下图所示的界面,则表明RabbitMQ已经启动成功:

之后使用envy/123就可以进行登录了,登录成功后的界面如下图所示:

SpringBoot整合AMQP

SpringBoot为AMQP提供了自动化配置依赖spring-boot-starter-amqp,接下来学习如何在SpringBoot中整合AMQP。

第一步,新建项目并添加依赖。使用spring Initializr构建工具构建一个SpringBoot的Web应用,名称为rabbitmqspringboot,然后在pom.xml文件中添加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!--添加amqp依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--添加lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

第二步,修改配置文件。接着在application.properties配置文件中添加RabbitMQ的连接信息:

1
2
3
4
5
6
7
8
# 设置RabbitMQ的主机地址
spring.rabbitmq.host=47.100.175.12
# 设置RabbitMQ的主机端口
spring.rabbitmq.port=5672
# 设置RabbitMQ的用户名
spring.rabbitmq.username=admin
# 设置RabbitMQ的用户密码
spring.rabbitmq.password=xxxxxx

第三步,RabbitMQ配置。接下来进行RabbitMQ配置,在RabbitMQ中,所有的消息生产者提交的消息都会交由Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的Queue中。RabbitMQ中一共提供了4种不同的Exchange策略,分别是Direct、Fanout、Topic和Header,前3种使用的频率较高,第4种使用额频率较低,下面分别对这4种不同的ExchangeType予以介绍。
(1)Direct。DirectExchange的路由策略是将消息队列绑定到一个DirectExchange上,当一条消息到达DirectExchange时,会被转发到与该条消息routingkey相同的Queue上,例如消息队列名为“hello-queue”,则routingkey为“hello-queue”的消息会被该消息队列所接收。

DirectExchange的配置如下,新建一个RabbitDirectConfig类,其中的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
public class RabbitDirectConfig {
public final static String DIRECTNAME="envy-direct";
@Bean
Queue queue(){
return new Queue("hello-queue");
}
@Bean
DirectExchange directExchange(){
return new DirectExchange(DIRECTNAME,true,false);
}
@Bean
Binding binding(){
return BindingBuilder.bind(queue()).to(directExchange()).with("direct");
}
}

解释一下上述代码的含义:(1)首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字、重启后是否依然有效以及长期未使用是否删除等。(2)binding方法用于将Exchange和Queue对象绑定在一起。(3)DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,只配置一个Queue实例即可。

接下来配置一个消费者,新增component包,并在其中新建DriectReceiver类,里面的代码为:

1
2
3
4
5
6
7
@Component
public class DriectReceiver {
@RabbitListener(queues = "hello-queue")
public void handlerOne(String msg){
System.out.println("DirectReceiver:"+msg);
}
}

上面通过@RabbitListener注解指定一个方法是一个消息消费者,方法参数就是所接收到的消息。然后定义一个单元测试类DriectReceiverTest,在里面使用RabbitTemplate对象来进行消息发送,相应的代码为:

1
2
3
4
5
6
7
8
9
10
11
@RunWith(SpringRunner.class)
@SpringBootTest
public class DriectReceiverTest {
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void directTest() {
rabbitTemplate.convertAndSend("hello-queue","hello,direct!");
}
}

请注意这个convertAndSend方法的第一个参数是routingkey,第二个参数是object。

请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:

1
DirectReceiver:hello,direct!

(2)Fanout。FanoutExchange的数据交换策略是把所有到达的FanoutExchange的消息转发给所有与它绑定的Queue,在这种策略中,routingkey将不起任何作用。

FanoutExchange的配置如下,新建一个RabbitFanoutConfig类,其中的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class RabbitFanoutConfig {
public final static String FANOUTNAME = "envy-fanout";
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUTNAME,true,false);
}
@Bean
Queue queueOne(){
return new Queue("queue-one");
}
@Bean
Queue queueTwo(){
return new Queue("queue-two");
}
@Bean
Binding bindingOne(){
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo(){
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}

在这里首先创建FanoutExchange,参数的含义与创建DirectExchange参数的含义一致,然后创建两个Queue,再将这两个Queue都绑定到FanoutExchange上。接下来配置两个消费者,在component包中新建FanoutReceiver类,里面的代码为:

1
2
3
4
5
6
7
8
9
10
11
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handlerOne(String msg){
System.out.println("FanoutReceiver---->handlerOne:"+msg);
}
@RabbitListener(queues = "queue-two")
public void handleTwo(String msg){
System.out.println("FanoutReceiver---->handleTwo:"+msg);
}
}

这两个消费者分别消费两个消息队列中的消息。然后定义一个单元测试类FanoutReceiverTest,在里面使用RabbitTemplate对象来进行消息发送,相应的代码为:

1
2
3
4
5
6
7
8
9
10
11
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutReceiverTest {
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void fanoutTest() {
rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello,fanout!");
}
}

请注意这个convertAndSend方法的第一个参数是名称,第二个参数是object,这里发送消息时不需要routingkey,只需指定exchange即可,routingkey可以直接传一个null值。

请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:

1
2
FanoutReceiver---->handlerOne:hello,fanout!
FanoutReceiver---->handleTwo:hello,fanout!

可以看到,一条消息发送出去后,所有和该FanoutExchange绑定的Queue都收到了消息。

(3)Topic。TopicExchange是比较复杂也比较灵活的一种路由策略,在TopicExchange中,Queue通过routingkey绑定到了TopicExchange上,当消息到达TopicExchange后,TopicExchange根据消息的routingkey将消息路由到一个或者多个Queue上。

TopicExchange的配置如下,新建一个RabbitTopicConfig类,其中的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "envy-topic";

@Bean
TopicExchange topicExchange(){
return new TopicExchange(TOPICNAME,true,false);
}
@Bean
Queue xiaomi(){
return new Queue("xiaomi");
}
@Bean
Queue huawei(){
return new Queue("huawei");
}
@Bean
Queue phone(){
return new Queue("phone");
}

@Bean
Binding xiaomiBinding(){
return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");
}
@Bean
Binding huaweiBinding(){
return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");
}
@Bean
Binding phoneBinding(){
return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");
}
}

解释一下上述代码的含义:

  • 在这里首先创建TopicExchange,参数的含义与创建DirectExchange参数的含义一致,然后创建三个Queue,第一个Queue用来存储和“xiaomi”有关的消息;第二个Queue用来存储和“huawei”有关的消息;第三个Queue用来存储和“phone”有关的消息。
  • 再将这三个Queue分别绑定到TopicExchange上,第一个Binding中的“xiaomi.#”表示消息的routingkey凡是以“xiaomi”开头的,都将被路由到名称为“xiaomi”的Queue上;第二个Binding中的“huawei.#”表示消息的routingkey凡是以“huawei”开头的,都将被路由到名称为“huawei”的Queue上;第三个Binding中的“#.phone.#”表示消息的routingkey中凡是包含“phone”的,都将被路由到名称为“phone”的Queue上。

接下来配置三个消费者,在component包中新建TopicReceiver类,里面的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class TopicReceiver {
@RabbitListener(queues = "phone")
public void handleOne(String msg){
System.out.println("PhoneReceiver---->"+msg);
}
@RabbitListener(queues = "xiaomi")
public void handleTwo(String msg){
System.out.println("XiaomiReceiver---->"+msg);
}
@RabbitListener(queues = "huawei")
public void handleThree(String msg){
System.out.println("HuaweiReceiver---->"+msg);
}
}

这三个消费者分别消费三个消息队列中的消息。然后定义一个单元测试类TopicReceiverTest,在里面使用RabbitTemplate对象来进行消息发送,相应的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicReceiverTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void topicTest(){
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME, "xiaomi.news", "小米新闻...");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻...");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机...");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机...");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻...");
}
}

请注意这个convertAndSend方法的第一个参数是名称,第二个参数是routingkey,第三个参数是object。根据RabbitTopicConfig中的配置,第一条消息被路由到名称为“xiaomi”的Queue上;第二条消息被路由到名称为“huawei”的Queue上;第三条消息被路由到名称为“xiaomi”以及名为“phone”的Queue上;第四条消息被路由到名称为“huawei”以及名为“phone”的Queue上;最后一条消息被路由到名称为“phonei”的Queue上。

请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:

1
2
3
4
5
6
7
HuaweiReceiver---->华为新闻...
PhoneReceiver---->小米手机...
XiaomiReceiver---->小米新闻...
HuaweiReceiver---->华为手机...
XiaomiReceiver---->小米手机...
PhoneReceiver---->华为手机...
PhoneReceiver---->手机新闻...

可以看到,一个输出了7条信息,这个和之前的预想完全一致。

(4)Header。HeadersExchange是一种使用较少的路由策略,HeadersExchange会根据消息的Header将消息路由到不同的Queue上,这种策略也和routingkey无关。

HeadersExchange的配置如下,新建一个RabbitHeaderConfig类,其中的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class RabbitHeaderConfig {
public final static String HEADERNAME= "envy-header";
@Bean
HeadersExchange headersExchange(){
return new HeadersExchange(HEADERNAME,true,false);
}
@Bean
Queue queueName(){
return new Queue("name-queue");
}
@Bean
Queue queueAge(){
return new Queue("age-queue");
}
@Bean
Binding bindingName(){
Map<String,Object> map =new HashMap<>();
map.put("name","envy");
return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge(){
return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();
}
}

解释一下上述代码的含义:

  • 在这里首先创建HeadersExchange,参数的含义与创建DirectExchange参数的含义一致,然后创建两个Queue。这里大部分的配置和之前介绍的一样,主要差别就体现在Binding的配置上,第一个bindingName方法中,whereAny表示消息的Header中只要有一个Header匹配上map的key/value,就把该消息路由到名为“name-queue”的Queue上,这里也可以使用whereAll方法,表示消息的所有Header都要匹配。whereAny和whereAll实际上对应了一个名为x-match的属性。bindAge方法中的配置则表示只要消息的Header中包含age,无论age的值为多少,都将消息路由到名为“age-queue”的Queue上。

接下来配置两个消费者,在component包中新建HeaderReceiver类,里面的代码为:

1
2
3
4
5
6
7
8
9
10
11
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handlerOne(byte[] msg){
System.out.println("HeaderReceiver--->name:"+new String(msg,0,msg.length));
}
@RabbitListener(queues = "age-queue")
public void handlerTwo(byte[] msg){
System.out.println("HeaderReceiver--->age:"+new String(msg,0,msg.length));
}
}

注意,这里的参数需要使用byte[]数组接收,同时这两个消费者分别消费两个消息队列中的消息。

然后定义一个单元测试类HeaderReceiverTest,在里面使用RabbitTemplate对象来进行消息发送,相应的代码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RunWith(SpringRunner.class)
@SpringBootTest
public class HeaderReceiverTest {
@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void headerTest(){
Message nameMsg = MessageBuilder
.withBody("hello,header! name-queue".getBytes())
.setHeader("name","envy")
.build();
Message ageMsg = MessageBuilder
.withBody("hello,header! age-queue".getBytes())
.setHeader("age","120")
.build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,nameMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null,ageMsg);
}
}

请注意这里使用的是send方法,该方法的第一个参数是名称,第二个参数是object,这里发送消息时不需要routingkey,只需指定exchange即可,routingkey可以直接传一个null值。两条消息具有不同的header,不同header的消息将被发送到不同的Queue中。

请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:

1
2
HeaderReceiver--->name:hello,header! name-queue
HeaderReceiver--->age:hello,header! age-queue

可以看到,一个输出了2条信息,这个和之前的预想完全一致。

消息服务小结

本篇学习了SpringBoot对消息服务的支持,传统的JMS和AMQP各有千秋。 JMS从API的层面对消息中间件进行了统一, AMQP从协议层面来统一, JMS不支持跨平台,而AMQP天然地具备跨平台功能。AMQP支持的消息模型也更加丰富,除了本篇介绍的ActiveMQ和RabbitMQ之外,Spring Boot也能方便地整合Kafka、Artemis等,开发者需要结合实际情况来选择合适的消息中间件。