《RabbitMQ》| 解决消息延迟和堆积问题

小菜良记

共 4233字,需浏览 9分钟

 ·

2021-11-10 07:27

大家好,我是小菜。一个希望能够成为 吹着牛X谈架构 的男人!如果你也想成为我想成为的人,不然点个关注做个伴,让小菜不再孤单!

本文主要介绍 RabbitMQ的常见问题

如有需要,可以参考

如有帮助,不忘 点赞

微信公众号已开启,小菜良记,没关注的同学们记得关注哦!

  • 消息可靠性问题:如何确保发送的消息至少被消费一次?
  • 延迟消息问题:如何实现消息的延迟投递?
  • 消息堆积问题:如何解决数百万级以上消息堆积,无法及时消费问题?

我们在上篇已经说明了如何解决消息丢失的问题,也就是保证了消息的可靠性,那么其余两个问题同样重要,这篇我们将讲述其余两个问题的解决方式~!

消息丢失解决方案:《RabbitMQ》 | 消息丢失也就这么回事

一、延迟消息

延迟消息 字面意思就是让延迟接收消息,那么如何能让消息延迟到达?这就是我们要思考解决的问题,在了解延迟队列之前我们需要先明白 RabbitMQ 中的两个概念

  • 死信交换机
  • TTL

1)死信交换机

死信(dead letter),也就是废弃已死亡的消息,那什么情况下一个普通的消息能够成为死信?需要符合以下三个条件:

  1. 消费者使用 basic.rejectbasic.nack 声明消费失败,并将消息的 requeue 参数设置为 false
  2. 消息是一个过期消息,超时后无人消费
  3. 要投递的队列消息堆积满了,最早的消息就会成为死信

死信交换机 便是 死信 的归属。

如果一个队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为 死信交换机 - DLXDead Letter Exchange

步骤:当生产者正常投递到队列(simple.queue)中,如果消费者从队列(simple.queue) 消费消息却声明了 reject,那并且队列绑定了死信交换机(dl.queue),那么这个时候成为死信的消息就会投递到这个死信队列(dl.queue)中。

79c6d20000294d5b4f43d615f6bb3d1a.webp死信投递过程

正常队列 --> 死信队列 的过程,我们必须声明两个关键信息

  • 死信交换机的名称
  • 死信交换机与死信队列绑定的路由key

而这两个信息也是我们投递消息的基础配置。

接下来我们简单模拟一下 条件1 所产生的场景

1、首先声明一个死信交换机和死信队列

我们这边是使用简单的注解方式直接生成

1ceec37646e579d400186db2c4379c4f.webp生成死信交换机和死信队列

通过 RabbitMQ 控制台界面可以看出已经成功生成

582f1eb00e613a28b63268a141dbdff9.webp
2、声明正常使用交换机与队列

然后这个时候我们就可以创建一个正常使用的交换机与队列,并指明死信交换机

27180e271c8609f49335e4337b357445.webp

同样可以通过控制台查看创建状态

27180e271c8609f49335e4337b357445.webp

其中是否有声明死信交换机我们可以通过队列的 DLXDLK 标志判断

3、模拟拒收

然后我们现在通过代码模拟客户端拒绝消息的场景

1)消息发送

3e35bbfb18fcc2cd1863ee856f4c44e8.webp

2)消息接收

3e35bbfb18fcc2cd1863ee856f4c44e8.webp

查看控制台,结果如下:

