NameServer剖析

共 12038字,需浏览 25分钟

 ·

2021-11-03 17:13

前言

到现在为止,RocketMQ已经更了5篇文章:回过头看,里面基本就是跟了跟源码,并没有对重要的知识做剖析,为了让文章不太臃肿,所以我将跟源码和知识的剖析分开来写,今天先来剖析NameServer(以下简称namesrv)。
  • 一、namesrv的作用

通过RocketMQ的NameServer(路由中心)我们知道,namesrv的主要作用是为消息生产者和消息消费者提供关于主题(topic)的路由信息,因此namesrv不仅需要存储路由元信息,还要管理broker节点,包括路由注册路由删除等功能。
  • 1.1、路由元信息

找到namesrv的路由实现类,它位于namesrv模块下,路径为org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager,看下RouteInfoManager类中的属性
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;    private final ReadWriteLock lock = new ReentrantReadWriteLock();    private final HashMap> topicQueueTable;    private final HashMap brokerAddrTable;    private final HashMap> clusterAddrTable;    private final HashMap brokerLiveTable;    private final HashMap/* Filter Server */> filterServerTable;
  • BROKER_CHANNEL_EXPIRED_TIME:broker启动时会向集群中所有的namesrv发送心跳,之后每隔30s再次发送,如果namesrv在连续120s内没有收到broker发送的心跳,那么此broker就会被namesrv移除。

  • lock:读写锁,读写路由信息时候是需要加锁的,不然并发访问时会有问题。

  • topicQueueTable:topic消息队列路由信息,消息发送时根据路由表进行负载均衡。

  • brokerAddrTable:broker的基础信息,包含brokerName、所属集群名称、主从broker的地址。

  • clusterAddrTable:broker集群信息,存储集群中所有broker的名称。

  • brokerLiveTable:broker状态信息,namesrv每次收到broker发送的心跳包时都会更新该信息。

  • filterServerTable:broker上的FilterServer列表,用于类模式消费过滤。

通过代码分析可知,其实namesrv里的核心也就是五个HashMap,这五个map中涉及三个类
  • QueueData:见名猜意,队列数据,该类是topic对应的队列信息的抽象

c919c18b92334a998d4e714fe7ca414b.webp

brokerName:broker的名称
readQueueNums:读队列个数,一个broker默认为每个topic创建4个读队列writeQueueNums:写队列个数, 一个broker默认为每个topic创建4个写队列perm:读写权限,一般设置为6,6:同时支持读写, 4:禁写,2禁读topicSynFlag:topic同步标记,同步复制还是异步复制
  • BrokerData:broker数据的抽象

36d6049de3a9e5236a379c2f916f1e66.webp

cluster:broker集群的名称
brokerName:broker的名称

brokerAddrs:broker的ip集合,为什么是集合,因为互为主从的broker的brokerName是相同的,key是brokerId(0表示master,大于0表示slave),value是broker的ip地址。

:一个broker集群包含多个broker主从,一个broker主从中的主和从的brokerName是一样的。

random:随机数生成器,当一条消息发送到broker集群,就是根据随机数生成来选择消息存储到哪个broker实例上的。
  • BrokerLiveInfo:存活的broker信息,就是和namesrv有心跳连接的broker

182f14c875e8e3bf5314685fbdeac6c9.webp

lastUpdateTimestamp:最近一次心跳的时间戳dataVersion:版本号,topic的配置信息如果有改动,此版本号会+1,channel:当前broker和namesrv的socket连接haServerAddr:当前主从节点中主节点(master)的ip下面我们通过debug来看看这五个map中的内容,debug模式启动本地的namesrv,将centos中rocketmq的集群改为本地namesrv的地址即可

:这里是为了看map中的集群信息,所以利用了centos中搭建的mq集群。

b24cb787b0f9672c7e56977e299e580e.webp

然后在org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest中打上断点,这个就是namesrv处理来自broker、producer、namesrv请求的类

9d5fff70a6fdfdf9b76a8e7f46e72381.webp

上面说了,broker是每隔30s向namesrv发送一次心跳,那么我们就在心跳请求的过程中窥探下那五个map中的内容

6cdad1cbd36e80ebc0687a0102c184b1.webp

可以看到,请求进来了,接着让它进入org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer中的第219行停住,你就可以清晰的看到里面的内容了。

6bbf26fd2c2b56655f6a4a2ed3d04ec4.webp

  • 1.2、路由注册

在1.1中debug心跳的时候会发现请求进入了org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest方法中的REGISTER_BROKER(broker注册)分支,所以说RocketMQ的路由注册其实就是通过心跳功能实现的,broker会每隔30s向namesrv发送一次心跳,证明自己还活着(有点续命的意思),那broker是怎么发送心跳的?broker启动过程中调用的方法如下:
  • org.apache.rocketmq.broker.BrokerStartup#main

  • org.apache.rocketmq.broker.BrokerStartup#start

  • org.apache.rocketmq.broker.BrokerController#start

我们看下org.apache.rocketmq.broker.BrokerController#start方法

