DUBBO泛化调用原理与设计思想

JAVA前线

共 11181字,需浏览 23分钟

 ·

2022-01-11 16:51



JAVA前线 


欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习


1 泛化调用实例

对于JAVA服务端开发者而言在使用Dubbo时并不经常使用泛化调用,通常方法是在生产者发布服务之后,消费者可以通过引入生产者提供的client进行调用。那么泛化调用使用场景是什么呢?

第一种场景是消费者不希望引入生产者提供的client依赖,只希望关注调用哪个方法,需要传什么参数即可。第二种场景是消费者不是使用Java语言,而是使用例如Python语言,那么如何调用使用Java语言生产者提供的服务呢?这时我们可以选择泛化调用。

泛化调用使用方法并不复杂,下面我们编写一个泛化调用实例。首先生产者发布服务,这与普通服务发布没有任何区别。

package com.java.front.dubbo.demo.provider;

public interface HelloService {
    public String sayHelloGeneric(Person person, String message);
}

public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHelloGeneric(Person person, String message) throws Exception {
        String result = "hello[" + person + "],message=" + message;
        return result;
    }
}

Person类声明:

package com.java.front.dubbo.demo.provider.model;

public class Person implements Serializable {
    private String name;
}

provider.xml文件内容:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans        
    http://www.springframework.org/schema/beans/spring-beans-4.3.xsd        
    http://code.alibabatech.com/schema/dubbo        
    http://code.alibabatech.com/schema/dubbo/dubbo.xsd"
>


 
 <dubbo:application name="java-front-provider" />

 
 <dubbo:registry address="zookeeper://127.0.0.1:2181" />

 
 <dubbo:protocol name="dubbo" port="9999" />
 
 
 <bean id="helloService" class="com.java.front.dubbo.demo.provider.HelloServiceImpl" />
 
 
 <dubbo:service interface="com.java.front.dubbo.demo.provider.HelloService" ref="helloService" />
beans>

消费者代码有所不同:

import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.service.GenericService;

public class Consumer {
    public static void testGeneric() {
        ReferenceConfig reference = new ReferenceConfig();
        reference.setApplication(new ApplicationConfig("java-front-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface("com.java.front.dubbo.demo.provider.HelloService");
        reference.setGeneric(true);
        GenericService genericService = reference.get();
        Map person = new HashMap();
        person.put("name""微信公众号「JAVA前线」");
        String message = "你好";
        Object result = genericService.$invoke("sayHelloGeneric"new String[] { "com.java.front.dubbo.demo.provider.model.Person""java.lang.String" }, new Object[] { person, message });
        System.out.println(result);
    }
}

2  Invoker

我们通过源码分析讲解泛化调用原理,首先需要了解Invoker这个Dubbo重量级概念。

在生产者暴露服务流程总体分为两步,第一步是接口实现类转换为Invoker,第二步是Invoker转换为Exporter并放入ExporterMap,我们首先分析生产者暴露服务流程:



生产者通过ProxyFactory.getInvoker方法创建Invoker(AbstractProxyInvoker):

public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    public  Invoker getInvoker(T proxy, Class type, URL url) {
        return new AbstractProxyInvoker(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class[] parameterTypes,
                                      Object[] arguments)
 throws Throwable 
{

                // proxy为被代理对象 -> com.java.front.dubbo.demo.provider.HelloServiceImpl
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }
}


我们再分析消费者引用服务流程:




消费者Invoker通过显示实例化创建,例如本地暴露和远程暴露都是通过显示初始化的方法创建Invoker(AbstractInvoker):

new InjvmInvoker(serviceType, url, url.getServiceKey(), exporterMap)
new DubboInvoker(serviceType, url, getClients(url), invokers)

再通过ProxyFactory.getProxy创建代理:

public class JdkProxyFactory extends AbstractProxyFactory {
    @Override
    public  getProxy(Invoker invoker, Class[] interfaces) {
        InvokerInvocationHandler invokerInvocationHandler = new InvokerInvocationHandler(invoker);
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, invokerInvocationHandler);
    }
}

无论是生产者还是消费者的Invoker都实现自org.apache.dubbo.rpc.Invoker:

public abstract class AbstractInvoker<Timplements Invoker<T>{
}

public abstract class AbstractProxyInvoker<Timplements Invoker<T{
}

3 责任链模式

为什么生产者和消费者都要转换为Invoker,而不是不直接调用呢?我认为Invoker正是Dubbo设计精彩之处:真实调用都转换为Invoker,这样就可以通过责任链模式增强Invoker功能。

public class ProtocolFilterWrapper implements Protocol {

