RabbitMQ 消费消息

泥瓦匠攻城狮

共 7037字,需浏览 15分钟

 ·

2023-05-18 16:38

a91964c15dacad9ad140f314b491170d.webp



RabbitMQ系列》: 1 .  RabbitMQ 与 AMQP 协议
2 RabbitMQ 消息属性详解 3. Rab bitMQ 可靠性与性能的平衡


对比 Basic.Get 和 Basic.Consume

RabbitMQ 实现了两个不同的 AMQP RPC 命令来获取队列中的消息:Basic.GetBasic.Consume

Basic.Get 是一个轮询模型,而 Basic.Consume 是一个推送模型。Basic.Get 不是从服务器获取消息的理想方法。

Basic.Get

当应用程序使用 Basic.Get 请求来获取消息时,每次接收消息就必须发送一个新的请求,即使队列中存在多个消息。

如果在发出 Basic.Get RPC 请求时有一条消息可用,RabbitMQ 将返回 Basic.GetOk 以及消息本身。如下图:

f7cc7c613dac99d1550be939a43c509b.webpBasic.Get_1

如果队列中没有待处理的消息,则 RabbitMQ 使用 Basic.GetEmpty 进行响应,如下图:

6a494f4d445f1d7a7e742f683fdf6fa0.webpBasic.Get_2

当使用 Basic.Get 时,应用程序需要评估来自 RabbitMQRPC 响应以确定是否收到了消息。这里的“评估”是指需要应用程序主动判断,需要有代码实现这部分逻辑。

因为使用 Basic.Get 会导致每条消息都会产生与 RabbitMQ 同步通信的开销,这一过程由发送请求帧的客户端应用程序和发送应答的 RabbitMQ 组成。在简单的消息速度测试中,使用 Basic.Consume 至少是使用 Basic.Get 的两倍。

避免使用 Basic.Get 的一个潜在的不太明显的原因是它会影响吞吐量,由于 Basic.Get 的临时性,RabbitMQ 不能以任何方式优化投递过程,因为它永远不知道应用程序何时会请求消息。

Basic.Consume

使用 Basic.Consume RPC 命令来消费消息,在消费者可用时,RabbitMQ 以异步的方式向消费者发送消息。这通常被称为发布——订阅模式。

使用 Basic.Consume 消费消息,当一个客户端发出 Basic.Consume 时,一旦有消息可用时 RabbitMQ 就会进行发送,直到客户端发出一个 Basic.Consume 为止。如下图:

a9e2a4983923b636c36b340485253e16.webpBasic.Consume

消费者标签

当应用程序发出 Basic.Consume 时会创建一个唯一的字符串,用来标识通过已建立的信道与 RabbitMQ 进行通信的应用程序。这个字符串被称为消费者标签(Consumer Tag),RabbitMQ 每次都会把该字符串与消息一起发送给应用程序。

如果一个应用程序同时从多个队列中消费消息,消费者标签就非常有用。因为每个收到的消息都在它的方法帧中包含该消息所投递的目标消费者标签。如果应用程序需要对从不同队列接收到的消息执行不同的操作,则可以使用 Basic.Consume 请求中的消费者标签来确定该如何处理消息。在大多数情况下,客户端已经对消费者标签做了封装,所以不必担心它。

通过发送一个 Basic.Cancel RPC 命令,消费者标签可以用来取消从 RabbitMQ 获取消息。

在同步 Basic.Get 和异步 Basic.Consume 之间进行选择是在编写消费者应用程序时需要做的几个决策之一。与发布消息时涉及的权衡一样,你为应用程序所做的选择可能会直接影响消息的可靠投递和性能。

优化消费者性能

当发布消息时,对消息的消费在吞吐量与可靠性之间存在一种平衡。RabbitMQ 也提供了一些选项,在弱化消息投递保证的同时提高消息投递的吞吐量。如下图:

f7956175a5e35b027984a3b4b8a8e47c.webp消费者性能优化维度
使用 no-ack 模式实现更快的吞吐量

在消息消费时,应用程序发送一个 Basic.Consume RPC 请求,与该请求一起发送的还有一个 no-ack 标志。当这个标志被启用时,它会告诉 RabbitMQ 消费者在接收到消息时不会进行确认,RabbitMQ 只管尽快发送它们。但这也是发送消息最不可靠的方式。

重点是要清除,在消费者应用程序之前有多个数据缓冲区接收消息。如下图:

0ade2c5e328f8203ef57bae3cbc4b701.webp消费者之前存在多个数据缓冲区

RabbitMQ 通过打开的连接发送消息时,它使用 TCP 套接字连接与客户端信息通信。如果这个连接是打开且可写的,那么 RabbitMQ 假定一切都处于正常工作状态并且成功投递了消息。如果当 RabbitMQ 尝试写入套接字以投递消息时出现了网络问题,操作系统将触发套接字错误从而让 RabbitMQ 知道出现了问题。如果没有发送错误,RabbitMQ 将假定消息投递成功。

