消息服务
本篇来学习消息服务相关的知识,消息服务在一些大型的项目架构中才会使用,但是对于小项目也是可以使用的。
消息队列(Message Queue)是一种进程间或者线程间的异步通信方式,使用消息队列,消息生产者在产生消息后,会将消息保存在消息队列中 ,直到消息消费者来取走它,即消息的发送者和接收者不需要同时与消息队列交互。使用消息队列可以有效实现服务的解耦,并提供系统的可靠性以及扩展性。目前开源的消息队列服务非常多,如Apache ActiveMQ、RabbitMQ等,这些产品也就是常说的消息中间件。
JMS
JMS简介
JMS(Java Message Service)即Java消息服务,它通过统一Java API层面的标准,使得多个客户端可以通过JMS进行交互,大部分的消息中间件提供商都对JMS提供支持。JMS和ActiveMQ的关系就像是JDBC和JDBC驱动一样的关系。JMS包括两种消息模型:点对点和发布者/订阅者,同时JMS仅仅支持Java平台。
SpringBoot整合JMS
由于JMS只是一套标准,因此SpringBoot整合JMS必然是整合JMS的某一个实现,本例子以ActiveMQ为例介绍SpringBoot如何整合JMS。
ActiveMQ简介
Apache ActiveMQ是一个开源的消息中间件,它不仅完全支持JMS1.1规范,而且支持多种编程语言,例如C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby等,也支持多种协议,例如OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP。Apache ActiveMQ 提供了对Spring 框架的支持,可以非常容易地嵌入Spring中,同时它也提供了集群支持。
ActiveMQ安装
通常来说,ActiveMQ都是安装在Linux上的,因此本例子以安装在CentOS7为例,ActiveMQ的版本为5.15.12(注意要运行ActiveMQ的话,CentOS7上必须安装Java运行环境),相应的安装步骤为:
第一步,检查是否安装java。使用java -version
命令来检测本地是否安装了java,可以看到确实安装,如果没有安装只需要使用yum -y install java
安装即可:
1 | [root@iZthsx5glu6ohyZ ~]# java -version |
第二步,下载ActiveMQ安装包。使用如下命令来下载ActiveMQ安装包:
1 | wget https://mirrors.tuna.tsinghua.edu.cn/apache/activemq/5.15.12/apache-activemq-5.15.12-bin.tar.gz |
第三步,解压下载的ActiveMQ安装包。使用如下命令来解压下载的ActiveMQ安装包:
1 | tar -zxvf apache-activemq-5.15.12-bin.tar.gz |
第四步,启动ActiveMQ。依次执行以下命令来启动ActiveMQ:
1 | cd apache-activemq-5.15.12/bin/ |
第五步,检测ActiveMQ是否启动即访问。开发者可以使用ps -ef|grep activemq
命令来检测ActiveMQ是否启动成功,若成功则可以进行访问。首先关闭CentOS7的防火墙(如果ActiveMQ是运行在云服务器上,那么还需要开放8161端口的访问权限),然后在浏览器地址栏中输入http://47.100.175.12:8161
,(47.100.175.12)是云服务器的公网地址,而8161则是ActiveMQ的默认端口号,如果能看到如下页面,则表示ActiveMQ已经成功启动了:
ActiveMQ启动成功后,单击Manage ActiveMQ borker超链接进入管理员控制台,默认用户和密码都是admin,如下图所示:
SpringBoot整合ActiveMQ
SpringBoot为ActiveMQ配置提供了相应的启动器,因此整合起来非常方便。
第一步,新建项目并添加依赖。使用spring Initializr
构建工具构建一个SpringBoot的Web应用,名称为activemqspringboot
,然后在pom.xml文件中添加如下依赖:
1 | <!--添加activemq依赖--> |
第二步,修改配置文件。接着在application.properties
配置文件中添加ActiveMQ的连接信息:
1 | # 设置ActiveMQ的广播地址 |
第三步,添加消息队列Bean。接下来在项目启动类中提供一个消息队列Bean,该Bean的实例就由ActiveMQ提供,相应的代码如下:
1 | @SpringBootApplication |
请注意上面的Queue是来自于javax.jms.Queue
包的,不要导入错误的包。
第四步,新建Message实体类。新建一个pojo包,然后在里面创建一个Message实体类,且需要实现Serializable 接口,相应的代码为:
1 | @Data |
第五步,新建JMS组件类。新建一个component包,接着在里面创建一个JMS组件类用来完成消息的发送和接收,也就是JmsComponent类,其中的代码为:
1 | @Component |
是不是觉得这个和之前Redis的操作非常类似,不过这里使用的是JmsMessagingTemplate
而不是RedisTemplate
,JmsMessagingTemplate
是由Spring提供的一个JMS消息发送模板,可以用来方便地进行消息的发送,消息发送方法convertAndSend
,该方法的第一个参数就是消息队列,第二个参数是消息的内容。本例子用于演示一个对象消息。@JmsListener
注解表示相应的方法是一个消息消费者,消息消费者订阅的消息destination为amq。经过以上配置就可以在SpringBoot中使用ActiveMQ了。
*第六步,测试。编写一个测试类,用于对消息发送进行测试(其实就是对之前定义的JmsComponent进行测试),相应的代码为:
1 | @RunWith(SpringRunner.class) |
既然是对之前定义的JmsComponent进行测试,那么必然需要导入这个JmsComponent组件,然后调用该组件内的send方法,来发送一个Message对象。
确认ActiveMQ已经启动后,再执行该测试方法,可以看到SpringBoot项目的启动日志中出现了一些信息,这表明消息队列已经收到了生产者发送的信息:
1 | receive:Message(content=hello,jms!, date=Wed Nov 27 11:52:33 CST 2019) |
这样就完成了在SpringBoot中整合ActiveMQ的相关功能。
AMQP
AMQP简介
AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)是一个线路层的协议规范,而不是API 规范(例如JMS)。由于AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像SMTP、HTTP等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过AMQP进行消息交互。像目前流行的StormMQ、RabbitMQ等都实现了AMQP协议。
SpringBoot整合AMQP
和JMS一样,使用AMQP也是使用AMQP的某个实现,本例子以RabbitMQ为例,介绍AMQP的使用。
RabbitMQ简介
R且bbitMQ是一个实现了AMQP的开源消息中间件,使用高性能的Erlang编写。RabbitMQ具有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。
RabbitMQ的安装
由于RabbitMQ使用Erlang语言编写,因此需要先安装Erlang环境,同样也是在CentOS7系统上安装Erlang21.0,相应的步骤如下:
1 | # 下载安装包 |
最后一步是检验,如果看到如图所示的页面,则显示Erlang已经安装成功了:
Erlang安装成功后,接下来开始安装RabbitMQ。由于yum仓库中默认的Erlang版本较低,因此需要先将最新的Erlang包添加到yum源中,执行如下命令
1 | vi /etc/yum.repos.d/rabbtimq-erlang.repo |
然后在其中新增以下内容:
1 | [rabbitmq-erlang] |
添加成功后,清除原有缓存并创建新缓存,使用的命令如下:
1 | yum clean all |
准备工作完成后,接下来就可以正式安装RabbitMQ了。首先下砸安装包,使用的命令为:
1 | wget https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpm |
下载完成后,使用如下命令进行安装:
1 | yum install rabbitmq-server-3.7.7-1.el7.noarch.rpm |
安装过程中若提示缺少socat依赖,可以使用如下命令安装该依赖yum -y install socat
。之后继续执行上述命令,如果还抛出以下错误:
1 | Error: Package: rabbitmq-server-3.7.7-1.el7.noarch (/rabbitmq-server-3.7.7-1.el7.noarch) |
尝试使用这个命令rpm -ivh --nodeps rabbitmq-server-3.7.7-1.el7.noarch.rpm
进行安装。等安装成功后,接下来就可以启动RabbitMQ,并进行用户管理,使用的命令如下:
1 | # 启动RabbitMQ |
若无法启动且出现下面所示的错误:
请删除/var/lib/rabbitmq/mnesia
目录下存在的:rabbit@localhost.pid、rabbit@localhost和rabbit@localhost-plugins-expand三个文件,然后再使用service rabbitmq-server start
命令就能正常启动了。如果还出现问题,请点击这里进行查阅。
接着继续执行以下命令:
1 | # 查看状态 |
注意,如果是云服务器,还需要配置防火墙,且在安全组内开放5672和15672端口的访问权限:
RabbitMQ启动成功后,默认有一个guest用户,但是该用户只能在本地登录,无法远程登录,因此本案例中添加了一个新的用户envy,它也具有管理员身份,同时可以远程登录。当RabbitMQ启动成功后,在本地浏览器地址栏中输入http://47.100.175.12:15672/
(服务器用公网IP),如果出现下图所示的界面,则表明RabbitMQ已经启动成功:
之后使用envy/123就可以进行登录了,登录成功后的界面如下图所示:
SpringBoot整合AMQP
SpringBoot为AMQP提供了自动化配置依赖spring-boot-starter-amqp
,接下来学习如何在SpringBoot中整合AMQP。
第一步,新建项目并添加依赖。使用spring Initializr
构建工具构建一个SpringBoot的Web应用,名称为rabbitmqspringboot
,然后在pom.xml文件中添加如下依赖:
1 | <!--添加amqp依赖--> |
第二步,修改配置文件。接着在application.properties
配置文件中添加RabbitMQ的连接信息:
1 | # 设置RabbitMQ的主机地址 |
第三步,RabbitMQ配置。接下来进行RabbitMQ配置,在RabbitMQ中,所有的消息生产者提交的消息都会交由Exchange进行再分配,Exchange会根据不同的策略将消息分发到不同的Queue中。RabbitMQ中一共提供了4种不同的Exchange策略,分别是Direct、Fanout、Topic和Header,前3种使用的频率较高,第4种使用额频率较低,下面分别对这4种不同的ExchangeType予以介绍。
(1)Direct。DirectExchange的路由策略是将消息队列绑定到一个DirectExchange上,当一条消息到达DirectExchange时,会被转发到与该条消息routingkey相同的Queue上,例如消息队列名为“hello-queue”,则routingkey为“hello-queue”的消息会被该消息队列所接收。
DirectExchange的配置如下,新建一个RabbitDirectConfig
类,其中的代码为:
1 | @Configuration |
解释一下上述代码的含义:(1)首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字、重启后是否依然有效以及长期未使用是否删除等。(2)binding方法用于将Exchange和Queue对象绑定在一起。(3)DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,只配置一个Queue实例即可。
接下来配置一个消费者,新增component包,并在其中新建DriectReceiver
类,里面的代码为:
1 | @Component |
上面通过@RabbitListener
注解指定一个方法是一个消息消费者,方法参数就是所接收到的消息。然后定义一个单元测试类DriectReceiverTest
,在里面使用RabbitTemplate
对象来进行消息发送,相应的代码为:
1 | @RunWith(SpringRunner.class) |
请注意这个convertAndSend
方法的第一个参数是routingkey,第二个参数是object。
请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:
1 | DirectReceiver:hello,direct! |
(2)Fanout。FanoutExchange的数据交换策略是把所有到达的FanoutExchange的消息转发给所有与它绑定的Queue,在这种策略中,routingkey将不起任何作用。
FanoutExchange的配置如下,新建一个RabbitFanoutConfig
类,其中的代码为:
1 | @Configuration |
在这里首先创建FanoutExchange,参数的含义与创建DirectExchange参数的含义一致,然后创建两个Queue,再将这两个Queue都绑定到FanoutExchange上。接下来配置两个消费者,在component包中新建FanoutReceiver
类,里面的代码为:
1 | @Component |
这两个消费者分别消费两个消息队列中的消息。然后定义一个单元测试类FanoutReceiverTest
,在里面使用RabbitTemplate
对象来进行消息发送,相应的代码为:
1 | @RunWith(SpringRunner.class) |
请注意这个convertAndSend
方法的第一个参数是名称,第二个参数是object,这里发送消息时不需要routingkey,只需指定exchange即可,routingkey可以直接传一个null值。
请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:
1 | FanoutReceiver---->handlerOne:hello,fanout! |
可以看到,一条消息发送出去后,所有和该FanoutExchange绑定的Queue都收到了消息。
(3)Topic。TopicExchange是比较复杂也比较灵活的一种路由策略,在TopicExchange中,Queue通过routingkey绑定到了TopicExchange上,当消息到达TopicExchange后,TopicExchange根据消息的routingkey将消息路由到一个或者多个Queue上。
TopicExchange的配置如下,新建一个RabbitTopicConfig
类,其中的代码为:
1 | @Configuration |
解释一下上述代码的含义:
- 在这里首先创建TopicExchange,参数的含义与创建DirectExchange参数的含义一致,然后创建三个Queue,第一个Queue用来存储和“xiaomi”有关的消息;第二个Queue用来存储和“huawei”有关的消息;第三个Queue用来存储和“phone”有关的消息。
- 再将这三个Queue分别绑定到TopicExchange上,第一个Binding中的“xiaomi.#”表示消息的routingkey凡是以“xiaomi”开头的,都将被路由到名称为“xiaomi”的Queue上;第二个Binding中的“huawei.#”表示消息的routingkey凡是以“huawei”开头的,都将被路由到名称为“huawei”的Queue上;第三个Binding中的“#.phone.#”表示消息的routingkey中凡是包含“phone”的,都将被路由到名称为“phone”的Queue上。
接下来配置三个消费者,在component包中新建TopicReceiver
类,里面的代码为:
1 | @Component |
这三个消费者分别消费三个消息队列中的消息。然后定义一个单元测试类TopicReceiverTest
,在里面使用RabbitTemplate
对象来进行消息发送,相应的代码为:
1 | @RunWith(SpringRunner.class) |
请注意这个convertAndSend
方法的第一个参数是名称,第二个参数是routingkey,第三个参数是object。根据RabbitTopicConfig
中的配置,第一条消息被路由到名称为“xiaomi”的Queue上;第二条消息被路由到名称为“huawei”的Queue上;第三条消息被路由到名称为“xiaomi”以及名为“phone”的Queue上;第四条消息被路由到名称为“huawei”以及名为“phone”的Queue上;最后一条消息被路由到名称为“phonei”的Queue上。
请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:
1 | HuaweiReceiver---->华为新闻... |
可以看到,一个输出了7条信息,这个和之前的预想完全一致。
(4)Header。HeadersExchange是一种使用较少的路由策略,HeadersExchange会根据消息的Header将消息路由到不同的Queue上,这种策略也和routingkey无关。
HeadersExchange的配置如下,新建一个RabbitHeaderConfig
类,其中的代码为:
1 | @Configuration |
解释一下上述代码的含义:
- 在这里首先创建HeadersExchange,参数的含义与创建DirectExchange参数的含义一致,然后创建两个Queue。这里大部分的配置和之前介绍的一样,主要差别就体现在Binding的配置上,第一个
bindingName
方法中,whereAny表示消息的Header中只要有一个Header匹配上map的key/value,就把该消息路由到名为“name-queue”的Queue上,这里也可以使用whereAll方法,表示消息的所有Header都要匹配。whereAny和whereAll实际上对应了一个名为x-match的属性。bindAge
方法中的配置则表示只要消息的Header中包含age,无论age的值为多少,都将消息路由到名为“age-queue”的Queue上。
接下来配置两个消费者,在component包中新建HeaderReceiver
类,里面的代码为:
1 | @Component |
注意,这里的参数需要使用byte[]
数组接收,同时这两个消费者分别消费两个消息队列中的消息。
然后定义一个单元测试类HeaderReceiverTest
,在里面使用RabbitTemplate
对象来进行消息发送,相应的代码为:
1 | @RunWith(SpringRunner.class) |
请注意这里使用的是send
方法,该方法的第一个参数是名称,第二个参数是object,这里发送消息时不需要routingkey,只需指定exchange即可,routingkey可以直接传一个null值。两条消息具有不同的header,不同header的消息将被发送到不同的Queue中。
请确认RabbitMQ已经启动,然后执行该测试方法,可以看到在SpringBoot控制台上打印出下面的信息:
1 | HeaderReceiver--->name:hello,header! name-queue |
可以看到,一个输出了2条信息,这个和之前的预想完全一致。
消息服务小结
本篇学习了SpringBoot对消息服务的支持,传统的JMS和AMQP各有千秋。 JMS从API的层面对消息中间件进行了统一, AMQP从协议层面来统一, JMS不支持跨平台,而AMQP天然地具备跨平台功能。AMQP支持的消息模型也更加丰富,除了本篇介绍的ActiveMQ和RabbitMQ之外,Spring Boot也能方便地整合Kafka、Artemis等,开发者需要结合实际情况来选择合适的消息中间件。