RabbitMQ延时队列应用场景,学到了!
共 4334字,需浏览 9分钟
·
2021-10-26 19:12
应用场景
我们系统未付款的订单,超过一定时间后,需要系统自动取消订单并释放占有物品
常用的方案
就是利用Spring schedule定时任务,轮询检查数据库
但是会消耗系统内存,增加了数据库的压力、还存在较大的时间误差
解决:rabbitmq
的消息TTL和死信Exchange结合
介绍
1.何为消息TTL、死信
死信:对消息设置的过期时间到了,这个消息还没有被消费就认为这个消息死了,死了的消息会进入死信交换机(Dead Letter Exchanges)
成为死信的三种条件:
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
消息TTL:消息的TTL就是消息的存活时间
RabbitMQ可以对队列和消息都设置过期时间,但代表的都是一个意思,只要消息在设置时间内没有消费,消息就死了,就被称为死信
如果队列和消息都设置了过期时间,那么就取时间最小的,单个消息的过期时间才是延时队列的关键
2.如何运作
设置队列过期时间
消费者P会通过一个路由键deal.message发送消息给X交换机,然后继续发送给delay queau队列,这个队列比较特殊,设置了过期时间5分钟过期,还设置了x-dead-letter-exchange用于指定下一个接收的交换机,消息过期之后会成为死信直接进入delay.exchange交换机,利用x-dead-letter-routing-key绑定的路由键找到下一个队列,这时候只需要有人监听这个队列。
设置消息过期时间
消费者发送一个消息,设置了5分钟过期时间,最后交给了延时队列,延时队列说消息死了不要乱放,指定了一个死信路由,用于找到下一个队列的路由键,等到五分钟后服务器会自动检查是否过期,过期的话会交给delay.exchange路由,最后再交给delay.message
代码模拟
下订单成功先发动给order-event-exchange
,order-event-exchange
绑定了两个路由键order.create.order
、order.release.order
,根据order.create.order
路由键找到order.delay.queue
队列,这是一个特殊的队列,上图所诉,消息的存活时间为一分钟,消息在order.delay.queue
队列中没人使用变成死信了,交给order-event-exchange
交换机,最后通过order.release.order
绑定关系找到了order.release.order.queue
队列
@Configuration
public class MyMQConfig {
//监听最后一个队列,获取那些过期的订单消息
@RabbitListener(queues = "order.release.order.queue")
public void listerner(OrderEntity orderEntity,Channel channel,Message message) throws IOException {
System.out.println("收到过期订单信息"+orderEntity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
//特殊队列
@Bean
public Queue orderDelayQueue() {
Map arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000);
Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
return orderDelayQueue;
}
//最后接收死信消息的队列
@Bean
public Queue orderReleaseQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
//事件交换机
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
//绑定order.delay.queue队列和的order-event-exchange交换机的路由键
@Bean
public Binding orderCreateBingding() {
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
}
//绑定order.release.order.queue队列和的order-event-exchange交换机的路由键
@Bean
public Binding orderReleaseBingding() {
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
}
}
测试
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@GetMapping("/test/createOrder")
public String createOrderTest(){
OrderEntity entity = new OrderEntity();
entity.setOrderSn(UUID.randomUUID().toString());
entity.setModifyTime(new Date());
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);
return "ok";
}
库存解锁实际场景
在库存服务有个stock-event-exchange
交换机,如果我们想要解锁库存,
1、首先订单下成功、库存锁定成功
2、锁定成功就要通过stock.locked
路由键发送一个消息给交换机stock-event-exchange
,消息内容包括哪个订单、哪些商品、多少库存等等
3、交换机通过绑定关系再发送给延时队列stock.delay.queue
4、订单可能需要30分钟才会自动关闭,50分钟之后来检查库存,就会知道订单支付没有
5、50分钟消息没有被消息,就变为死信,通过stock.release
路由键绑定关系交给stock-event-exchange
交换机
6、stock-event-exchange
交换机通过stock.release
路由键绑定关系找到strock.relelase.stock.queue队列
7、所有的解锁库存服务就监听这个队列里的消息,只要这个队列里消息能够到达的都是超时没有支付订单的
下单远程锁定库存,然后将仓库锁定库存的数据发给订单,当在订单下单失败时,由于不是分布式事务,订单回滚,但仓库不回滚,所以订单一失败,就需要通过订单拿到mq中仓库传来的数据通知仓库解锁库存
库存解锁场景:
1、下订单成功,订单过期没有支付被系统自动取消或者用户手动取消,都要解锁库存
2、下订单成功、库存锁定成功,但是业务调用失败导致订单回滚,之前锁定的库存就自动解锁,Seata分布式事务太慢,就要用一段时间后自动解决库存。
3、订单失败,因为锁库存失败有一个商品没有锁成功,导致整个锁库存服务都回滚,
消息队列收到库存消息场景
消息队列收到消息之后
如果没有查到数据库有锁定成功的数据,说明库存锁失败了,锁库存自动回滚,数据库查不到记录无需解锁
如果查到有数据,就说明库存锁定成功了
没有这个订单必须解锁库存
有订单,订单没人支付失效了才能解锁库存
定时关闭订单实际场景
同上原理类似也是利用死信路由
,订单创建后,默认放入延时队列,也就是订单的有效时间,超过这个时间没有支付或者用户主动取消都会导致订单信息进入order.release.order.queue
队列,最后被释放
作者 | cg-ww
来源 | w.cnblogs.com/cg-ww/p/15449767.html