Apache RocketMQ只用了7个类就实现了Nameserver,你知道是如何做到的吗?
1
2
让你来设计NameServer
该如何考虑?
3
一个具体请求示例,
来看看NameServer的源码实现
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
……
//Broker注册请求处理,可以看到对于不对版本的Broker调用了不同的方法进行处理,测试中版本大于3.0.11,所以我们直接看registerBrokerWithFilterServer方法调用
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
……
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
……
default:
break;
}
return null;
}
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
//构建了请求返回信息
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
//解码Broker注册请求头信息,头信息里包含了Broker地址、名称、brokerId等信息
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
try {
//解码Broker注册请求体信息,请求体中包含了所有的Topic信息
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (Exception e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
//调用了RouteInfoManager#registerBroker方法,存储Broker相关信息到路由管理类中
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
//构建返回请求
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
网络连接及请求处理的能力是注册中心首先需要具备的功能。
其次需要有连接管理的能力,能够对超时连接,断开的连接进行处理。
路由信息管理能力是Nameserver的核心业务能力。
为了满足软件的灵活性,我们还需要有配置功能。
面向对象思想,类功能的合理分工,是所有软件开发都需要考虑的。
希望今天的内容能对大家有所帮助。
聊技术,不止于技术。
评论