9e24870698087be41c4449bd862d8bcc.webp

其实BrokerController中第814行到823行的定时任务就是心跳的核心代码,brokerConfig.getRegisterNameServerPeriod()就是30s的时间间隔

309fdbdc50dc09e34207151c0d0c081c.webp

当然,此时间可以配置,默认是30s。定时任务中执行的方法其实就是注册的核心方法了,看下注册方法的调用过程:
  • org.apache.rocketmq.broker.BrokerController#registerBrokerAll

  • org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll

  • org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

其中registerBrokerAll和doRegisterBrokerAll主要是做了一些校验,封装了一些参数(这里就不赘述了,不然太臃肿),核心的注册逻辑其实是在registerBrokerAll方法,见注释
public List registerBrokerAll(        final String clusterName,        final String brokerAddr,        final String brokerName,        final long brokerId,        final String haServerAddr,        final TopicConfigSerializeWrapper topicConfigWrapper,        final List filterServerList,        final boolean oneway,        final int timeoutMills,        final boolean compressed) {
final List registerBrokerResultList = Lists.newArrayList();        //获取所有namesrv的地址:ip端口 List nameServerAddressList = this.remotingClient.getNameServerAddressList();        //遍历namesrv集合 if (nameServerAddressList != null && nameServerAddressList.size() > 0) {            //封装请求头 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();            //broker地址 requestHeader.setBrokerAddr(brokerAddr);            //brokerId,0:master,大于0:slave requestHeader.setBrokerId(brokerId);            //broker名称 requestHeader.setBrokerName(brokerName);            //所属集群的名称 requestHeader.setClusterName(clusterName);            //当前broker所属主从中主节点的地址 requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed);            ////封装请求体 RegisterBrokerBody requestBody = new RegisterBrokerBody();            /**主题配置,topicConfigWrapper内部封装的是topicConfigManager中的            topicConfigTable,内部存储的是broker启动时默认的一些topic,            ,MixAll.SELF_TEST_TOPIC、MixAll.DEFAULT_TOPIC(AutoCreateTopic            -Enable=true)、MixAll.BENCHMARK_TOPIC、MixAll.OFFSET_MOVED_EVNET、            BrokerConfig中的brokerClusterName、brokerName。Broker中Topic            默认存储在${Rocket_Home}/store/config/topic.json中*/ requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try {                            //通过netty向namesrv发送网络请求,注册broker RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); }
log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); }
try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } }
return registerBrokerResultList; }
接下来本地启动broker,debug跟下看看它给namesrv发送请求的过程

注:为了简洁明了,直接将断点打到

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法中,着重看下封装的参数即可

4bcc0a672de4974469b7db9921cc5ab0.webp

放行请求,它将到达namesrv的org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest方法

9fbae48fee55fd1883d4e5cbf2648deb.webp

注:broker每隔30s会发一个心跳,所以你会看到请求不断地进入

org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll方法,为了排除干扰,我们将心跳时间改长点,然后重启broker

2eae37e3e58ba323443b6c0e5c38cf7c.webp

接下来就详细看下namesrv是怎么处理心跳包的

e2a620609083d00c0489eedb0d4679a1.webp

接着看org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer方法先校验请求头和请求体

8bacfd9df21e08dcbb3be5df6a4ec871.webp

接着调用org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker方法,核心逻辑来了先加锁(读写锁),避免并发操作路由表引起问题

c75f77e61cc44ae74f0699135ff00949.webp

然后根据集群名称(clusterName)从clusterAddrTable(五个map中的其中一个)取brokerNames,可以看到这里取出来的是一个,就是我本地起的broker,然后将传过来的brokerName添加到brokerNames

c9752b5dd281354a8610aa3af119ad62.webp

接着默认不是第一次注册,再根据传过来的brokerName从brokerAddrTable中查询brokerData,如果取出来是空,就以传过来的brokerName为key,构造一个brokerData放入brokerAddrTable中;如果存在,直接替换原先的,然后把registerFirst标识置为false,表示非第一次注册。

e92da397d7299bd14ec6e3d48eef6a40.webp

接着判断当前broker是不是master(就是主从中的主),如果是主节点,在判断,如果topic的配置有更改或者是第一次注册,则需要创建或更新topic路由元数据,填充topicQueueTable,其实就是为默认topic自动注册路由信息,其中包括MixAll.DEFAULT_TOPIC的路由信息,当消息生产者发送topic时,如果该主题未创建并且BrokerConfig中的autoCreateTopicEnable为true时,将返回MixAll.DEFAULT_TOPIC的路由信息。

69c583acc46c68d1e54da976d92653f4.webp

然后根据broker的地址去brokerLiveTable(存活的broker信息表)中取broker最近一次的心跳信息,如果取出来是空,打印一条新注册的日志。然后检查有没有注册broker的过滤器

de1e86e3ff65285903409ecad5aa6245.webp

如果当前broker是从节点,则需要查找该broker的master的节点信息,并更新对应的masterAddr和haServerAddr属性

