写在前面

本文将在第七篇《整合MongoDB实现用户商品浏览记录》的基础上整合RabbitMQ,实现延迟消息这一功能。

RabbitMQ

AMQP简介

AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)是一个线路层的协议规范,而不是API 规范(例如JMS)。由于AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像SMTP、HTTP等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过AMQP进行消息交互。像目前流行的StormMQ、RabbitMQ等都实现了AMQP协议。

RabbitMQ简介

RabbitMQ是一个实现了AMQP的开源消息中间件,使用高性能的Erlang编写。RabbitMQ具有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。

RabbitMQ的安装

第一步,安装Erlang,可点击 这里 进行下载,然后进行安装。

第二步,安装RabbitMQ,可点击 这里 进行下载,然后进行安装。

第三步,以管理员身份打开终端,并切换到RabbitMQ安装目录下的sbin目录:

然后执行如下命令来开启管理功能:

1
rabbitmq-plugins enable rabbitmq_management

第四步,打开浏览器,访问http://localhost:15672/链接:

第五步,输出账号和密码:guest和guest,然后登陆,点击右上角的Admin,创建用户名为kenbings和kenbings的用户,并设置角色为管理员:

接着创建一个名为/shop的虚拟host:

然后点击shop用户,进入到用户配置界面:

这样RabbitMQ的安装和配置就完成了,下面开始学习AMQP协议。

AMQP协议

AMQP协议作为RabbitMQ的规范,规定了RabbitMQ对外接口,同时学会了AMQP协议的使用,就基本掌握了RabbitMQ的使用。

Broker:接收和分发消息的应用,RabbitMQ就是Message Broker;
Virtual Host:虚拟Broker,用于将多个单元隔离开;
Connection:publisher/consumer和broker之间的TCP连接;
Channel:connection内部建立的逻辑连接,通常每个线程创建单独的channel;
Routing Key:路由键,用于指示消息的路由转发,相当于快递的地址;
Exchange:交换机,相当于快递的分拨中心;
Queue:消息队列,消息最终被送到这里等待consumer取走;
Binding Key:exchange和queue之间的虚拟连接,用于message的分发依据。

Exchange的作用

在AMQP协议或者是RabbitMQ实现中,最核心的组件是Exchange,Exchange承担RabbitMQ中的核心功能,即路由转发。Exchange有多个种类,配置多变,需要深度分析。

Exchange的功能是根据绑定关系和路由键为消息提供路由,将消息转发至相应的队列。

Exchange有4种类型,Direct、Topic、Fanout和Headers,其中Headers使用较少,以前三种为主。

业务场景描述

本篇要实现的延迟消息,是用户在下单之后,超过一定时间如30分钟,之后自动取消订单这一业务场景。可能的业务步骤如下:
(1)用户进行下单操作(会存在锁定商品库存、使用优惠券、积分、红包等操作);
(2)生成订单信息,并获取订单id;
(3)根据订单id获取到设置的订单超时时间(假设设置30分钟内不支付取消订单);
(4)按照订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
(5)如果用户没有支付,那么就执行取消订单的操作(释放商品库存、优惠券、积分、红包等操作)。

整合RabbitMQ实现延迟消息

第一步,复制一份shop-mongodb源码,将其名字修改为shop-rabbitmq,然后对应包和文件中的信息也记得修改,本篇后续所有操作均在shop-rabbitmq这一Module中进行。注意复制之后需要重新执行一下Generator类,以覆盖之前项目的自动生成文件。关于如何使用IDEA复制module,可以点击 这里 进行阅读。

第二步,在shop-rabbitmq的POM文件中新增如下依赖:

1
2
3
4
5
<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第三步,往application.yml配置文件中在spring节点下添加RabbitMQ相关配置信息:

1
2
3
4
5
6
7
8
  # RabbitMQ相关    
rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /shop # rabbitmq的虚拟host
username: kenbings # rabbitmq的用户名
password: kenbings # rabbitmq的密码
publisher-confirms: true #如果对异步消息需要回调必须设置为true

