Rocketmq源码分析10:consumer 启动流程
共 21796字,需浏览 44分钟
·
2021-04-28 00:28
注:本系列源码分析基于RocketMq 4.8.0,gitee仓库链接:https://gitee.com/funcy/rocketmq.git.
前面分析了producer
发送消息的流程,本文我们来分析consumer
消费消息的流程。
consumer
消费消息的demo
为org.apache.rocketmq.example.simple.PushConsumer
,代码如下:
public class PushConsumer {
public static void main(String[] args)
throws InterruptedException, MQClientException {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20181109221800");
// 注册监听器,监听消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里获得了消息
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
consumer
使用起来还是挺简单的,先是创建了一个DefaultMQPushConsumer
对象,然后配置了一些属性,比较关键的就是注册消息监听器(在这个监听器里会获取消息),之后就调用start()
方法启动consumer
.
接下来我们就来分析这块的消费过程。
1. 构造方法:DefaultMQPushConsumer
consumer
的处理类为DefaultMQPushConsumer
,我们先来看看DefaultMQPushConsumer
的构造方法:
public DefaultMQPushConsumer(final String consumerGroup) {
// 这里指定了队列分配策略
this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}
public DefaultMQPushConsumer(final String namespace, final String consumerGroup,
RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
在构造方法中,就只是做了一些成员变量的赋值操作,比较关键的是分配消息队列的策略:allocateMessageQueueStrategy
,如果指定,默认就使用AllocateMessageQueueAveragely
,即从各队列平均获取消息。
2. 启动consumer
:DefaultMQPushConsumer#start
consumer
的启动方法为DefaultMQPushConsumer#start
,代码如下:
public void start() throws MQClientException {
setConsumerGroup(
NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 启动
this.defaultMQPushConsumerImpl.start();
// 消息轨迹相关内容,我们不关注
if (null != traceDispatcher) {
...
}
}
继续进入DefaultMQPushConsumerImpl#start
:
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info(...);
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 客户端
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 设置负载均衡相关属性
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(
this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 消息模式:广播模式存在本地,集群模式存在远程(broker)
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加载消费信息的偏移量
this.offsetStore.load();
// 根据客户端实例化不同的consumeMessageService:顺序消息与并发消息
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService = new ConsumeMessageOrderlyService(this,
(MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 注册消费组
boolean registerOK = mQClientFactory.registerConsumer(
this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(
defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException(...);
}
// 启动
mQClientFactory.start();
log.info(...);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException(...);
default:
break;
}
// 更新 topic 的信息,从nameServer获取数据
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 发送心跳,发送到所有的broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 负载均衡
this.mQClientFactory.rebalanceImmediately();
}
这个方法比较长,整个consumer
的启动流程都在这里了,咱们挑重点说,来总结下这个方法做了什么。
获取客户端 mQClientFactory
,类型为org.apache.rocketmq.client.impl.factory.MQClientInstance
,如果对producer
还有印象的话,我们就会发现,producer
中的mQClientFactory
的类型也是它区分广播模式与集群模式的 offsetStore
,所谓的offsetStore
,就是一存储器,用来存储当前消费者消费信息的偏移量。在广播模式中,该偏移量保存在本地文件中,而在集群模式中,该偏移量保存在远程broker
中,广播模式与集群模式,我们后面再详细分析根据客户端实例化不同的 consumeMessageService
,这里用来区分顺序消息与并发消息,依然是后面再分析启动 mQClientFactory
,也就是启动客户端更新 topic
信息、发送心跳信息到broker
、处理负载均衡功能
以上就是DefaultMQPushConsumerImpl#start
方法所做的的主要工作了。实际上,上面的1
,2
,3
点都是一些配置工作,这些配置对应的服务是在mQClientFactory.start()
方法中启动的,我们继续。
3. 启动mQClientFactory
:MQClientInstance#start
我们来看看mQClientFactory
的启动流程,进入MQClientInstance#start
:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 获取 nameServer 的地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 启动客户端的远程服务,这个方法会配置netty客户端
this.mQClientAPIImpl.start();
// 启动定时任务
this.startScheduledTask();
// pull服务,仅对consumer启作用
this.pullMessageService.start();
// 启动负载均衡服务,仅对consumer启作用
this.rebalanceService.start();
// 启用内部的 producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException(...);
default:
break;
}
}
}
在producer
的启动过程中,也会调用这个方法,前面我们已经分析过了一波了,这次我们在consumer
的角度再来分析这个方法。
该方法所做的工作如下:
获取 nameServer
的地址启动客户端的远程服务,这个方法会配置 netty
客户端启动定时任务 启动拉取消息服务 启动负载均衡服务
上面的1
,2
与producer
的流程并无区别,就不再分析了,我们来看看定时任务的启动,进入方法MQClientInstance#startScheduledTask
:
private void startScheduledTask() {
...
// 持久化消费者的消费偏移量,每5秒一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 省略其他定时任务
...
}
这个方法中还启动了其他一些定时任务,这里我们重点关注执行MQClientInstance#persistAllConsumerOffset()
方法的定时任务,该定时任务会持久化当前消费者消费消息的偏移量,在本节我们先对这个定时任务有个印象,在分析偏移量持久化
一节再详细分析持久化流程。
我们再回到MQClientInstance#start
的流程,第4与第5步,主要是启动了两个服务:pullMessageService
与rebalanceService
,这个类的信息如下:
/**
* PullMessageService
*/
public class PullMessageService extends ServiceThread {
...
}
/**
* RebalanceService
*/
public class RebalanceService extends ServiceThread {
...
}
这两个类都是ServiceThread
的子类,这两个类的start()
方法也都是来自于ServiceThread
:
public abstract class ServiceThread implements Runnable {
// 省略其他代码
...
/**
* start() 方法
*/
public void start() {
log.info(...);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
}
从代码来看,ServiceThread
实现了Runnable
接口,在其start()
方法中,启动了一个线程,线程的执行逻辑正是来自于其子类的run()
方法,因此我们要看pullMessageService
与rebalanceService
的start()
方法执行逻辑,只需要看对应类的run()
方法即可。
到此为止,consumer
的启动就已经完成了,各项服务也启动起来了,而consumer
拉取消息也正是由这些服务的配合处理的,接下来我们就来分析这些服务做了什么。
限于篇幅,本文就先到这里了,下篇继续。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!