写在前面
本文将在第七篇《整合MongoDB实现用户商品浏览记录》的基础上整合RabbitMQ,实现延迟消息这一功能。
RabbitMQ
AMQP简介
AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)是一个线路层的协议规范,而不是API 规范(例如JMS)。由于AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像SMTP、HTTP等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过AMQP进行消息交互。像目前流行的StormMQ、RabbitMQ等都实现了AMQP协议。
RabbitMQ简介
RabbitMQ是一个实现了AMQP的开源消息中间件,使用高性能的Erlang编写。RabbitMQ具有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。
RabbitMQ的安装
第一步,安装Erlang,可点击 这里 进行下载,然后进行安装。
第二步,安装RabbitMQ,可点击 这里 进行下载,然后进行安装。
第三步,以管理员身份打开终端,并切换到RabbitMQ安装目录下的sbin目录:
然后执行如下命令来开启管理功能:
1
| rabbitmq-plugins enable rabbitmq_management
|
第四步,打开浏览器,访问http://localhost:15672/
链接:
第五步,输出账号和密码:guest和guest,然后登陆,点击右上角的Admin,创建用户名为kenbings和kenbings的用户,并设置角色为管理员:
接着创建一个名为/shop
的虚拟host:
然后点击shop用户,进入到用户配置界面:
这样RabbitMQ的安装和配置就完成了,下面开始学习AMQP协议。
AMQP协议
AMQP协议作为RabbitMQ的规范,规定了RabbitMQ对外接口,同时学会了AMQP协议的使用,就基本掌握了RabbitMQ的使用。
Broker:接收和分发消息的应用,RabbitMQ就是Message Broker;
Virtual Host:虚拟Broker,用于将多个单元隔离开;
Connection:publisher/consumer和broker之间的TCP连接;
Channel:connection内部建立的逻辑连接,通常每个线程创建单独的channel;
Routing Key:路由键,用于指示消息的路由转发,相当于快递的地址;
Exchange:交换机,相当于快递的分拨中心;
Queue:消息队列,消息最终被送到这里等待consumer取走;
Binding Key:exchange和queue之间的虚拟连接,用于message的分发依据。
Exchange的作用
在AMQP协议或者是RabbitMQ实现中,最核心的组件是Exchange,Exchange承担RabbitMQ中的核心功能,即路由转发。Exchange有多个种类,配置多变,需要深度分析。
Exchange的功能是根据绑定关系和路由键为消息提供路由,将消息转发至相应的队列。
Exchange有4种类型,Direct、Topic、Fanout和Headers,其中Headers使用较少,以前三种为主。
业务场景描述
本篇要实现的延迟消息,是用户在下单之后,超过一定时间如30分钟,之后自动取消订单这一业务场景。可能的业务步骤如下:
(1)用户进行下单操作(会存在锁定商品库存、使用优惠券、积分、红包等操作);
(2)生成订单信息,并获取订单id;
(3)根据订单id获取到设置的订单超时时间(假设设置30分钟内不支付取消订单);
(4)按照订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
(5)如果用户没有支付,那么就执行取消订单的操作(释放商品库存、优惠券、积分、红包等操作)。
整合RabbitMQ实现延迟消息
第一步,复制一份shop-mongodb
源码,将其名字修改为shop-rabbitmq
,然后对应包和文件中的信息也记得修改,本篇后续所有操作均在shop-rabbitmq
这一Module中进行。注意复制之后需要重新执行一下Generator类,以覆盖之前项目的自动生成文件。关于如何使用IDEA复制module,可以点击 这里 进行阅读。
第二步,在shop-rabbitmq
的POM文件中新增如下依赖:
1 2 3 4 5
| <!--消息队列相关依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
第三步,往application.yml
配置文件中在spring节点下添加RabbitMQ相关配置信息:
1 2 3 4 5 6 7 8
| # RabbitMQ相关 rabbitmq: host: localhost # rabbitmq的连接地址 port: 5672 # rabbitmq的连接端口号 virtual-host: /shop # rabbitmq的虚拟host username: kenbings # rabbitmq的用户名 password: kenbings # rabbitmq的密码 publisher-confirms: true #如果对异步消息需要回调必须设置为true
|
第四步,在com.kenbings.shop.shoprabbitmq
包内定义一个名为enums的包,接着在enums包内定义一个名为QueueEnum
的枚举类,注意这是消息队列的枚举类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| /** * 消息队列枚举类 */ @Getter public enum QueueEnum { /** * 订单取消通知队列 */ QUEUE_ORDER_CANCEL("shop.order.direct.exchange","shop.order.cancel.queue","shop.order.cancel.key"), /** * 订单消息通知TTL队列 */ QUEUE_TTL_ORDER_CANCEL("shop.order.direct.ttl.exchange","shop.order.cancel.ttl.queue","shop.order.cancel.ttl.key"); /** * 交换机 */ private String exchange; /** * 队列名称 */ private String queue; /** * 路由键 */ private String routingKey;
QueueEnum(String exchange, String queue, String routingKey){ this.exchange = exchange; this.queue = queue; this.routingKey = routingKey; } }
|
第五步,在config包内定义一个名为RabbitMQConfig
的配置类,注意这是消息队列的配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| /** * 消息队列配置类 */ @Configuration public class RabbitMQConfig { /** * 订单消息实际消费队列所绑定的交换机 */ @Bean public DirectExchange orderDirectExchange(){ return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build(); }
/** * 订单延迟队列所绑定的交换机 */ @Bean public DirectExchange orderTTLDirectExchange(){ return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build(); }
/** * 订单消息实际消费队列 */ @Bean public Queue orderQueue(){ return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getQueue()); }
/** * 订单延迟消息队列(死信队列) */ @Bean public Queue orderTTLQueue(){ return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getQueue()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) //消息过期后转到的交换机 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRoutingKey()) //消息过期后转发的路由键 .build(); }
/** * 将正常订单队列绑定到交换机 */ @Bean public Binding orderBinding(DirectExchange orderDirectExchange,Queue orderQueue){ return BindingBuilder.bind(orderQueue).to(orderDirectExchange).with(QueueEnum.QUEUE_ORDER_CANCEL.getRoutingKey()); }
/** * 将订单延迟队列绑定到交换机 */ @Bean public Binding orderTTLBinding(DirectExchange orderTTLDirectExchange,Queue orderTTLQueue){ return BindingBuilder.bind(orderTTLQueue).to(orderTTLDirectExchange).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRoutingKey()); } }
|
之后我们尝试启动一下项目入口类,然后查看一下RabbitMQ的浏览器控制台,然后查看Exchange:
接着再来查看一下队列:
shop.order.direct.exchange
(取消订单消息队列所绑定的交换机),它所绑定的队列为shop.order.cancel.queue
,这样如果有消息以shop.order.cancel.key
为路由键发送过来,就会发送到此队列中。
shop.order.direct.ttl.exchange
(订单延迟消息队列所绑定的交换机),它所绑定的队列为shop.order.cancel.ttl.queue
,这样如果有消息以shop.order.cancel.ttl.key
为路由键发送过来,就会发送到此队列中,并在此队列中保存一段时间,等到超时时间到,后自动将消息发送到shop.order.cancel.queue
(取消订单消息队列)中。
第六步,在component包内定义一个名为CancelOrderSender
的类,该类用于往订单延迟队列(订单消息通知TTL队列,shop.order.cancel.ttl.queue
)中发送消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| /** * 往订单延迟队列(订单消息通知TTL队列,`shop.order.cancel.ttl.queue`)中发送消息 * 取消订单消息的发出者 */ @Component public class CancelOrderSender { private static final Logger LOGGER = LoggerFactory.getLogger(CancelOrderSender.class);
@Autowired private AmqpTemplate amqpTemplate;
/** * 给延迟队列发送消息 * @param orderId 订单号 * @param delayTime 延迟时间(毫秒) */ public void sendMessage(Long orderId,long delayTime){ String exchange = QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(); String routingKey = QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRoutingKey(); amqpTemplate.convertAndSend(exchange, routingKey, orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟时间,单位毫秒 message.getMessageProperties().setExpiration(String.valueOf(delayTime)); return message; } } ); LOGGER.info("发送延迟消息,订单号为:{}",orderId); } }
|
第七步,在component包内定义一个名为CancelOrderReceiver
的类,该类用于从订单取消通知队列(shop.order.cancel.queue
)中拉取消息并消费:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| /** * 处理取消订单消息 */ @Component @RabbitListener(queues = "shop.order.cancel.queue") public class CancelOrderReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired private OmsPortalOrderService portalOrderService;
@RabbitHandler public void handle(Long orderId){ LOGGER.info("收到延迟消息,订单号为:{}",orderId); portalOrderService.cancelOrder(orderId); } }
|
第八步,在service包内定义一个名为OmsPortalOrderService
的接口,用于定义与前台订单管理相关的接口方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| /** * 前台订单管理 Service */ public interface OmsPortalOrderService { /** * 根据提交信息生成订单 */ @Transactional CommonResult generateOrder(OrderParam orderParam);
/** * 取消单个超时订单 */ @Transactional void cancelOrder(Long orderId); }
|
第九步,在dto包内定义一个名为OrderParam
的类,这是生成订单时需传入的参数对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| /** * 生成订单时需传入的参数 */ @Data public class OrderParam { //收货地址id private Long memberReceiveAddressId; //优惠券id private Long couponId; //使用的积分数 private Integer useIntegration; //支付方式 private Integer payType; }
|
第十步,在impl包内定义一个名为OmsPortalOrderServiceImpl
的类,这个类需要实现OmsPortalOrderService
接口,并重写其中的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| /** * 前台订单管理 Service的实现类 */ @Service public class OmsPortalOrderServiceImpl implements OmsPortalOrderService { private static final Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderService.class);
@Autowired private CancelOrderSender cancelOrderSender;
@Override public CommonResult generateOrder(OrderParam orderParam) { //TODO 一系列下单操作,这些逻辑可参考完整项目 LOGGER.info("开始生成订单"); //下单完成后发送一个延迟消息,用于实现当用户没有付款时取消订单这一功能,注意orderId应该在下单后生成 Long orderId = 516516161616L; sendDelayMessageCancelOrder(orderId); return CommonResult.success("下单成功"); }
@Override public void cancelOrder(Long orderId) { //TODO 一系列取消订单操作,这些逻辑可参考完整项目 LOGGER.info("开始取消订单,订单号为:{}",orderId); }
private void sendDelayMessageCancelOrder(Long orderId) { //获取订单超时时间,假设为30秒 long delayTimes = 30 * 1000; //发送延迟消息 cancelOrderSender.sendMessage(orderId, delayTimes); } }
|
第十一步,在controller包内定义一个名为OmsPortalOrderController
的类,这是前台订单管理的Controller:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| /** * 前台订单管理的Controller */ @Api(tags = "OmsPortalOrderController",description = "订单管理") @RestController @RequestMapping("/order") public class OmsPortalOrderController { @Autowired private OmsPortalOrderService omsPortalOrderService;
@ApiOperation("根据购物车信息生成订单") @PostMapping("/generateOrder") public Object generateOrder(@RequestBody OrderParam orderParam) { return omsPortalOrderService.generateOrder(orderParam); } }
|
第十二步,启动项目,访问Swagger-UI接口文档地址,即浏览器访问http://localhost:8080/swagger-ui.html
链接,可以看到新的接口已经出现了:
第十三步,进行接口测试。首先后台用户进行登录,接着测试“下单接口”:
之后回到IDEA控制台,可以看到输出如下信息:
1 2
| 2021-12-09 18:05:40.460 INFO 14628 --- [nio-8080-exec-1] c.k.s.s.service.OmsPortalOrderService : 开始生成订单 2021-12-09 18:05:40.464 INFO 14628 --- [nio-8080-exec-1] c.k.s.s.component.CancelOrderSender : 发送延迟消息,订单号为:516516161616
|
之后等待30秒钟,IDEA控制台输出如下信息:
1 2
| 2021-12-09 18:06:10.494 INFO 14628 --- [cTaskExecutor-1] c.k.s.s.component.CancelOrderReceiver : 收到延迟消息,订单号为:516516161616 2021-12-09 18:06:10.495 INFO 14628 --- [cTaskExecutor-1] c.k.s.s.service.OmsPortalOrderService : 开始取消订单,订单号为:516516161616
|
可以看到时间间隔就是30秒,这也说明该订单下单30秒后未支付便被自动取消了。