Dubbo 原理,服务是如何注册的?

共 23890字,需浏览 48分钟

 ·

2022-07-31 21:35


之前的文章我们讲解了服务在发布时有三个阶段:准备、发布服务、注册服务。之前两个阶段已经详细讲解过,这篇文章将讲解最后一个阶段服务的注册。前置知识不懂的请移步这篇文章。


Dubbo原理,服务暴露的前置工作|原创


「后台回复"dubbo源码"获得源码笔记仓库地址」

服务注册

我们都知道从理论上来说,其实当provider暴露服务之后comsumer就可以直连然后开始调用服务,注册中心并不是必须的。但是这样的方式不利于服务治理,所以仅在测试环境中可以这样操作,在生产中,注册中心实际上是必不可少的,我们需要依赖注册中心来管理服务的上下线和拉取。

在Dubbo中默认使用Zookeeper作为注册中心,其他类型的注册中心原理也类似。

回到RegistryProtocol.export()方法中,可以看到注册中心的逻辑主要分为两部分,一是获得注册中心实例,二是向注册中心注册服务。

// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
// 根据 register 的值决定是否注册服务
if (register) {
    // 重点🌟!注册逻辑
    registry.register(registeredProviderUrl);
}

创建注册中心

getRegistry()中,首先我们会根据url中的参数,获得自适应扩展实例RegistryFactory$Adaptive,然后获得对应具体实现。接着调用RegistryFactory.getRegistry() 方法,最终走到默认实现 AbstractRegistryFactory 中。

AbstractRegistryFactory中,会首先构建注册服务需要的URL和key,然后根据key查询本地缓存中服务是否存在,存在说明已经暴露,直接返回,不存在则调用子类实现createRegistry(url)创建注册中心服务,这里再次使用了模板方法的设计模式。

protected Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return getRegistry(registryUrl);
}
@Override
public Registry getRegistry(URL url) {

    Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
    if (null != defaultNopRegistry) {
        return defaultNopRegistry;
    }

    //  构建注册url
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEYRegistryService.class.getName())
            .removeParameters(EXPORT_KEYREFER_KEY)
            .build()
;
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // double check
        // fix https://github.com/apache/dubbo/issues/7265.
        defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
        if (null != defaultNopRegistry) {
            return defaultNopRegistry;
        }
        // 缓存中获取key = zookeeper://houduankaifa.club:2181/org.apache.dubbo.registry.RegistryService
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // 缓存未命中,创建 Registry 实例,模板方法,子类实现
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

protected abstract Registry createRegistry(URL url);

进入ZookeeperRegistryFactory后,便会开始新建一个ZookeeperRegistry,这里没什么逻辑。

// zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;

public ZookeeperRegistryFactory() {
    this.zookeeperTransporter = ZookeeperTransporter.getExtension();
}

/**
 * Invisible injection of zookeeper client via IOC/SPI
 *
 * @param zookeeperTransporter
 */

@DisableInject
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
    this.zookeeperTransporter = zookeeperTransporter;
}

@Override
public Registry createRegistry(URL url) {
    // 创建 ZookeeperRegistry
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

在下面的代码代码中,主要创建了新的Zk客户端,并且添加了监听器对各种事件进行处理。

我们重点关注 ZookeeperTransporter 的 connect() 方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 获取组名,默认为 dubbo
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
    zkClient = zookeeperTransporter.connect(url);
    // 添加状态监听器
    zkClient.addStateListener((state) -> {
        if (state == StateListener.RECONNECTED) {
            logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                    " Since ephemeral ZNode will not get deleted for a connection lose, " +
                    "there's no need to re-register url of this instance.");
            ZookeeperRegistry.this.fetchLatestAddresses();
        } else if (state == StateListener.NEW_SESSION_CREATED) {
            logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
            try {
                ZookeeperRegistry.this.recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else if (state == StateListener.SESSION_LOST) {
           ……无处理
    });
}

接着跟踪,会进入AbstractZookeeperTransporterconnect()方法中,这里重点关注createZookeeperClient()方法,其他都是一些缓存判断逻辑。

public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    // address format: {[username:password@]address}
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }

        // 重点!创建zkclient
        zookeeperClient = createZookeeperClient(url);
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}

前面说过,这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter。下面我们到 CuratorZookeeperTransporter 中看一看。

//CuratorZookeeperTransporter
public ZookeeperClient connect(URL url) {
    // 创建 CuratorZookeeperClient
    return new CuratorZookeeperClient(url);
}

继续向下看。CuratorZookeeperClient 构造方法主要用于创建和启动 CuratorFramework 实例。以上基本上都是 Curator 框架的代码,大家如果对 Curator 框架不是很了解,可以参考 Curator 官方文档。在这个构造方法中,最终会启动zk客户端。

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher{

    private final CuratorFramework client;
    
    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            // 创建 CuratorFramework 构造器
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(11000))
                    .connectionTimeoutMs(5000);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            // 构建 CuratorFramework 实例
            client = builder.build();
            // 添加监听器
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            
            // 启动客户端
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

到此为止注册中心实例就创建好了,接下来要做的事情是向注册中心注册服务。

节点创建

以 Zookeeper 为例,所谓的服务注册,本质上是将服务配置数据写入到 Zookeeper 的某个路径的节点下。每一个服务就对应一个zk中的 node。为了让大家有一个直观的了解,下面我们将 Dubbo 的 demo 跑起来,然后通过 Zookeeper 可视化客户端 ZooInspector 查看节点数据。如下:

从上图中可以看到 com.alibaba.dubbo.demo.DemoService 这个服务对应的配置信息(存储在 URL 中)最终被注册到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 节点下。搞懂了服务注册的本质,那么接下来我们就可以去阅读服务注册的代码了。服务注册的接口为 register(URL),这个方法定义在 FailbackRegistry 抽象类中。代码如下:

public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 模板方法,由子类实现
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // 获取 check 参数,若 check = true 将会直接抛出异常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register");
        } else {
            logger.error("Failed to register");
        }

        // 记录注册失败的链接
        failedRegistered.add(url);
    }
}

protected abstract void doRegister(URL url);

如上,我们重点关注 doRegister 方法调用即可,其他的代码先忽略。doRegister 方法是一个模板方法,因此我们到 FailbackRegistry 子类 ZookeeperRegistry 中进行分析。如下:

public void doRegister(URL url) {
    try {
        // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法,如下:

public void create(String path, boolean ephemeral) {
    if (!ephemeral) {
        if (persistentExistNodePath.contains(path)) {
            return;
        }
        if (checkExists(path)) {
            // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
            persistentExistNodePath.add(path);
            return;
        }
    }
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 递归创建上一级路径
        create(path.substring(0, i), false);
    }
    // 根据 ephemeral 的值创建临时或持久节点
    if (ephemeral) {
        createEphemeral(path);
    } else {
        createPersistent(path);
        persistentExistNodePath.add(path);
    }
}

上面方法先是通过递归创建当前节点的上一级路径,然后再根据 ephemeral 的值决定创建临时还是持久节点。createEphemeral 和 createPersistent 这两个方法都比较简单,这里简单分析其中的一个。如下:

public void createEphemeral(String path) {
    try {
        // 通过 Curator 框架创建节点
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
    } catch (NodeExistsException e) {
        logger.warn("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
                ", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
                " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
                "we can just try to delete and create again.", e);
        deletePath(path);
        createEphemeral(path);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

好了,到此关于服务注册的过程就分析完了。

整个过程可简单总结为:先创建注册中心实例,之后再通过注册中心实例注册服务。

浏览 32
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报