Apache RocketMQ只用了7个类就实现了Nameserver,你知道是如何做到的吗?

共 6966字,需浏览 14分钟

 ·

2021-04-28 02:45

聊技术,不止于技术


Apache RocketMQ作为消息中间件相信大家都比较熟悉了,而NameServer作为RocketMQ的注册中心,也是整个RocketMQ中不可或缺的一部分,可能很多人并不知道这样一个注册中心源码实现其实只有7个类。
今天就让我们来一步步揭秘它是如何实现的。

1

NameServer概述

在文章开始,让我先来介绍一下NameServer组件在整个项目架构中的位置、承担的责任以及与各个组件的关系。
NameServer组件在RocketMQ架构中的位置如下图所示:
NameServer主要承担的责任如下:
(1) Broker的动态注册与发现;
(2) Topic路由信息管理。
NameServer与各个组件的关系介绍如下:
(1) 与Broker的关系
每个Broker会与NameServer建立长连接,并定时发送心跳包,心跳包中包含当前Broker信息(IP+端口等)以及存储的所有Topic信息。
(2) 与Producer的关系
Producer与Nameserver集群中的一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息。这样Producer就可以知道当前发送的Topic存在哪些Broker上,就可以与相应的Broker建立长连接进而发送消息了。
(3) 与Consumer的关系
Consumer与Producer类似,都是与NameServer节点建立长连接,定期从NameServer获取Topic路由信息,进而能够知道当前订阅的Topic存在哪些Broker上,就可以与相应的Broker建立长连接并进行消息消费。

2

让你来设计NameServer

该如何考虑?

RocketMQ中又是如何实现的呢?

现在我们知道了NameServer的主要功能以及与各个组件的关系,那么让你来设计这样一个组件,你会怎么考虑呢?
让我们来想一想。
(1) 首先,我们需要有网络连接及请求处理的功能,这是最基本的功能。作为NameServer,我们要能够处理Broker、Producer及Consumer的连接建立请求,以及具体的请求逻辑处理功能。
(2) 其次,我们需要有连接管理功能。要能够管理各个组件的长连接,并对异常连接进行处理。
(3) 另外一个很重要的功能是路由信息管理能力。我们能够对Broker发送过来的路由信息进行保存,能够根据Topic查询出相应的路由信息并返回给Producer或Consumer。
(4) 最后,我们需要有配置能力,能够配置NameServer的各个参数。
可以看到NameServer要实现的功能并不多,实际源码实现文章标题也已经告诉你了,只有7个类。
下面我们就直接来看看上述列的功能点,分别在RocketMQ中是如何实现的,对应的是哪些类。
(1) 网络连接处理功能实现
RocketMQ的网络处理都是通过Netty来进行的,包括网络连接的建立,数据包读写,NameServer的网络处理也是一样通过Netty来进行的,所以网络通信主要交给Netty来处理,而具体的请求处理逻辑则定义在 DefaultRequestProcessor 类中,里面包括了对各个组件的请求处理逻辑。
(2) 连接管理功能实现
这里的连接管理包括网络连接超时处理、连接异常断开处理等功能。
NameServer没有专门的连接管理类,实际上NameServer只管理了Broker的连接,对于Producer及Consumer的连接都没有进行管理。
NameServer对Broker的连接管理在 RouteInfoManager#scanNotActiveBroker 方法中,从方法名就已经可以看出是对无效的Broker连接进行了处理,在这里是超时没有发送心跳信息的连接,此方法通过定时任务的方式来调用。同时还有一个 BrokerHousekeepingService 类,用来在Broker连接主动断开时进行相应的处理。无论是连接超时还是Broker端主动断开的连接,对这些异常连接的处理实际上就是将相应失效的Broker信息及路由信息从内存中删掉。
(3) Topic路由信息管理
RouteInfoManager 类中定义了相应的Map结构,用来存储Broker信息、Topic路由信息等内容,并提供了存取方法。
(4) 配置功能。
配置类有 KVConfigManager 和 KVConfigSerializeWrapper
(5) 启动及控制类
作为一个独立的应用,肯定需要有一个启动类,来进行配置初始化及应用启动功能,在这里是 NamesrvStartup 类。
同样我们需要有一个总的控制器类,来作为所有方法功能的调用入口,这里是 NamesrvController 类。
现在让我们来总结一下:
NameServer中网络请求处理逻辑为类 DefaultRequestProcessor 负责。
路由信息管理为 RouteInfoManager 类来负责,同时这个类里还有负责Broker连接管理的方法。
Broker的主动连接断开处理逻辑在 BrokerHousekeepingService 类中。
配置相关类为 KVConfigManager KVConfigSerializeWrapper 。
启动及统一入口类为 NamesrvStartup 和 NamesrvController 类。
总共7个类,就能够实现NameServer的所有功能,而且各个类分工明确,各司其职,很好的体现了面向对象的思想。
细心的同学可能发现源码中有8个类,多了一个 ClusterTestRequestProcessor 类,该类其实继承了 DefaultRequestProcessor ,也是数据网络请求逻辑处理的类,就没有单独算上去。

