dubbo源码之Proxy、Transporter和Exchanger执行过程

时间:2022-06-26
本文章向大家介绍dubbo源码之Proxy、Transporter和Exchanger执行过程,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

对于dubbo的provider和consumer,通过不同的配置项会进行不同的操作,provider会对应创建ExchangeServer,而consumer端对应创建ExchangeClient。而根据实现的不同server和client也用对应的方式工作,比如dubbo默认使用的实现是nettyServer和nettyClient。这一切加载流程是如何工作的呢?这个问题就是本文要解决的重点。

一般配置项:

provider:
<dubbo:application name="appA" />
<!--使用zookeeper注册中心暴露服务地址 --><dubbo:registry address="${zookeeper.url}" ></dubbo:registry><dubbo:protocol name="dubbo" port="-1" />
<dubbo:provider filter="MDCFilter,DubboExceptionFilter,-exception" delay="-1" timeout="7000" retries="0" /><!-- 让监控中心进行统计 --><dubbo:monitor protocol="registry" />
<!-- 使用注解方式暴露接口 -->     <dubbo:annotation package="com.user"/>
consumer: <dubbo:application name="appB"></dubbo:application><!--    注意这里的traceFilterc必须放在HystrixFilter前--><dubbo:consumer filter="HystrixFilter,MDCFilter,DubboExceptionFilter,-exception" timeout="10000"                check="false"/>
<!-- 使用注解方式暴露接口 缺损package时,默认扫描全包 --><dubbo:annotation/>
<dubbo:registry address="${zookeeper.url}"></dubbo:registry>

provider对应的是com.alibaba.dubbo.config.ProviderConfig中的属性值; consumer对应的是com.alibaba.dubbo.config.ConsumerConfig中的属性值。

1. 解析入口(DubboNamespaceHandler):

解析部分com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser#parse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, java.lang.Class, boolean):

从上面代码来看,整个流程就相当清晰了,DubboNamespaceHandler中加载的顺序是(这里只关注provider、consumer和service)先ProviderConfig再ConsumerConfig,再ProtocolConfig,之后才是ServiceBean。在解析完provider,consumer和protocol之后还会把这些节点当成serviceBean继续进行相应的解析,也就是说真正的解析过程都是在serviceBean中。包括provider和consumer中配置的filter属性也是会当做ServiceBean进行解析,解析时对应的protocol是:

在provider、consumer、registry、filter等作为ServiceBean来解析和进行相应的操作时,对应的ServiceBean的protocol属性是不一样的。这里需要提一点的是filter对应的是ProtocolFilterWrapper,而Consumer和Provider对应的是DubboProtocol

需要注意的是com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper中包裹的属性private final Protocol protocol;在这里对应的是DubboProtocol。