通过 Basic.Ack RPC 响应发送的消息确认是客户端让 RabbitMQ 知道已成功接收消息的一种方法,这也是大多数情况下处理消息的方式。但如果关闭消息确认,那么当有新的可用消息时,RabbitMQ 将会发送该消息而不用等待。实际上,只要有新的消息,RabbitMQ 将会持续向消费者发送它们直到套接字缓冲区被填满为止。

在 Linux 中增加接收套接字缓冲区

要增加 Linux 操作系统中接收套接字缓冲区的数量,我们应该增加 net.core.rmem_defaultnet.core.rmem_max 值(默认是 128 KB)。对于大多数环境来说,16 MB16777216) 应该足够了。大多数 Linux 发行版都可以在 /etc/sysctl.conf 中更改此值,也可以通过以下命令来手动设置:

        
        echo 16777216>/proc/sys/net/core/rmem_default
echo 16777216>/proc/sys/net/core/rmem_max

值得注意的是,当操作系统的套接字接收缓冲区中存放了大量的消息时,如果消费者应用程序发生崩溃,并且套接字关闭时,RabbitMQ 认为它已经发送了这些消息,并且不会收到应该从操作系统读取多少消息的指示。应用程序所面临的风险取决于消息的大小和数量以及操作系统中套接字接收缓冲区的大小。

通过服务质量设置控制消费者预取

AMQP 规范要求信道具有服务质量(Quality Of Service,QoS)设置,即在确认消息接收之前,消费者可以预先要求接收一定数量的消息。QoS 设置允许 RabbitMQ 通过为消费者预先分配一定数量的消息来实现更高效的消息发送。

no-ack=true 的消费者不同,如果消费者应用程序在确认消息之前崩溃,则在套接字关闭时,所有预取的消息将返回到队列。

在协议级别上,可以在信道上发送 Basic.QoS RPC 请求来指定服务质量。作为这个 RPC 请求的一部分,可以指定 QoS 设置是针对其发送的信道还是针对连接上打开的所有信道。Basic.QoS RPC 请求可以随时发送,通常在用户发出 Basic.Consume RPC 请求之前进行发送。

注意

虽然 AMQP 规范要求 Basic.QoS 方法同时设置预取总量和预取大小,但如果设置了 no-ack 选项,预取大小将被忽略。

过度分配预取总量会对消息吞吐量有负面影响。

一次确认多个消息

使用 QoS 设置的好处之一就是不需要用 Basic.ACK RPC 响应来确认收到的每条消息。相反,Basic.ACK RPC 响应具有一个名为 multiple 的属性,当把它设置为 True 时就能让 RabbitMQ 知道你的应用程序想要确认所有以前未确认的消息。

同时确认多个消息可以使处理消息所需的网络通信量最小化,从而提高消息吞吐量。如果成功地处理了一些消息,并且应用程序在确认它们之前就已经死亡,则所有未确认的消息将返回队列以供其他消费者进行处理。

消费者使用事务

事务处理允许消费者应用程序提交和回滚批量操作。事务可能会对消息吞吐量产生负面影响,但有一个例外,如果你使用 QoS 设置,那么在使用事务来批量确认消息时,实际上可能会看到略微的性能提升。

无论是使用它们进行批量消息确认还是确保在消费消息时可以回滚 RPC 响应,了解事务对性能的真是影响将帮助你在消息可靠投递和消息吞吐量之间找到适当的平衡。

注意,事务不适用于已禁用确认的消费者。

拒绝消息

当消息本身或消息的处理过程出现问题时,RabbitMQ 提供了两种将消息踢回代理服务器的机制:Basic.RejectBasic.Nack

消费者可以对消息进行确认、拒绝和否定确认。Basic.Nack 允许一次拒绝多个消息,而 Basic.Reject 一次只允许拒绝一个消息。

Basic.Reject

Basic.Reject 是一个 AMQP 指定的 RPC 响应,用于通知代理服务器无法对所投递的消息进行处理。像 Basic.Ack 一样,它携带由 RabbitMQ 创建的投递标签,用于唯一标识消费者与 RabbitMQ 进行通信的信道上的消息。

当消费者拒绝消息时,可以指示 RabbitMQ 丢弃消息或使用 requeue 标志重新发送消息。当启用 requeue 标志时,RabbitMQ 将把消息放回到队列中并再次处理。redelivered 标志,用于通知消息的下一个消费者它以前已经被投递过。

Basic.Nack

Basic.Nacknegative acknowledgment(否定确认)的缩写,并非 AMQP 规范所定义的行为,但是对 Basic.Ack 多消息处理行为进行了补充。

死信交换器

RabbitMQ 的死信交换器(Dead-Letter eXchange,DLX)功能是对 AMQP 规范的扩展,是一种可以拒绝已投递消息的可选行为。

