Stream基础使用
写在前面
在前面我们学习了使用消息总线Spring Cloud Bus组件来整合RabbitMQ和Kafka等消息中间件,接下来开始学习另一个组件Spring Cloud Stream,被称为是消息驱动的微服务。它可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序。同时它通过使用Spring Interation来连接消息代理中间件以实现消息事件的驱动。
Spring Cloud Stream为一些中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Interation,实现了一套轻量级的消息驱动的微服务框架。使用Spring Cloud Stream可以有效的简化开发人员对于消息中间件的使用复杂度,让系统开发人员可以将更多的精力专注于核心业务逻辑的处理。目前为止Spring Cloud Stream支持RabbitMQ和Kafka这两个消息中间件,但是相信在不久的将来会有更多的中间件加入其中。
快速入门
接下来通过一个简单的例子来感受Spring Cloud Stream的魅力,该例子是构建一个基于Spring Boot的微服务应用,这个微服务应用将通过使用消息中间件RabbitMQ来接收消息,并将消息打印到日志中。
第一步,新建一个普通的SpringBoot工程,名称为stream-hello,注意使用默认的依赖。
第二步,添加项目依赖。Spring Boot工程创建完成后,修改pom.xml文件,添加如下依赖:
1 | <properties> |
请注意这个spring-cloud-starter-stream-rabbit
其实也是依赖了Spring Cloud Stream对于RabbitMQ的封装,因此就包含了对于RabbitMQ的自动化配置,如默认的地址是localhost,端口是5672,默认的用户名和密码都是guest。查看spring-cloud-starter-stream-rabbit
依赖可知,里面其实就只依赖于spring-cloud-stream-binder-rabbit
,因此这两者其实是等价关系。
1 | <dependency> |
如果之前开发者没有修改上述配置,那么此时可以不用做任何配置,但是笔者之前进行了修改,因此需要在application.properties配置文件中进行修改:
1 | # 项目名称 |
第三步,创建接收器。在stream-hello项目目录下新建一个receiver包,并在该包内新建一个SinkReceiver类,用于接收RabbitMQ发送过来的消息,如下所示:
1 | @EnableBinding(Sink.class) |
简单解释一下上述代码的含义:
(1)首先使用@EnableBinding
注解,该注解用于指定一个或者多个定义了@Input
或者@Output
注解的接口,以实现对消息通道(Channel)的绑定。在这里我们通过使用@EnableBinding(Sink.class)
来绑定Sink接口。Sink接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义,它的源码如下:
1 | public interface Sink { |
从源码中可以看到,它通过 @Input
注解绑定了一个名为input的通道,其实除了Sink之外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,还有结合了Sink和Source的Processor接口,在实际使用的时候开发者可以通过使用@Input
和@Output
注解来定义绑定消息通道的接口。当需要为@EnableBinding
指定多个接口来绑定消息通道的时候,可以使用类似的定义:
1 | @EnableBinding(value={Sink.class,Source.class}) |
(2)其次使用@StreamListener
注解,它主要定义在方法上,其作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。我们通过使用 @StreamListener(Sink.INPUT)
注解,将receive方法注册为input消息通道的监听处理器,因此当我们在RabbitMQ的控制页面中发布消息的时候,receive方法会作出对应的响应动作。
第四步,启动测试。首先需要将RabbitMQ启动,然后启动stream-hello项目,观察stream-hello项目的控制台,可以看到有类似下面的输出信息:
1 | Created new connection: rabbitConnectionFactory#7f92b990:0/SimpleConnection@7aac8884 [delegate=amqp://springcloud@127.0.0.1:5672/, localPort= 7440] |
这就说明我们项目已经通过springcloud用户创建了一个指向RabbitMQ的连接,接着在浏览器中访问http://localhost:15672
,可以看到Connections页面出现了如下的信息:
再点击Queues,可以看到这里的input.anonymous等一大堆字母组成的字符串就是我们新建的队列:
那么接下来就可以点击其下方的publish message按钮,然后在其下方的Playload区域输入一些信息,再点击下方的Publish message按钮:
然后观察stream-hello项目的输出控制台,可以发现其输出的内容就是SinkReceiver中的receive方法定义的内容。