3

一个具体请求示例,

来看看NameServer的源码实现


下面我们以Broker的注册请求处理为例,看看NameServer在源码中是如何处理相应的请求的,以求让大家有个直观的了解(Broker在启动的时候会发送注册请求到NameServer,启动后也会定时发送注册信息,以更新相应的Topic信息)。
DefaultRequestProcessor#processRequest 为网络请求处理的入口方法,这里我们主要关注Broker注册请求处理,源码如下:
public RemotingCommand processRequest(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {
if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); }

switch (request.getCode()) { …… //Broker注册请求处理,可以看到对于不对版本的Broker调用了不同的方法进行处理,测试中版本大于3.0.11,所以我们直接看registerBrokerWithFilterServer方法调用 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } …… case RequestCode.GET_ROUTEINFO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); …… default: break; } return null; }
让我们进一步看Broker的注册请求处理逻辑,在 DefaultRequestProcessor#registerBrokerWithFilterServer 方法中:
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)        throws RemotingCommandException {        //构建了请求返回信息        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();        //解码Broker注册请求头信息,头信息里包含了Broker地址、名称、brokerId等信息        final RegisterBrokerRequestHeader requestHeader =            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("crc32 not match"); return response; }
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) { try { //解码Broker注册请求体信息,请求体中包含了所有的Topic信息 registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed()); } catch (Exception e) { throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e); } } else { registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0)); registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0); } //调用了RouteInfoManager#registerBroker方法,存储Broker相关信息到路由管理类中 RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), registerBrokerBody.getTopicConfigSerializeWrapper(), registerBrokerBody.getFilterServerList(), ctx.channel()); //构建返回请求 responseHeader.setHaServerAddr(result.getHaServerAddr()); responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
可以看到,对于Broker注册请求,NameServer将相信的注册信息都存储到了 RouteInfoManager 类中,这样对于Producer或Consumer获取Topic路由信息的请求,就可以通过 RouteInfoManager 类来进行查找并返回了,这里的逻辑在源码中也很清晰,大家可以进一步看源码。




写在最后

今天带大家学习了RocketMQ的注册中心NameServer的相关设计及实现,其实源码实现并不难,大家看一看都能懂,更重要的是要了解背后的设计思想。

网络连接及请求处理的能力是注册中心首先需要具备的功能。

其次需要有连接管理的能力,能够对超时连接,断开的连接进行处理。

路由信息管理能力是Nameserver的核心业务能力。

为了满足软件的灵活性,我们还需要有配置功能。

面向对象思想,类功能的合理分工,是所有软件开发都需要考虑的。

希望今天的内容能对大家有所帮助。


推荐阅读:
《软件项目管理的那些事儿》



聊技术,不止于技术。

在这里我会分享技术文章、管理知识以及个人的思想感悟,欢迎点击关注。


浏览 50
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报