尽管听起来像是一种特殊的交换器,但实际上死信交换器是一种正常的交换器。创建它时没有特别的要求也不需要执行特别的操作。使交换器成为死信交换器的唯一要做的事情是在创建队列时声明该交换器将被用作保存被拒绝的消息。一旦拒绝了一个不重新发送的消息,RabbitMQ 将把消息路由到队列的 x-dead-letter-exchange 参数中指定的交换器。

死信交换器与备用交换器

过期或被拒绝的消息通过死信交换器进行投递,而备用交换器则路有那些无法由 RabbitMQ 路由的消息。

除交换器外,死信功能还允许使用预先指定的值覆盖路由键。这样可以允许使用同一个交换器同时处理死信消息和非死信消息,但需要确保死信消息不被投递到相同的队列。设置预定义的路由键需要在声明队列时指定一个额外的参数 x-dead-letter-routing-key

根据 AMQP 标准,RabbitMQ 中的所有队列设置都是不可变的,意味着,在队列被声明后它们不能被修改。为了改变队列的死信交换器,必须删除并重新声明它。

控制队列

消费者应用程序有很多不同的应用场景。对于某些应用程序,可以接受多个消费者监听同一个队列,而对于其他一些消费者,一个队列应该只有一个消费者。

RabbitMQ 在创建队列时几乎为任何应用场景提供了足够的灵活性,定义队列时,有多个设置可以确定队列的行为:

  • 自动删除自己
  • 只允许一个消费者进行消费
  • 自动过期的消息
  • 保持有限数量的消息
  • 将旧消息推出堆栈

按照 AMQP 规范,队列的设置是不可变的,一旦声明了一个队列,就不能改变用来创建它的任何设置。要更改队列设置 ,必须删除并重新创建它。

临时队列
自动删除队列

自动删除的队列可以被创建并且用来处理消息,一旦消费者完成连接和检索消息,在断开连接时队列将被删除。

创建自动删除队列非常简单,只需要在 Queue.Declare RPC 请求中将 auto_delete 标志设置为 Ture 即可。

需要注意的是,任意数量的消费者都可以对自动删除队列进行消费,队列只会在没有消费者监听的时候自行删除。

只允许单个消费者

如果没有在队列上启用 exclusive 设置,那么可以连接到队列并消费消息的消费者数量没有限制。并对能够从队列中接收消息的所有消费者实施轮询(round-robin)投递行为。

auto_delete 参数一样,启用队列的独占属性需要在队列创建时传递参数 exclusive=Ture。声明为 exclusive 的队列只能被声明时所指定的同一个连接和信道所消费,当创建队列的信道关闭时,独占队列也将自动被删除。

auto_delete 队列不同,在信道关闭之前,可以根据需要多次使用和取消 exclusive 队列的消费者。exclusive 队列自动删除行为的发送不会考虑是否已经发出了一个 Basic.Consume 请求,这与 auto-delete 队列不同。

自动过期队列

创建一个自动过期的队列非常简单,在声明队列时使用 x-expires 参数,该参数以毫秒为单位设置队列的生存时间(Time To Live,TTL)。

自动过期队列有一些严格的规定:

  • 队列只有在没有消费者的情况下才会过期。
  • 队列只有在 TTL 周期之内没有收到 Basic.Get 请求时才会到期。一旦一个 Basic.Get 请求中已经包含了一个具有过期值的队列,那么过期设置无效,该队列将不会被自动删除。
  • 与任何其他队列一样,不能重新声明或更改 x-expires 的设置和参数。
  • RabbitMQ 不保证删除过期队列这一过程的时效性。
永久队列
队列持久性

在声明队列时将 durable 标志设置为 Ture,这样在服务器重启之后队列仍然存在。

当消息发布时将 delivery-mode 属性设置为 2 时,消息就会存储在磁盘上。相反,durable 标志告诉 RabbitMQ 希望队列被设置在服务器中,直到 Queue.Delete 请求被调用为止。

队列中消息自动过期

消息级别的 TTL 设置允许服务器端对消息的最大生存时间进行限制。声明队列时同时指定死信交换器和 TTL 值将导致该队列中的已到期的消息成为死信消息。

与消息的过期时间属性相反,x-message-ttl 队列设置强制规定了队列中所有消息的最大生存时间。

最大长度队列

RabbitMQ 3.1.0 开始,可以在声明队列时指定最大长度。如果在队列上设置了 x-max-length 参数,一旦达到最大值,RabbitMQ 会在添加新消息时删除位于队列前端的消息。

如果使用死信交换器声明队列,则从队列前端移除的消息可能成为死信。

任意队列设置

队列参数可用于设置高可用性队列、死信交换器、消息过期时间、队列过期时间和队列最大长度。AMQP 规范将队列参数定义为一个表,其中参数值的语法和语义由服务器确定。RabbitMQ 保留了这些参数,并忽略了传入的任何其他参数。队列的保留参数如下:

c3ee188e4b6c035558b83abf21622a1d.webp保留参数

参数是设置队列级别监控和阈值的有用方法。

37add1b3d59127145dd131de1a3ef274.webp

记得 转发 在看 关注 哦!
浏览 47
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报