写在前面

在前面我们学习了使用消息总线Spring Cloud Bus组件来整合RabbitMQ和Kafka等消息中间件,接下来开始学习另一个组件Spring Cloud Stream,被称为是消息驱动的微服务。它可以基于Spring Boot来创建独立的、可用于生产的Spring应用程序。同时它通过使用Spring Interation来连接消息代理中间件以实现消息事件的驱动。

Spring Cloud Stream为一些中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Interation,实现了一套轻量级的消息驱动的微服务框架。使用Spring Cloud Stream可以有效的简化开发人员对于消息中间件的使用复杂度,让系统开发人员可以将更多的精力专注于核心业务逻辑的处理。目前为止Spring Cloud Stream支持RabbitMQ和Kafka这两个消息中间件,但是相信在不久的将来会有更多的中间件加入其中。

快速入门

接下来通过一个简单的例子来感受Spring Cloud Stream的魅力,该例子是构建一个基于Spring Boot的微服务应用,这个微服务应用将通过使用消息中间件RabbitMQ来接收消息,并将消息打印到日志中。

第一步,新建一个普通的SpringBoot工程,名称为stream-hello,注意使用默认的依赖。

第二步,添加项目依赖。Spring Boot工程创建完成后,修改pom.xml文件,添加如下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR7</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>

请注意这个spring-cloud-starter-stream-rabbit其实也是依赖了Spring Cloud Stream对于RabbitMQ的封装,因此就包含了对于RabbitMQ的自动化配置,如默认的地址是localhost,端口是5672,默认的用户名和密码都是guest。查看spring-cloud-starter-stream-rabbit依赖可知,里面其实就只依赖于spring-cloud-stream-binder-rabbit,因此这两者其实是等价关系。

1
2
3
4
 <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

如果之前开发者没有修改上述配置,那么此时可以不用做任何配置,但是笔者之前进行了修改,因此需要在application.properties配置文件中进行修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 项目名称
spring.application.name=stream-hello
# 项目端口号
server.port=4006

# RabbitMQ相关设置
# 主机
spring.rabbitmq.host=localhost
# 端口
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=springcloud
# 密码
spring.rabbitmq.password=springcloud

第三步,创建接收器。在stream-hello项目目录下新建一个receiver包,并在该包内新建一个SinkReceiver类,用于接收RabbitMQ发送过来的消息,如下所示:

1
2
3
4
5
6
7
8
9
@EnableBinding(Sink.class)
public class SinkReceiver {
private static Logger logger = LoggerFactory.getLogger(StreamHelloApplication.class);

@StreamListener(Sink.INPUT)
public void receive(Object playLoad){
logger.info("Received: "+playLoad);
}
}

简单解释一下上述代码的含义:
(1)首先使用@EnableBinding注解,该注解用于指定一个或者多个定义了@Input或者@Output注解的接口,以实现对消息通道(Channel)的绑定。在这里我们通过使用@EnableBinding(Sink.class)来绑定Sink接口。Sink接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义,它的源码如下:

1
2
3
4
5
6
public interface Sink {
String INPUT = "input";

@Input("input")
SubscribableChannel input();
}

从源码中可以看到,它通过 @Input注解绑定了一个名为input的通道,其实除了Sink之外,Spring Cloud Stream还默认实现了绑定output通道的Source接口,还有结合了Sink和Source的Processor接口,在实际使用的时候开发者可以通过使用@Input@Output注解来定义绑定消息通道的接口。当需要为@EnableBinding指定多个接口来绑定消息通道的时候,可以使用类似的定义:

1
@EnableBinding(value={Sink.class,Source.class})

(2)其次使用@StreamListener注解,它主要定义在方法上,其作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。我们通过使用 @StreamListener(Sink.INPUT)注解,将receive方法注册为input消息通道的监听处理器,因此当我们在RabbitMQ的控制页面中发布消息的时候,receive方法会作出对应的响应动作。

第四步,启动测试。首先需要将RabbitMQ启动,然后启动stream-hello项目,观察stream-hello项目的控制台,可以看到有类似下面的输出信息:

1
Created new connection: rabbitConnectionFactory#7f92b990:0/SimpleConnection@7aac8884 [delegate=amqp://springcloud@127.0.0.1:5672/, localPort= 7440]

这就说明我们项目已经通过springcloud用户创建了一个指向RabbitMQ的连接,接着在浏览器中访问http://localhost:15672,可以看到Connections页面出现了如下的信息:

再点击Queues,可以看到这里的input.anonymous等一大堆字母组成的字符串就是我们新建的队列:

那么接下来就可以点击其下方的publish message按钮,然后在其下方的Playload区域输入一些信息,再点击下方的Publish message按钮:

然后观察stream-hello项目的输出控制台,可以发现其输出的内容就是SinkReceiver中的receive方法定义的内容。