    @Override
    public  Exporter export(Invoker invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // 增加过滤器链
        Invoker invokerChain = buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER);
        return protocol.export(invokerChain);
    }

    @Override
    public  Invoker refer(Class type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        // 增加过滤器链
        Invoker invoker = protocol.refer(type, url);
        Invoker result = buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        return result;
    }
}

无论是生产者还是消费者都会创建过滤器链,我们看看buildInvokerChain这个方法:

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    private static  Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
        Invoker last = invoker;

        // (1)加载所有包含Activate注解的过滤器
        // (2)根据group过滤得到过滤器列表
        // (3)Invoker最终被放到过滤器链尾部
        List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), keygroup);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker next = last;

                // 构造一个简化Invoker
                last = new Invoker() {

                    @Override
                    public Class getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        // 构造过滤器链路
                        Result result = filter.invoke(next, invocation);
                        if (result instanceof AsyncRpcResult) {
                            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                            return asyncResult;
                        } else {
                            return filter.onResponse(result, invoker, invocation);
                        }
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
}

加载所有包含Activate注解的过滤器,根据group过滤得到过滤器列表,Invoker最终被放到过滤器链尾部,生产者最终生成链路:

EchoFilter -> ClassloaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter -> AbstractProxyInvoker

消费者最终生成链路:

ConsumerContextFilter -> FutureFilter -> MonitorFilter -> GenericImplFilter -> DubboInvoker

4 泛化调用原理

我们终于看到了泛化调用核心原理,在生产者链路看到GenericFilter过滤器,消费者链路看到GenericImplFilter过滤器,正是这两个过滤器实现了泛化调用。

(1) GenericImplFilter

@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {

    @Override
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {

        // 方法名=$invoke
        // invocation.getArguments()=("sayHelloGeneric", new String[] { "com.java.front.dubbo.demo.provider.model.Person", "java.lang.String" }, new Object[] { person, "你好" });
        if (invocation.getMethodName().equals(Constants.$INVOKE)
                && invocation.getArguments() != null
                && invocation.getArguments().length == 3
                && ProtocolUtils.isGeneric(generic)) {

            // 第一个参数表示方法名
            // 第二个参数表示参数类型
            // 第三个参数表示参数值 -> [{name=微信公众号「JAVA前线」},你好]
            Object[] args = (Object[]) invocation.getArguments()[2];
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (Object arg : args) {
                    if (!(byte[].class == arg.getClass())) {
                        error(generic, byte[].class.getName(), arg.getClass().getName());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (Object arg : args) {
                    if (!(arg instanceof JavaBeanDescriptor)) {
                        error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
                    }
                }
            }
            // 附加参数generic值设置为true
            ((RpcInvocation) invocation).setAttachment(Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
        }
        // 继续执行过滤器链路
        return invoker.invoke(invocation);
    }
}

(2) GenericFilter

@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {

    @Override
    public Result invoke(Invoker invoker, Invocation inv) throws RpcException {

        // RpcInvocation[methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHelloGeneric, [Ljava.lang.String;@14e77f6b, [Ljava.lang.Object;@51e5f393], attachments={path=com.java.front.dubbo.demo.provider.HelloService, input=451, dubbo=2.0.2, interface=com.java.front.dubbo.demo.provider.HelloService, version=0.0.0, generic=true}]
        if (inv.getMethodName().equals(Constants.$INVOKE)
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {

            // sayHelloGeneric
            String name = ((String) inv.getArguments()[0]).trim();

            // [com.java.front.dubbo.demo.provider.model.Person, java.lang.String]
            String[] types = (String[]) inv.getArguments()[1];

            // [{name=微信公众号「JAVA前线」}, 你好]
            Object[] args = (Object[]) inv.getArguments()[2];

            // RpcInvocation[methodName=sayHelloGeneric, parameterTypes=[class com.java.front.dubbo.demo.provider.model.Person, class java.lang.String], arguments=[Person(name=JAVA前线), abc], attachments={path=com.java.front.dubbo.demo.provider.HelloService, input=451, dubbo=2.0.2, interface=com.java.front.dubbo.demo.provider.HelloService, version=0.0.0, generic=true}]
            RpcInvocation rpcInvocation = new RpcInvocation(method, args, inv.getAttachments());
            Result result = invoker.invoke(rpcInvocation);
        }
    }
}

5 文章总结

第一本文介绍了如何使用泛化调用,并引出泛化调用为什么生效这个问题。

第二本文介绍了重量级概念Invoker,并引出为什么Dubbo要创建Invoker这个问题。

第三通过源码分析知道了过滤器链增强了Invoker功能,并且是实现泛化调用的核心。



JAVA前线 


欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习

浏览 25
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报