RabbitMQ 消费消息
《RabbitMQ系列》: 1 . RabbitMQ 与 AMQP 协议
2 . RabbitMQ 消息属性详解 3. Rab bitMQ 可靠性与性能的平衡
对比 Basic.Get 和 Basic.Consume
RabbitMQ
实现了两个不同的 AMQP RPC
命令来获取队列中的消息:Basic.Get
和 Basic.Consume
。
Basic.Get
是一个轮询模型,而 Basic.Consume
是一个推送模型。Basic.Get
不是从服务器获取消息的理想方法。
Basic.Get
当应用程序使用 Basic.Get
请求来获取消息时,每次接收消息就必须发送一个新的请求,即使队列中存在多个消息。
如果在发出 Basic.Get RPC
请求时有一条消息可用,RabbitMQ
将返回 Basic.GetOk
以及消息本身。如下图:
如果队列中没有待处理的消息,则 RabbitMQ
使用 Basic.GetEmpty
进行响应,如下图:
当使用 Basic.Get
时,应用程序需要评估来自 RabbitMQ
的 RPC
响应以确定是否收到了消息。这里的“评估”是指需要应用程序主动判断,需要有代码实现这部分逻辑。
因为使用 Basic.Get
会导致每条消息都会产生与 RabbitMQ
同步通信的开销,这一过程由发送请求帧的客户端应用程序和发送应答的 RabbitMQ
组成。在简单的消息速度测试中,使用 Basic.Consume
至少是使用 Basic.Get
的两倍。
避免使用 Basic.Get
的一个潜在的不太明显的原因是它会影响吞吐量,由于 Basic.Get
的临时性,RabbitMQ
不能以任何方式优化投递过程,因为它永远不知道应用程序何时会请求消息。
Basic.Consume
使用 Basic.Consume RPC
命令来消费消息,在消费者可用时,RabbitMQ
以异步的方式向消费者发送消息。这通常被称为发布——订阅模式。
使用 Basic.Consume
消费消息,当一个客户端发出 Basic.Consume
时,一旦有消息可用时 RabbitMQ
就会进行发送,直到客户端发出一个 Basic.Consume
为止。如下图:
消费者标签
当应用程序发出 Basic.Consume
时会创建一个唯一的字符串,用来标识通过已建立的信道与 RabbitMQ
进行通信的应用程序。这个字符串被称为消费者标签(Consumer Tag
),RabbitMQ
每次都会把该字符串与消息一起发送给应用程序。
如果一个应用程序同时从多个队列中消费消息,消费者标签就非常有用。因为每个收到的消息都在它的方法帧中包含该消息所投递的目标消费者标签。如果应用程序需要对从不同队列接收到的消息执行不同的操作,则可以使用 Basic.Consume
请求中的消费者标签来确定该如何处理消息。在大多数情况下,客户端已经对消费者标签做了封装,所以不必担心它。
通过发送一个 Basic.Cancel RPC
命令,消费者标签可以用来取消从 RabbitMQ
获取消息。
在同步
Basic.Get
和异步Basic.Consume
之间进行选择是在编写消费者应用程序时需要做的几个决策之一。与发布消息时涉及的权衡一样,你为应用程序所做的选择可能会直接影响消息的可靠投递和性能。
优化消费者性能
当发布消息时,对消息的消费在吞吐量与可靠性之间存在一种平衡。RabbitMQ
也提供了一些选项,在弱化消息投递保证的同时提高消息投递的吞吐量。如下图:
使用 no-ack 模式实现更快的吞吐量
在消息消费时,应用程序发送一个 Basic.Consume RPC
请求,与该请求一起发送的还有一个 no-ack
标志。当这个标志被启用时,它会告诉 RabbitMQ
消费者在接收到消息时不会进行确认,RabbitMQ
只管尽快发送它们。但这也是发送消息最不可靠的方式。
重点是要清除,在消费者应用程序之前有多个数据缓冲区接收消息。如下图:
消费者之前存在多个数据缓冲区当 RabbitMQ
通过打开的连接发送消息时,它使用 TCP
套接字连接与客户端信息通信。如果这个连接是打开且可写的,那么 RabbitMQ
假定一切都处于正常工作状态并且成功投递了消息。如果当 RabbitMQ
尝试写入套接字以投递消息时出现了网络问题,操作系统将触发套接字错误从而让 RabbitMQ
知道出现了问题。如果没有发送错误,RabbitMQ
将假定消息投递成功。
通过 Basic.Ack RPC
响应发送的消息确认是客户端让 RabbitMQ
知道已成功接收消息的一种方法,这也是大多数情况下处理消息的方式。但如果关闭消息确认,那么当有新的可用消息时,RabbitMQ
将会发送该消息而不用等待。实际上,只要有新的消息,RabbitMQ
将会持续向消费者发送它们直到套接字缓冲区被填满为止。
在 Linux 中增加接收套接字缓冲区
要增加
Linux
操作系统中接收套接字缓冲区的数量,我们应该增加net.core.rmem_default
和net.core.rmem_max
值(默认是128 KB
)。对于大多数环境来说,16 MB
(16777216
) 应该足够了。大多数Linux
发行版都可以在/etc/sysctl.conf
中更改此值,也可以通过以下命令来手动设置:echo 16777216>/proc/sys/net/core/rmem_default
echo 16777216>/proc/sys/net/core/rmem_max
值得注意的是,当操作系统的套接字接收缓冲区中存放了大量的消息时,如果消费者应用程序发生崩溃,并且套接字关闭时,RabbitMQ
认为它已经发送了这些消息,并且不会收到应该从操作系统读取多少消息的指示。应用程序所面临的风险取决于消息的大小和数量以及操作系统中套接字接收缓冲区的大小。
通过服务质量设置控制消费者预取
AMQP
规范要求信道具有服务质量(Quality Of Service,QoS
)设置,即在确认消息接收之前,消费者可以预先要求接收一定数量的消息。QoS
设置允许 RabbitMQ
通过为消费者预先分配一定数量的消息来实现更高效的消息发送。
与 no-ack=true
的消费者不同,如果消费者应用程序在确认消息之前崩溃,则在套接字关闭时,所有预取的消息将返回到队列。
在协议级别上,可以在信道上发送 Basic.QoS RPC
请求来指定服务质量。作为这个 RPC
请求的一部分,可以指定 QoS
设置是针对其发送的信道还是针对连接上打开的所有信道。Basic.QoS RPC
请求可以随时发送,通常在用户发出 Basic.Consume RPC
请求之前进行发送。
注意
虽然
AMQP
规范要求Basic.QoS
方法同时设置预取总量和预取大小,但如果设置了no-ack
选项,预取大小将被忽略。
过度分配预取总量会对消息吞吐量有负面影响。
一次确认多个消息
使用 QoS
设置的好处之一就是不需要用 Basic.ACK RPC
响应来确认收到的每条消息。相反,Basic.ACK RPC
响应具有一个名为 multiple
的属性,当把它设置为 True
时就能让 RabbitMQ
知道你的应用程序想要确认所有以前未确认的消息。
同时确认多个消息可以使处理消息所需的网络通信量最小化,从而提高消息吞吐量。如果成功地处理了一些消息,并且应用程序在确认它们之前就已经死亡,则所有未确认的消息将返回队列以供其他消费者进行处理。
消费者使用事务
事务处理允许消费者应用程序提交和回滚批量操作。事务可能会对消息吞吐量产生负面影响,但有一个例外,如果你使用 QoS
设置,那么在使用事务来批量确认消息时,实际上可能会看到略微的性能提升。
无论是使用它们进行批量消息确认还是确保在消费消息时可以回滚 RPC
响应,了解事务对性能的真是影响将帮助你在消息可靠投递和消息吞吐量之间找到适当的平衡。
注意,事务不适用于已禁用确认的消费者。
拒绝消息
当消息本身或消息的处理过程出现问题时,RabbitMQ
提供了两种将消息踢回代理服务器的机制:Basic.Reject
和 Basic.Nack
。
消费者可以对消息进行确认、拒绝和否定确认。Basic.Nack
允许一次拒绝多个消息,而 Basic.Reject
一次只允许拒绝一个消息。
Basic.Reject
Basic.Reject
是一个 AMQP
指定的 RPC
响应,用于通知代理服务器无法对所投递的消息进行处理。像 Basic.Ack
一样,它携带由 RabbitMQ
创建的投递标签,用于唯一标识消费者与 RabbitMQ
进行通信的信道上的消息。
当消费者拒绝消息时,可以指示 RabbitMQ
丢弃消息或使用 requeue
标志重新发送消息。当启用 requeue
标志时,RabbitMQ
将把消息放回到队列中并再次处理。redelivered
标志,用于通知消息的下一个消费者它以前已经被投递过。
Basic.Nack
Basic.Nack
是 negative acknowledgment
(否定确认)的缩写,并非 AMQP
规范所定义的行为,但是对 Basic.Ack
多消息处理行为进行了补充。
死信交换器
RabbitMQ
的死信交换器(Dead-Letter eXchange,DLX
)功能是对 AMQP
规范的扩展,是一种可以拒绝已投递消息的可选行为。
尽管听起来像是一种特殊的交换器,但实际上死信交换器是一种正常的交换器。创建它时没有特别的要求也不需要执行特别的操作。使交换器成为死信交换器的唯一要做的事情是在创建队列时声明该交换器将被用作保存被拒绝的消息。一旦拒绝了一个不重新发送的消息,RabbitMQ
将把消息路由到队列的 x-dead-letter-exchange
参数中指定的交换器。
死信交换器与备用交换器
过期或被拒绝的消息通过死信交换器进行投递,而备用交换器则路有那些无法由
RabbitMQ
路由的消息。
除交换器外,死信功能还允许使用预先指定的值覆盖路由键。这样可以允许使用同一个交换器同时处理死信消息和非死信消息,但需要确保死信消息不被投递到相同的队列。设置预定义的路由键需要在声明队列时指定一个额外的参数 x-dead-letter-routing-key
。
根据 AMQP
标准,RabbitMQ
中的所有队列设置都是不可变的,意味着,在队列被声明后它们不能被修改。为了改变队列的死信交换器,必须删除并重新声明它。
控制队列
消费者应用程序有很多不同的应用场景。对于某些应用程序,可以接受多个消费者监听同一个队列,而对于其他一些消费者,一个队列应该只有一个消费者。
RabbitMQ
在创建队列时几乎为任何应用场景提供了足够的灵活性,定义队列时,有多个设置可以确定队列的行为:
- 自动删除自己
- 只允许一个消费者进行消费
- 自动过期的消息
- 保持有限数量的消息
- 将旧消息推出堆栈
按照 AMQP
规范,队列的设置是不可变的,一旦声明了一个队列,就不能改变用来创建它的任何设置。要更改队列设置 ,必须删除并重新创建它。
临时队列
自动删除队列
自动删除的队列可以被创建并且用来处理消息,一旦消费者完成连接和检索消息,在断开连接时队列将被删除。
创建自动删除队列非常简单,只需要在 Queue.Declare RPC
请求中将 auto_delete
标志设置为 Ture
即可。
需要注意的是,任意数量的消费者都可以对自动删除队列进行消费,队列只会在没有消费者监听的时候自行删除。
只允许单个消费者
如果没有在队列上启用 exclusive
设置,那么可以连接到队列并消费消息的消费者数量没有限制。并对能够从队列中接收消息的所有消费者实施轮询(round-robin
)投递行为。
与 auto_delete
参数一样,启用队列的独占属性需要在队列创建时传递参数 exclusive=Ture
。声明为 exclusive
的队列只能被声明时所指定的同一个连接和信道所消费,当创建队列的信道关闭时,独占队列也将自动被删除。
与 auto_delete
队列不同,在信道关闭之前,可以根据需要多次使用和取消 exclusive
队列的消费者。exclusive
队列自动删除行为的发送不会考虑是否已经发出了一个 Basic.Consume
请求,这与 auto-delete
队列不同。
自动过期队列
创建一个自动过期的队列非常简单,在声明队列时使用 x-expires
参数,该参数以毫秒为单位设置队列的生存时间(Time To Live,TTL
)。
自动过期队列有一些严格的规定:
- 队列只有在没有消费者的情况下才会过期。
-
队列只有在
TTL
周期之内没有收到Basic.Get
请求时才会到期。一旦一个Basic.Get
请求中已经包含了一个具有过期值的队列,那么过期设置无效,该队列将不会被自动删除。 -
与任何其他队列一样,不能重新声明或更改
x-expires
的设置和参数。 -
RabbitMQ
不保证删除过期队列这一过程的时效性。
永久队列
队列持久性
在声明队列时将 durable
标志设置为 Ture
,这样在服务器重启之后队列仍然存在。
当消息发布时将 delivery-mode
属性设置为 2 时,消息就会存储在磁盘上。相反,durable
标志告诉 RabbitMQ
希望队列被设置在服务器中,直到 Queue.Delete
请求被调用为止。
队列中消息自动过期
消息级别的 TTL
设置允许服务器端对消息的最大生存时间进行限制。声明队列时同时指定死信交换器和 TTL
值将导致该队列中的已到期的消息成为死信消息。
与消息的过期时间属性相反,x-message-ttl
队列设置强制规定了队列中所有消息的最大生存时间。
最大长度队列
从 RabbitMQ 3.1.0
开始,可以在声明队列时指定最大长度。如果在队列上设置了 x-max-length
参数,一旦达到最大值,RabbitMQ
会在添加新消息时删除位于队列前端的消息。
如果使用死信交换器声明队列,则从队列前端移除的消息可能成为死信。
任意队列设置
队列参数可用于设置高可用性队列、死信交换器、消息过期时间、队列过期时间和队列最大长度。AMQP
规范将队列参数定义为一个表,其中参数值的语法和语义由服务器确定。RabbitMQ
保留了这些参数,并忽略了传入的任何其他参数。队列的保留参数如下:
参数是设置队列级别监控和阈值的有用方法。
记得 转发 、 在看 、 关注 哦!