37f0a08309977c90d52db26806711027.webp

最后注册完,解锁,完事

8922ae4de789907a54427fb3393f924f.webp

总结:namesrv和broker保持长连接,broker的存活状态存储在brokerLiveTable中,namesrv每收到一个心跳包,就会更新brokerLiveTable中broker的状态信息以及其他路由表(topicQueueTable、brokerAddrTable、filterServerTable),在更新这些路由表时使用了读写锁,允许多个消息发送者并发读,保证消息发送时的高并发,但是同一时刻namesrv只处理一个broker心跳请求,多个心跳请求串行执行,这是读写锁的经典使用场景
  • 1.3、路由删除

经过上面的分析,我们已经知道了broker每隔30s会向namesrv发送一个心跳包,那如果broker宕机,namesrv就无法再收到此broker发送的心跳包,namesrv是怎么删除下线的broker的?在1.1介绍路由元信息说了,namesrv如果连续120s没有收到broker的心跳包,就认为它已经下线,移除并关闭和此broker的连接,同时更新路由表(就是那五个map),那namesrv是怎么实现这个功能的呢?看下namesrv启动过程中的类调用关系
  • org.apache.rocketmq.namesrv.NamesrvStartup#main

  • org.apache.rocketmq.namesrv.NamesrvStartup#main0

  • org.apache.rocketmq.namesrv.NamesrvStartup#start

  • org.apache.rocketmq.namesrv.NamesrvController#initialize

NamesrvControllerinitialize方法开启了一个叫做scanNotActiveBroker的定时任务,它每隔10s执行一次,从名字就可以看出这个定时任务是用来扫描不再存活的broker的

e23a7eedde2229ba5cdc2a296ff7ca79.webp

看下scanNotActiveBroker方法的逻辑
public void scanNotActiveBroker() {        Iterator> it = this.brokerLiveTable.entrySet().iterator();        while (it.hasNext()) {            Entry next = it.next();            long last = next.getValue().getLastUpdateTimestamp();            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {                RemotingUtil.closeChannel(next.getValue().getChannel());                it.remove();                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());            }        }    }
你会发现这个方法逻辑很简单,就是遍历brokerLiveTable,判断每一个brokerLiveInfo的lastUpdateTimestamp和当前时间戳的差距是不是大于120s,如果是,就关闭和此broker的socket连接,并将此brokerLiveInfo从brokerLiveTable中删除,然后删除与该broker相关的路由信息,路由的具体删除逻辑在onChannelDestroy方法中

根据brokerAddress从brokerLiveTable、filterServerTable移除

4b8f67ebf9593f23acb4bf10d2033dc2.webp

遍历brokerAddrTable,从BrokerData的brokerAddrs中找到具体的broker,从brokerData中移除,如果移除后在brokerData中不再包含其他broker,则在brokerAddrTable中移除该brokerName对应的条目。

f283dd2ade09bf3b2fd8ec38e41f98ad.webp

根据brokerName从clusterAddrTable中找到broker并从集群中删除,如果移除后,集群中不包含任何的broker,则将该集群从clusterAddrTable中移除。

0aaebd6babcc7dc1c3512e3a57926c78.webp

根据brokerName,遍历所有topic的队列,如果队列中包含了当前broker的队列,则移除,如果topic只包含待移除broker的队列的话,从路由表中删除该topic。

8cc8caf28d52c7a1f574d2e135d6bddf.webp

  • 1.4、路由发现

通过1.3的分析可知,路由信息是在不断变化的,有新broker加入或者老broker下线都会引起路由表的变动,那客户端(producer、consumer)是怎么知道最新的路由信息的?是namesrv定时推给客户端吗?其实不是的,是客户端根据topic主动定时的到namesrv拉取的,对应的requestCode是RequestCode.GET_ROUTEINTO_BY_TOPIC

b258d08ea6c9de3ee21ba5692aee086c.webp

调用的方法是org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {        final RemotingCommand response = RemotingCommand.createResponseCommand(null);        final GetRouteInfoRequestHeader requestHeader =            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);        //1:        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());        //2:        if (topicRouteData != null) {            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {                String orderTopicConf =                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,                        requestHeader.getTopic());                topicRouteData.setOrderTopicConf(orderTopicConf);            }
byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //3: response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; }
方法主要逻辑如下:
  • 调用pickupTopicRouteData方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable中分别填充到TopicRouteData中的queueData、brokerDatas、filterServerTable属性中。

  1. List:topic队列元数据

  2. List:topic分布的broker元数据

  3. filterServer:broker上过滤服务器地址列表

  • 如果找到topic对应的路由信息并且该topic为顺序消息,则从NameServerKVconfig中获取关于顺序消息相关的配置填充路由信息。

  • 如果找不到路由信息就返回ResponseCode.TOPIC_NOT_EXIST

  • 二、总结

今天算是详细的分析了namesrv的功能,包括路由元数据、路由注册、路由删除路由发现四大模块,用张图形象的表示如下:

34b211461931633aa0273ba512908067.webp

浏览 60
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报