Rocketmq源码分析12:consumer 负载均衡
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
接上文,继续分析consumer
消费流程。
5. 如何选择消息队列:RebalanceService
让我们回到PullMessageService#run()
方法:
public class PullMessageService extends ServiceThread {
...
private final LinkedBlockingQueue<PullRequest> pullRequestQueue
= new LinkedBlockingQueue<PullRequest>();
/**
* 将 pullRequest 放入 pullRequestQueue 中
*/
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 从 pullRequestQueue 获取一个 pullRequest,阻塞的方式
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
...
}
PullMessageService
线程获得了pullRequest
后,然后就开始了一次又一次的拉起消息的操作,那这个pullRequest
最初是在哪里添加进来的呢?这就是本节要分析的「负载均衡」功能了。
处理负载均衡的线程为RebalanceService
,它是在MQClientInstance#start
方法中启动的,我们直接进入其run()
方法:
public class RebalanceService extends ServiceThread {
// 省略其他
...
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
在它的run()
方法中,仅是调用了MQClientInstance#doRebalance
方法,我们继续进入:
public void doRebalance() {
// consumerTable 存放的就是当前 consumer
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
在MQClientInstance#doRebalance
方法中,会遍历所有的consumer
,然后调用DefaultMQPushConsumerImpl#doRebalance
方法作进一步的处理,consumerTable
就是用来保存DefaultMQPushConsumerImpl
实例的,继续进入DefaultMQPushConsumerImpl#doRebalance
方法:
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
继续跟进,来到RebalanceImpl#doRebalance
方法:
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 客户端负载均衡:根据主题来处理负载均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
/**
* 这就是最张处理负载均衡的地方了
*/
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
// 广播模式:不需要处理负载均衡,每个消费者都要消费,只需要更新负载信息
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// 更新负载均衡信息,这里传入的参数是mqSet,即所有队列
boolean changed = this
.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info(...);
}
} else {
log.warn(...);
}
break;
}
// 集群模式
case CLUSTERING: {
// 根据订阅的主题获取消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 客户端id,根据 topic 与 consumerGroup 获取所有的 consumerId
List<String> cidAll = this.mQClientFactory
.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn(...);
}
}
if (null == cidAll) {
log.warn(...);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 排序后才能保证消费者负载策略相对稳定
Collections.sort(mqAll);
Collections.sort(cidAll);
// MessageQueue 的负载策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 按负载策略进行分配,返回当前消费者实际订阅的messageQueue集合
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error(...);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 更新负载均衡信息,传入参数是 allocateResultSet,即当前consumer分配到的队列
boolean changed = this
.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(...);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
RebalanceImpl#rebalanceByTopic
方法就是最终处理负载均衡的方法了,在这个方法里会区分广播模式与集群模式的处理。
在广播模式下,一条消息会被同一个消费组中的所有consumer
消费,而集群模式下,一条消息只会被同一个消费组下的一个consumer
消费。
正是因为如此,广播模式下并没有负载均衡可言,直接把所有的队列都分配给当前consumer
处理,然后更新QueueTable
的负载均衡信息;而集群模式会先分配当前consumer
消费的消息队列,再更新QueueTable
的负载均衡信息。
这里我们来看看集群模式,看看它的操作:
strategy.allocate(...)
:按负载均衡策略为当前consumer
分配队列updateProcessQueueTableInRebalance(...)
:更新负载均衡信息。
在rocketMq
中,提供了这些负载均衡策略:
AllocateMessageQueueAveragely
:平均负载策略,rocketMq
默认使用的策略AllocateMessageQueueAveragelyByCircle
:环形平均分配,这个和平均分配唯一的区别就是,再分队列的时候,平均队列是将属于自己的MessageQueue
全部拿走,而环形平均则是,一人拿一个,拿到的Queue不是连续的。AllocateMessageQueueByConfig
:用户自定义配置AllocateMessageQueueByMachineRoom
:同机房负载策略,这个策略就是当前Consumer
只负载处在指定的机房内的MessageQueue
,brokerName
的命名必须要按要求的格式来设置:机房名@brokerName
AllocateMachineRoomNearby
:就近机房负载策略,在AllocateMessageQueueByMachineRoom
策略中,如果同一机房中只有MessageQueue
而没有consumer
,那这个MessageQueue
上的消息该如何消费呢?AllocateMachineRoomNearby
就是扩充了该功能的处理AllocateMessageQueueConsistentHash
:一致性哈希策略
这里我们重点来分析平均负载策略AllocateMessageQueueAveragely
:
public List<MessageQueue> allocate(String consumerGroup, String currentCID,
List<MessageQueue> mqAll, List<String> cidAll) {
// 返回值
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 省略一些判断操作
...
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
// 1. 消费者数量大于队列数量:averageSize = 1
// 2. 消费者数量小于等于队列数量:averageSize = 队列数量 / 消费者数量,还要处理个+1的操作
int averageSize = mqAll.size() <= cidAll.size()
? 1 : (mod > 0 && index < mod
? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
这个方法中,关键的分配方法就在后面几行,如果只看代码,会感觉有点晕,这里我举一个例子来简单解释下:
假设:messageQueue
一共有6个,consumer
有4个,当前consumer
的index
为1,有了这些前提后,接下来我们就来看它的分配过程了。
计算取余操作:
6 % 4 = 2
,这表明messageQueue
不能平均分配给每个consumer
,接下来就来看看这个余数2
是如何处理的计算每个
consumer
平均处理的messageQueue
数量消费者索引 0 1 2 3 处理数量 2 2 1 1 这里需要注意,如果 consumer
数量大于messageQueue
数量,那每个consumer
最多只会分配到一个messageQueue
,这种情况下,余数2
不会进行处理,并且有的consumer
处理的messageQueue
数量为0,同一个messageQueue
不会同时被两个及以上的consumer
消费掉这里的 messageQueue
数量为6,consumer
为4,计算得到每个consumer
处理的队列数最少为1,除此之外,为了实现“平均”,有2个consumer
会需要多处理1个messageQueue
,按“平均”的分配原则,如果index
小于mod
,则会分配多1个messageQueue
,这里的mod
为2,结果如下:分配完每个
consumer
处理的messageQueue
数量后,这些messageQueue
该如何分配呢?从代码来看,分配时会先分配完一个consumer
,再分配下一个consumer
,最终结果就是这样:队列 Q0 Q1 Q2 Q3 Q4 Q5 消费者 C1 C1 C2 C2 C4 C5
从图中可以看到,在6个messageQueue
、4个consumer
、当前consumer
的index
为1的情况下,当前consumer
会分到2个队列,分别为Q2
/Q3
.
将messageQueue
分配完成后,接下来就是更新负载信息了,方法为RebalanceImpl#updateProcessQueueTableInRebalance
:
private boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet, final boolean isOrder) {
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn(...);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
}
// pullRequest 最初产生的地方:mq 不存在,就添加
else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 添加 pullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 发布
this.dispatchPullRequest(pullRequestList);
return changed;
}
这个方法中最最关键的就是pullRequestList
的添加操作了:先遍历传入的MessageQueue
,如果当前consumer
没有消费过该messageQueue
,则添加一个新的pullRequest
到pullRequestList
,之后就是发布pullRequestList
了。
看到这里,我们就应该能明白,最初的pullRequest
就是在这里产生的,而发布pullRequestList
的操作,就是将pullRequest
丢给pullMessageService
线程处理了:
/**
* RebalancePushImpl#dispatchPullRequest:发布pullRequest的操作
*/
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
// 在这里执行pullRequest,其实就是把 pullRequest 添加到
// PullMessageService#pullRequestQueue 中
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
限于篇幅,本文就先到这里了,下篇继续。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!