2. dubbo的ServiceBean(注意找ServiceConfig中解析的部分与xml配置项标签的对应关系):

  • 接下来我们重点分析下:com.alibaba.dubbo.config.ServiceConfig#doExportUrls:
   private void doExportUrls() {        List<URL> registryURLs = loadRegistries(true);        for (ProtocolConfig protocolConfig : protocols) {//使用的协议,这里以dubbo协议为例            doExportUrlsFor1Protocol(protocolConfig, registryURLs);        }    }
  • 它的com.alibaba.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol方法:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {    ...    String scope = url.getParameter(Constants.SCOPE_KEY);            //配置为none不暴露            if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
                //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)                if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {                    //本地                    exportLocal(url);                }                //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)                if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){                    if (logger.isInfoEnabled()) {                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);                    }                    if (registryURLs != null && registryURLs.size() > 0                            && url.getParameter("register", true)) {                        for (URL registryURL : registryURLs) {                            url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));                            URL monitorUrl = loadMonitor(registryURL);                            if (monitorUrl != null) {                                url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());                            }                            if (logger.isInfoEnabled()) {                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);                            }                            //暴露时先获取invoker                            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                            Exporter<?> exporter = protocol.export(invoker);                            exporters.add(exporter);                        }                    } else {                        //暴露时先获取invoker                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                        Exporter<?> exporter = protocol.export(invoker);                        exporters.add(exporter);                    }                }            }            this.urls.add(url);
}
exportLocal方法:private void exportLocal(URL url) {        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {            URL local = URL.valueOf(url.toFullString())                    .setProtocol(Constants.LOCAL_PROTOCOL)                    .setHost(NetUtils.LOCALHOST)                    .setPort(0);
            // modified by lishen            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));            //暴露时先获取invoker            Exporter<?> exporter = protocol.export(                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));            exporters.add(exporter);            logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");        }    }

在进行服务暴露时,先获取invoker,然后获取exporter,并将exporter添加到ServiceConfig的List<exporter</exporter

关于暴露url成Service部分这里不去深究了,这里dubbo会把每个url暴露成服务(Service),在dubbo控制台上看时就可以看到每个服务对应一大串的url。

3. server部分的初始化

  • 我们接着来看看上面源码中的proxyFactory和invoker部分: proxyFactory的类继承结构图为:

这里我们主要看下JdkProxyFactory:

 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));    }
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {        return new AbstractProxyInvoker<T>(proxy, type, url) {            @Override            protected Object doInvoke(T proxy, String methodName,                                       Class<?>[] parameterTypes,                                       Object[] arguments) throws Throwable {                Method method = proxy.getClass().getMethod(methodName, parameterTypes);                return method.invoke(proxy, arguments);            }        };    }

主要用于生成接口实现类的代理对象,并生成一个携带doInvoke方法的AbstractProxyInvoker。

  • 对于protocol.export我们主要看一下com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#export: DubboProtocol里的方法如下:

可见它包括创建Client和创建Server的相关方法,其中它的export方法如下:

图中标红的部分为openServer方法,也就是provider创建server的部分:

private void openServer(URL url) {        // find server.        String key = url.getAddress();        //client 也可以暴露一个只有server可以调用的服务。        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);        if (isServer) {            ExchangeServer server = serverMap.get(key);            if (server == null) {                serverMap.put(key, createServer(url));            } else {                //server支持reset,配合override功能使用                server.reset(url);            }        }    }
    private ExchangeServer createServer(URL url) {        //默认开启server关闭时发送readonly事件        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());        //默认开启heartbeat        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));        //public static final String  DEFAULT_REMOTING_SERVER            = "netty";        //这里默认使用的是netty        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);        //这里默认使用的是NettyTransporter        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);        ExchangeServer server;        try {            server = Exchangers.bind(url, requestHandler);        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);        }        str = url.getParameter(Constants.CLIENT_KEY);        if (str != null && str.length() > 0) {            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();            if (!supportedTypes.contains(str)) {                throw new RpcException("Unsupported client type: " + str);            }        }        return server;    }
  • openServer为创建ExchangeServer的方法,其中传入的是createServer方法产生的一个ExchangeServer,这个ExchangeServer是一个装饰器模式装饰过的对象
  • 由上面代码可以知道,dubboProtocol默认使用的是nettyTransporter:
  • ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)是走SPI拓展机制的,可以是自定义的,但默认的是nettyTransporter。
  • 生成的是将NettyServer进行装饰过的ExchangeServer,对应代码:
 try {    server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}

com.alibaba.dubbo.remoting.exchange.Exchangers#bind(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.exchange.ExchangeHandler):

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {    if (url == null) {        throw new IllegalArgumentException("url == null");    }    if (handler == null) {        throw new IllegalArgumentException("handler == null");    }    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");    return getExchanger(url).bind(url, handler);}
public static Exchanger getExchanger(URL url) {        //com.alibaba.dubbo.common.Constants#DEFAULT_EXCHANGER        // public static final String  DEFAULT_EXCHANGER                  = "header";        // 默认用的echanger是header        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);    return getExchanger(type);}
public static Exchanger getExchanger(String type) {    //这里是加载Exchanger的spi拓展点信息    return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);}

加载Exchanger并通过type进行匹配,默认是使用的是HeaderExchanger

最终getExchanger(url).bind(url, handler)对应的方法是:

最终是将传入的ExchangeHandler装饰成HeaderExchangeHandler,然后将HeaderExchangeHandler装饰成DecodeHandler。

这里可以看到,server也是装饰器模式,最终工作的是NettyServer。到这里Server的初始化流程也就结束了。

4. client端的初始化

文章开头部分我们知道com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper中包裹的属性private final Protocol protocol;在这里对应的是DubboProtocol。com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#refer方法:

 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {            return protocol.refer(type, url);        }        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);    }

可以看到它调用了protocol.refer方法,那么com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#refer方法如下:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // modified by lishen    optimizeSerialization(url);
    // create rpc invoker.    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);    invokers.add(invoker);    return invoker;}

在这里面会调用com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients方法:

private ExchangeClient[] getClients(URL url){    //是否共享连接    boolean service_share_connect = false;    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);    //如果connections不配置,则共享连接,否则每服务每连接    if (connections == 0){        service_share_connect = true;        connections = 1;    }
    ExchangeClient[] clients = new ExchangeClient[connections];    for (int i = 0; i < clients.length; i++) {        if (service_share_connect){            clients[i] = getSharedClient(url);        } else {            clients[i] = initClient(url);        }    }    return clients;}

继续看initClient:

接下来看client = Exchangers.connect(url ,requestHandler);的实现:

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {    if (url == null) {        throw new IllegalArgumentException("url == null");    }    if (handler == null) {        throw new IllegalArgumentException("handler == null");    }    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");    return getExchanger(url).connect(url, handler);}

其中getExchanger方法部分与server部分相同,我们看看它的connect方法的实现:

这里最后返回的是一个HeaderExchangeClient,它的内部装饰的是NettyClient。到这里client端的初始化也全部结束了。

5. 总结

本文以DubboNamespaceHandler为入口对Server和Client的初始化以及初始化过程中涉及到的Proxy、Transporter和Exchanger进行了一个流程梳理。