2021-11-06 23:56:52.095  INFO 2112 --- [ntContainer#0-1] c.l.m.c.listener.SpringRabbitListener    : 正常业务交换机 | 接收到的消息 : [hello]
2021-11-06 23:56:52.118  INFO 2112 --- [ntContainer#1-1] c.l.m.c.listener.SpringRabbitListener    : 死信交换机 | 接收到的消息 : hello

这说明我们死信交换机已经成功发挥作用

2)TTL

以上我们已经成功认识到了 死信交换机 的使用,但是这与我们一开始说的 延迟队列 似乎并没有太大关系,莫急~接下来说到的 TTL(Time-To-Live) 就是用来处理延迟消息的~!

在 TTL 的概念中,如果一个队列中的消息 TTL 结束后仍未被消费,那么这个消息就会自动变为死信,而 TTL 超时情况分为两种:

  1. 消息所在的队列设置了存活时间
  2. 消息本身设置了存活时间
dc5384b28159208eaedb27d28cf8d64a.webp

我们同样进行上述 条件2 的模拟场景

1、声明死信交换机与死信队列(上述已完成)
2、声明延迟队列并指定死信交换机
795338b120a773eb9c239305be8d5ffb.webp

同样控制台查看创建结果,并且我们发现不止有 DLXDLK 标志,还多了个 TTL ,说明该队列是延迟队列

1fec583091dbf2dd996799c498fe6dda.webp
3、模拟消费超时情况

我们往延迟队列中发送一条消息,并且没有消费者进行消费,等待 1 分钟后查看是否能进入 死信队列

93044138bc7ff6a79c01823ac6e4349a.webp

我们已经发送了一条消息到延迟队列并且一分钟后也成功在控制台发现了这条信息已经进入到了死信交换机

2021-11-07 00:01:30.854  INFO 32752 --- [ntContainer#1-1] c.l.m.c.listener.SpringRabbitListener    : 死信交换机 | 接收到的消息 : test ttl-message

以上是配置了队列超时时间,消息本身自然也能配置超时时间,当 消息队列 都存在超时时间时,那么就以最短的 TTL 为准,消息的超时配置如下:

7cfd1269da86b3f6c26b477d550cb226.webp

如上图所示,我们可以利用 Message 这个类来传递消息信息,并设置上超时时间,我们设置的是 5000 ms,等待发送成功后,控制台过5000 ms 也成功打印了死信交换机消费的消息:

2021-11-07 00:03:09.048  INFO 39996 --- [ntContainer#1-1] c.l.m.c.listener.SpringRabbitListener    : 死信交换机 | 接收到的消息 : this is a ttl message

3)延迟队列

我们上述是使用 死信交换机 来间接实现 延迟队列 的效果,但实际在 RabbitMQ 不必如此麻烦,RabbitMQ 已经为我们封装好了插件,我们只需要下载安装即可~

RabbitMQ 插件下载地址

我们进入地址可以发现有许多插件,搜索 delay 关键字找到我们需要的插件进行下载

7cfd1269da86b3f6c26b477d550cb226.webp

下载完后直接上传到 RabbitMQ 的插件目录 - plugins,小菜这边是使用 docker 临时安装测试的,所以已经将该插件目录挂载出来了:

docker run -itd --name rabbitmq -v plugins:/plugins -p 15672:15672 -p 5672:5672 rabbitmq:management

因此我这边直接将插件上传到容器中的 plugins 目录即可~

然后进入到容器中执行以下命令进行插件开启

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
8861bd7af5f3c7356b9ec2217ba8f3c2.webp

并且我们在控制台创建交换机的时候可以看到 type 类型多了个选项

670c7faee20f25aefb05cbe262667e5e.webp

成功执行到这步就说明已经开启了 RabbitMQ 的延迟队列功能

那接下来我们就可以来使用 DelayExchange,首先我们需要了解代码的方式创建延迟交换机:

方式1
6f7d65630f61afc3d9cebeeb9dda5c0b.webp
方式2
fda140282e78b53cfe34d66ba8b72782.webp

当我们万事具备之后就可以来发送消息了

在发送消息的时候,消息头中一定要携带上 x-delay 参数,指定上延迟时间

5261c3ff4e473bc064238a618c202ede.webp

通过这样配置之后,我们可以在控制台看到,经过10秒后 delay.queue 才收到对应消息,然后被对应消费者消费

3)总结

我们上面从 死信交换机TTL延迟队列,一步步认识了如何实现延迟消息的功能,然后我们进行一个小小的总结:

问题1:什么样的消息会成为死信?
  1. 消息被消费者 reject 或返回 nack
  2. 消息超时未及时消费
  3. 消息队列满了
问题2:消息超时的方式
  1. 给队列设置 TTL 属性
  2. 给消息设置 TTL 属性
问题3:如何使用延迟队列
  1. 下载并启用 RabbitMQ 延迟队列插件
  2. 声明一个交换机,并将 delayed 属性设置为 true
  3. 发送消息时,添加 x-delay 头,值为超时时间
问题4:延迟队列的使用场景
  1. 延迟发送短信通知
  2. 订单自动取消
  3. 库存自动回滚

二、惰性队列

讲完延迟队列,我们继续来认识惰性队列

惰性队列之前,我们先抛出一个问题~

RabbitMQ 如何解决消息堆积问题

什么情况下会出现消息堆积问题?

  1. 当生产者生产速度远远消费者消费速度
  2. 当消费者宕机没有及时重启

那么如何解决这个问题?通常思路如下:

  1. 在消费者机器重启后,增加更多的消费者进行处理
  2. 在消费者处理逻辑内部开辟线程池,利用多线程的方式提高处理速度
  3. 扩大队列的容量,提高堆积上限

这几个方式从理论上来说解决消息堆积问题也是没有问题的,但是处理方式不够优雅甚至不够灵活~ 那么除了以上的几种解决方式,我们可以利用 RabbitMQ 中自带的一种队列类型 -- 惰性队列

什么是惰性队列?我们认识一下惰性队列的几个特性:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存中
  • 它支持百万级消息的存储

说到底,就是利用磁盘的缓冲机制,而这种机制的缺点就是消息的时效性会降低,性能受限于磁盘的IO,认识特性和缺点之后,我们便来看看如何创建惰性队列

方式1
e9d57588b40e78909c9cdbaa94a75d47.webp
方式2
290bc69e8db114e3782590c0517cc350.webp
方式3

该方式是直接基于命令行修改将一个正在运行中的队列修改为惰性队列

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

其中几个命令参数含义如下:

  • rabbitmqctl:命令行工具
  • set_policy:添加一个策略
  • Lazy:策略名称,可以自定义
  • ^lazy-queue$:用正则表达式匹配队列的名称
  • '{"queue-mode":"lazy"}':设置队列为 lazy 模式
  • --apply-to queues:策略的作用对象,是所有的队列

这种惰性队列的方式尽管缺点是消息时效性会降低,但是在某些场景下也不是不能接受,何况它的优点同样明显:

  • 基于磁盘存储,消息上限高
  • 没有间歇性的 page-out,性能稳定

到这里,我们就已经讲述了 RabbitMQ 的常见问题,对于我们来说,普通的开发场景可能比较少遇到这些问题,但是没遇到不等于没有,所以我们还是需要多认识来防患于未然!

不要空谈,不要贪懒,和小菜一起做个吹着牛X做架构的程序猿吧~点个关注做个伴,让小菜不再孤单。咱们下文见!

0517677a2f55515d134bee26036b68b3.webp看完不赞,都是坏蛋

今天的你多努力一点,明天的你就能少说一句求人的话!我是小菜,一个和你一起变强的男人。 💋微信公众号已开启,小菜良记,没关注的同学们记得关注哦!


浏览 65
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报