第四步,在com.kenbings.shop.shoprabbitmq包内定义一个名为enums的包,接着在enums包内定义一个名为QueueEnum的枚举类,注意这是消息队列的枚举类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 消息队列枚举类
*/
@Getter
public enum QueueEnum {
/**
* 订单取消通知队列
*/
QUEUE_ORDER_CANCEL("shop.order.direct.exchange","shop.order.cancel.queue","shop.order.cancel.key"),
/**
* 订单消息通知TTL队列
*/
QUEUE_TTL_ORDER_CANCEL("shop.order.direct.ttl.exchange","shop.order.cancel.ttl.queue","shop.order.cancel.ttl.key");
/**
* 交换机
*/
private String exchange;
/**
* 队列名称
*/
private String queue;
/**
* 路由键
*/
private String routingKey;

QueueEnum(String exchange, String queue, String routingKey){
this.exchange = exchange;
this.queue = queue;
this.routingKey = routingKey;
}
}

第五步,在config包内定义一个名为RabbitMQConfig的配置类,注意这是消息队列的配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 消息队列配置类
*/
@Configuration
public class RabbitMQConfig {
/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
public DirectExchange orderDirectExchange(){
return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();
}

/**
* 订单延迟队列所绑定的交换机
*/
@Bean
public DirectExchange orderTTLDirectExchange(){
return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();
}

/**
* 订单消息实际消费队列
*/
@Bean
public Queue orderQueue(){
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getQueue());
}

/**
* 订单延迟消息队列(死信队列)
*/
@Bean
public Queue orderTTLQueue(){
return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getQueue())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) //消息过期后转到的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRoutingKey()) //消息过期后转发的路由键
.build();
}

/**
* 将正常订单队列绑定到交换机
*/
@Bean
public Binding orderBinding(DirectExchange orderDirectExchange,Queue orderQueue){
return BindingBuilder.bind(orderQueue).to(orderDirectExchange).with(QueueEnum.QUEUE_ORDER_CANCEL.getRoutingKey());
}

/**
* 将订单延迟队列绑定到交换机
*/
@Bean
public Binding orderTTLBinding(DirectExchange orderTTLDirectExchange,Queue orderTTLQueue){
return BindingBuilder.bind(orderTTLQueue).to(orderTTLDirectExchange).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRoutingKey());
}
}

之后我们尝试启动一下项目入口类,然后查看一下RabbitMQ的浏览器控制台,然后查看Exchange:

接着再来查看一下队列:

shop.order.direct.exchange(取消订单消息队列所绑定的交换机),它所绑定的队列为shop.order.cancel.queue,这样如果有消息以shop.order.cancel.key为路由键发送过来,就会发送到此队列中。

shop.order.direct.ttl.exchange(订单延迟消息队列所绑定的交换机),它所绑定的队列为shop.order.cancel.ttl.queue,这样如果有消息以shop.order.cancel.ttl.key为路由键发送过来,就会发送到此队列中,并在此队列中保存一段时间,等到超时时间到,后自动将消息发送到shop.order.cancel.queue(取消订单消息队列)中。

第六步,在component包内定义一个名为CancelOrderSender的类,该类用于往订单延迟队列(订单消息通知TTL队列,shop.order.cancel.ttl.queue)中发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 往订单延迟队列(订单消息通知TTL队列,`shop.order.cancel.ttl.queue`)中发送消息
* 取消订单消息的发出者
*/
@Component
public class CancelOrderSender {
private static final Logger LOGGER = LoggerFactory.getLogger(CancelOrderSender.class);

@Autowired
private AmqpTemplate amqpTemplate;

/**
* 给延迟队列发送消息
* @param orderId 订单号
* @param delayTime 延迟时间(毫秒)
*/
public void sendMessage(Long orderId,long delayTime){
String exchange = QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange();
String routingKey = QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRoutingKey();
amqpTemplate.convertAndSend(exchange, routingKey, orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟时间,单位毫秒
message.getMessageProperties().setExpiration(String.valueOf(delayTime));
return message;
}
}
);
LOGGER.info("发送延迟消息,订单号为:{}",orderId);
}
}

第七步,在component包内定义一个名为CancelOrderReceiver的类,该类用于从订单取消通知队列(shop.order.cancel.queue)中拉取消息并消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 处理取消订单消息
*/
@Component
@RabbitListener(queues = "shop.order.cancel.queue")
public class CancelOrderReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(CancelOrderReceiver.class);

