Dubbo 原理,服务是如何注册的?
之前的文章我们讲解了服务在发布时有三个阶段:准备、发布服务、注册服务。之前两个阶段已经详细讲解过,这篇文章将讲解最后一个阶段服务的注册。前置知识不懂的请移步这篇文章。
Dubbo原理,服务暴露的前置工作|原创
「后台回复"dubbo源码"获得源码笔记仓库地址」
服务注册
我们都知道从理论上来说,其实当provider暴露服务之后comsumer就可以直连然后开始调用服务,注册中心并不是必须的。但是这样的方式不利于服务治理,所以仅在测试环境中可以这样操作,在生产中,注册中心实际上是必不可少的,我们需要依赖注册中心来管理服务的上下线和拉取。
在Dubbo中默认使用Zookeeper作为注册中心,其他类型的注册中心原理也类似。
回到RegistryProtocol.export()
方法中,可以看到注册中心的逻辑主要分为两部分,一是获得注册中心实例,二是向注册中心注册服务。
// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
// 根据 register 的值决定是否注册服务
if (register) {
// 重点🌟!注册逻辑
registry.register(registeredProviderUrl);
}
创建注册中心
在getRegistry()
中,首先我们会根据url中的参数,获得自适应扩展实例RegistryFactory$Adaptive
,然后获得对应具体实现。接着调用RegistryFactory.getRegistry()
方法,最终走到默认实现 AbstractRegistryFactory
中。
在AbstractRegistryFactory
中,会首先构建注册服务需要的URL和key,然后根据key查询本地缓存中服务是否存在,存在说明已经暴露,直接返回,不存在则调用子类实现createRegistry(url)
创建注册中心服务,这里再次使用了模板方法的设计模式。
protected Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return getRegistry(registryUrl);
}
@Override
public Registry getRegistry(URL url) {
Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
// 构建注册url
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// double check
// fix https://github.com/apache/dubbo/issues/7265.
defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
// 缓存中获取key = zookeeper://houduankaifa.club:2181/org.apache.dubbo.registry.RegistryService
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 缓存未命中,创建 Registry 实例,模板方法,子类实现
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);
进入ZookeeperRegistryFactory
后,便会开始新建一个ZookeeperRegistry,这里没什么逻辑。
// zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;
public ZookeeperRegistryFactory() {
this.zookeeperTransporter = ZookeeperTransporter.getExtension();
}
/**
* Invisible injection of zookeeper client via IOC/SPI
*
* @param zookeeperTransporter
*/
@DisableInject
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
// 创建 ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
在下面的代码代码中,主要创建了新的Zk客户端,并且添加了监听器对各种事件进行处理。
我们重点关注 ZookeeperTransporter 的 connect()
方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认为 dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener((state) -> {
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
……无处理
});
}
接着跟踪,会进入AbstractZookeeperTransporter
的connect()
方法中,这里重点关注createZookeeperClient()
方法,其他都是一些缓存判断逻辑。
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
// address format: {[username:password@]address}
List<String> addressList = getURLBackupAddress(url);
// The field define the zookeeper server , including protocol, host, port, username, password
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// avoid creating too many connections, so add lock
synchronized (zookeeperClientMap) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// 重点!创建zkclient
zookeeperClient = createZookeeperClient(url);
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}
前面说过,这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter。下面我们到 CuratorZookeeperTransporter 中看一看。
//CuratorZookeeperTransporter
public ZookeeperClient connect(URL url) {
// 创建 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}
继续向下看。CuratorZookeeperClient
构造方法主要用于创建和启动 CuratorFramework 实例。以上基本上都是 Curator 框架的代码,大家如果对 Curator 框架不是很了解,可以参考 Curator 官方文档。在这个构造方法中,最终会启动zk客户端。
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 构建 CuratorFramework 实例
client = builder.build();
// 添加监听器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 启动客户端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
到此为止注册中心实例就创建好了,接下来要做的事情是向注册中心注册服务。
节点创建
以 Zookeeper 为例,所谓的服务注册,本质上是将服务配置数据写入到 Zookeeper 的某个路径的节点下。每一个服务就对应一个zk中的 node。为了让大家有一个直观的了解,下面我们将 Dubbo 的 demo 跑起来,然后通过 Zookeeper 可视化客户端 ZooInspector 查看节点数据。如下:
从上图中可以看到 com.alibaba.dubbo.demo.DemoService 这个服务对应的配置信息(存储在 URL 中)最终被注册到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 节点下。搞懂了服务注册的本质,那么接下来我们就可以去阅读服务注册的代码了。服务注册的接口为 register(URL),这个方法定义在 FailbackRegistry 抽象类中。代码如下:
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 模板方法,由子类实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 获取 check 参数,若 check = true 将会直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register");
} else {
logger.error("Failed to register");
}
// 记录注册失败的链接
failedRegistered.add(url);
}
}
protected abstract void doRegister(URL url);
如上,我们重点关注 doRegister 方法调用即可,其他的代码先忽略。doRegister 方法是一个模板方法,因此我们到 FailbackRegistry 子类 ZookeeperRegistry 中进行分析。如下:
public void doRegister(URL url) {
try {
// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法,如下:
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if (persistentExistNodePath.contains(path)) {
return;
}
if (checkExists(path)) {
// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
persistentExistNodePath.add(path);
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 递归创建上一级路径
create(path.substring(0, i), false);
}
// 根据 ephemeral 的值创建临时或持久节点
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
persistentExistNodePath.add(path);
}
}
上面方法先是通过递归创建当前节点的上一级路径,然后再根据 ephemeral 的值决定创建临时还是持久节点。createEphemeral 和 createPersistent 这两个方法都比较简单,这里简单分析其中的一个。如下:
public void createEphemeral(String path) {
try {
// 通过 Curator 框架创建节点
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (NodeExistsException e) {
logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
" may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
"we can just try to delete and create again.", e);
deletePath(path);
createEphemeral(path);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
好了,到此关于服务注册的过程就分析完了。
整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务。