@Autowired
private OmsPortalOrderService portalOrderService;

@RabbitHandler
public void handle(Long orderId){
LOGGER.info("收到延迟消息,订单号为:{}",orderId);
portalOrderService.cancelOrder(orderId);
}
}

第八步,在service包内定义一个名为OmsPortalOrderService的接口,用于定义与前台订单管理相关的接口方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 前台订单管理 Service
*/
public interface OmsPortalOrderService {
/**
* 根据提交信息生成订单
*/
@Transactional
CommonResult generateOrder(OrderParam orderParam);

/**
* 取消单个超时订单
*/
@Transactional
void cancelOrder(Long orderId);
}

第九步,在dto包内定义一个名为OrderParam的类,这是生成订单时需传入的参数对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 生成订单时需传入的参数
*/
@Data
public class OrderParam {
//收货地址id
private Long memberReceiveAddressId;
//优惠券id
private Long couponId;
//使用的积分数
private Integer useIntegration;
//支付方式
private Integer payType;
}

第十步,在impl包内定义一个名为OmsPortalOrderServiceImpl的类,这个类需要实现OmsPortalOrderService接口,并重写其中的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 前台订单管理 Service的实现类
*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private static final Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderService.class);

@Autowired
private CancelOrderSender cancelOrderSender;

@Override
public CommonResult generateOrder(OrderParam orderParam) {
//TODO 一系列下单操作,这些逻辑可参考完整项目
LOGGER.info("开始生成订单");
//下单完成后发送一个延迟消息,用于实现当用户没有付款时取消订单这一功能,注意orderId应该在下单后生成
Long orderId = 516516161616L;
sendDelayMessageCancelOrder(orderId);
return CommonResult.success("下单成功");
}

@Override
public void cancelOrder(Long orderId) {
//TODO 一系列取消订单操作,这些逻辑可参考完整项目
LOGGER.info("开始取消订单,订单号为:{}",orderId);
}

private void sendDelayMessageCancelOrder(Long orderId) {
//获取订单超时时间,假设为30秒
long delayTimes = 30 * 1000;
//发送延迟消息
cancelOrderSender.sendMessage(orderId, delayTimes);
}
}

第十一步,在controller包内定义一个名为OmsPortalOrderController的类,这是前台订单管理的Controller:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 前台订单管理的Controller
*/
@Api(tags = "OmsPortalOrderController",description = "订单管理")
@RestController
@RequestMapping("/order")
public class OmsPortalOrderController {
@Autowired
private OmsPortalOrderService omsPortalOrderService;

@ApiOperation("根据购物车信息生成订单")
@PostMapping("/generateOrder")
public Object generateOrder(@RequestBody OrderParam orderParam) {
return omsPortalOrderService.generateOrder(orderParam);
}
}

第十二步,启动项目,访问Swagger-UI接口文档地址,即浏览器访问http://localhost:8080/swagger-ui.html链接,可以看到新的接口已经出现了:

第十三步,进行接口测试。首先后台用户进行登录,接着测试“下单接口”:

之后回到IDEA控制台,可以看到输出如下信息:

1
2
2021-12-09 18:05:40.460  INFO 14628 --- [nio-8080-exec-1] c.k.s.s.service.OmsPortalOrderService    : 开始生成订单
2021-12-09 18:05:40.464 INFO 14628 --- [nio-8080-exec-1] c.k.s.s.component.CancelOrderSender : 发送延迟消息,订单号为:516516161616

之后等待30秒钟,IDEA控制台输出如下信息:

1
2
2021-12-09 18:06:10.494  INFO 14628 --- [cTaskExecutor-1] c.k.s.s.component.CancelOrderReceiver    : 收到延迟消息,订单号为:516516161616
2021-12-09 18:06:10.495 INFO 14628 --- [cTaskExecutor-1] c.k.s.s.service.OmsPortalOrderService : 开始取消订单,订单号为:516516161616

可以看到时间间隔就是30秒,这也说明该订单下单30秒后未支付便被自动取消了。