编程语言
首页 > 编程语言> > Dubbo核心源码剖析(集群 集群容错 负载均衡 服务治理 通信协议)

Dubbo核心源码剖析(集群 集群容错 负载均衡 服务治理 通信协议)

作者:互联网

Dubbo核心源码剖析(集群 集群容错 负载均衡 服务治理 通信协议

Dubbo核心源码剖析

1. Dubbo高可用集群

1.1 服务集群的概述

1.1.1 概述

为了避免单点故障,现在的应用通常至少会部署在两台服务器上,这样就组成了集群。集群就是单机的多实例,在多个服务器上部署多个服务,每个服务就是一个节点,部署N个节点,处理业务的能力就提升 N倍(大约),这些节点的集合就叫做集群。
在这里插入图片描述

1.1.2 调用过程

在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。
在这里插入图片描述
集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

1.1.3 组件介绍

1.2 集群容错机制

在分布式系统中,集群某个某些节点出现问题是大概率事件,因此在设计分布式RPC框架的过程中,必须要把失败作为设计的一等公民来对待。一次调用失败之后,应该如何选择对失败的选择策略,这是一个见仁见智的问题,每种策略可能都有自己独特的应用场景。因此,作为框架来说,应当针对不同场景提供多种策略,供用户进行选择。
在Dubbo设计中,通过Cluster这个接口的抽象,把一组可供调用的Provider信息组合成为一个统一的Invoker 供调用方进行调用。经过路由规则过滤,负载均衡选址后,选中一个具体地址进行调用,如果调用失败,则会按照集群配置的容错策略进行容错处理。

1.2.1 内置集群容错策略

Dubbo默认内置了若干容错策略,如果不能满足用户需求,则可以通过自定义容错策略进行配置
Dubbo主要内置了如下几种策略:

这些名称比较相似,概念也比较容易混淆,下面逐一进行解释。

1. Failover(失败自动切换)
Failover 是高可用系统中的一个常用概念,服务器通常拥有主备两套机器配置,如果主服务器出现故障,则自动切换到备服务器中,从而保证了整体的高可用性。
Dubbo也借鉴了这个思想,并且把它作为Dubbo 默认的容错策略。当调用出现失败的时候,根据配置的重试次数,会自动从其他可用地址中重新选择一个可用的地址进行调用,直到调用成功,或者是达到重试的上限位置。
Dubbo里默认配置的重试次数是2,也就是说,算上第一次调用,最多会调用3次。
其配置方法,容错策略既可以在服务提供方配置,也可以服务调用方进行配置。而重试次数的配置则更为灵活,既可以在服务级别进行配置,也可以在方法级别进行配置。具体优先顺序为:

服务调用方方法级配置 > 服务调用方服务级配置 > 服务提供方方法级配置 > 服务提供方服务级配置

以XML方式为例,具体配置方法如下:
服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" 
ref="demoService" cluster="failover" retries="2" />

服务提供方,方法级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" 
ref="demoService"cluster="failover"> 
<dubbo:method name="sayHello" retries="2" />
 </dubbo:reference>

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" 
cluster="failover" retries="1"/>

服务调用方,方法级配置:

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService"
 cluster="failover"> <dubbo:method name="sayHello" retries="3" /> </dubbo:reference>

Failover可以自动对失败进行重试,对调用者屏蔽了失败的细节,但是Failover策略也会带来一些副作用:

某些情况下,重试甚至会造成资源的浪费。考虑一个调用场景,A->B->C,如果A处设置了超时100ms,再B->C的第一次调用完成时已经超过了100ms,但很不幸B->C失败,这时候会进行重试,但其实这时候重试已经没有意义,因此在A看来这次调用已经超时,A可能已经开始执行其他逻辑。

2. Failsafe(失败安全)
失败安全策略的核心是即使失败了也不会影响整个调用流程。通常情况下用于旁路系统或流程中,它的失败不影响核心业务的正确性。在实现上,当出现调用失败时,会忽略此错误,并记录一条日志,同时返回一个空结果,在上游看来调用是成功的。
应用场景,可以用于写入审计日志等操作。
具体配置方法:
服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failsafe" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failsafe"/>

其中服务调用方配置优先于服务提供方配置。

3. Failfast(快速失败)
某些业务场景中,某些操作可能是非幂等的,如果重复发起调用,可能会导致出现脏数据等。例如调用某个服务,其中包含一个数据库的写操作,如果写操作完成,但是在发送结果给调用方的过程中出错了,那么在调用发看来这次调用失败了,但其实数据写入已经完成。这种情况下,重试可能并不是一个好策略,这时候就需要使用到 Failfast 策略,调用失败立即报错。让调用方来决定下一步的操作并保证业务的幂等性。

具体配置方法:
服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failfast" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failfast"/>

其中服务调用方配置优先于服务提供方配置。

4. Failback(失败自动恢复)
Failback 通常和 Failover 两个概念联系在一起。在高可用系统中,当主机发生故障,通过Failover 进行主备切换后,待故障恢复后,系统应该具备自动恢复原始配置的能力

Dubbo中的 Failback 策略中,如果调用失败,则此次失败相当于 Failsafe ,将返回一个空结果。而与 Failsafe 不同的是,Failback策略会将这次调用加入内存中的失败列表中,对于这个列表中的失败调用,会在另一个线程中进行异步重试,重试如果再发生失败,则会忽略,即使重试调用成功,原来的调用方也感知不到了。因此它通常适合于,对于实时性要求不高,且不需要返回值的一些异步操作。
具体配置方法:
服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="failsafe" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="failsafe"/>

其中服务调用方配置优先于服务提供方配置。
按照目前的实现,Failback策略还有一些局限,例如内存中的失败调用列表没有上限,可能导致堆积,异步重试的执行间隔无法调整,默认是5秒。

5. Forking(并行调用)
上述几种策略中,主要都是针对调用失败发生后如何进行弥补的角度去考虑的,而 Forking 策略则跟上述几种策略不同,是一种典型的用成本换时间的思路。即第一次调用的时候就同时发起多个调用,只要其中一个调用成功,就认为成功。在资源充足,且对于失败的容忍度较低的场景下,可以采用此策略。
具体配置方法:
服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="forking" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="forking"/>

其中服务调用方配置优先于服务提供方配置

6. Broadcast(广播调用)
在某些场景下,可能需要对服务的所有提供者进行操作,此时可以使用广播调用策略。此策略会逐个调用所有提供者,只要任意有一个提供者出错,则认为此次调用出错。通常用于通知所有提供者更新缓存或日志等本地资源信息。
具体配置方法:
服务提供方,服务级配置

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" cluster="broadcast" />

服务调用方,服务级配置

<dubbo:reference id="demoService" interface="org.apache.dubbo.demo.DemoService" cluster="broadcast"/>

其中服务调用方配置优先于服务提供方配置。

1.2.2 集群容错调优

下表对各种策略做一个简单对比,

在这里插入图片描述
综上我们得知,不同的容错策略往往对应不同的业务处理,这里做一个总结如下:
Failover :通常用于对调用rt不敏感的场景,如读操作;但重试会带来更长延迟
Failfast :通常用于非幂等性操作,需要快速感知失败的场景;比如新增记录
Failsafe :通常用于旁路系统,失败不影响核心流程正确性的场景;如日志记录
Failback :通常用于对于实时性要求不高,且不需要返回值的一些异步操作的场景
Forking :通常用于资源充足,且对于失败的容忍度较低,实时性要求高的读操作,但需要浪费更多服务资源
Broadcast:如通知所有提供者更新缓存或日志等本地资源信息

1.2.3 源码分析

我们在上一章看到了两个概念,分别是集群接口 Cluster 和 Cluster Invoker,这两者是不同的。Cluster 是接口,而 Cluster Invoker 是一种 Invoker。服务提供者的选择逻辑,以及远程调用失败后的的处理逻辑均是封装在 Cluster Invoker 中。那么 Cluster 接口和相关实现类有什么用呢?用途比较简单,仅用于生成 Cluster Invoker。下面我们来看一下源码。

public class FailbackCluster implements Cluster {

    public final static String NAME = "failback";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailbackClusterInvoker<T>(directory);
    }

}

如上,FailoverCluster 总共就包含这几行代码,用于创建 FailoverClusterInvoker 对象,很简单。下面
再看一个。

public class FailbackCluster implements Cluster {

    public final static String NAME = "failback";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailbackClusterInvoker<T>(directory);
    }

}

如上,FailbackCluster 的逻辑也是很简单,无需解释了。所以接下来,我们把重点放在各种 Cluster Invoker 上

1. Cluster Invoker
我们首先从各种 Cluster Invoker 的父类 AbstractClusterInvoker 源码开始说起。前面说过,集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,这个在服务引用那篇文章中分析过,就不赘述。第二个阶段是在服务消费者进行远程调用时,此时 AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker,负载均衡等操作均会在此阶段被执行。因此下面先来看一下 invoke 方法的逻辑。

   @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();

        // binding attachments into invocation.
        // 绑定 attachments 到 invocation 中.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
		    ((RpcInvocation) invocation).addAttachments(contextAttachments);
	    }

        //获取invokers集合
        List<Invoker<T>> invokers = list(invocation);
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        //调用指定的服务容错组件
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected void checkWhetherDestroyed() {
        if (destroyed.get()) {
            throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()
                    + " use dubbo version " + Version.getVersion()
                    + " is now destroyed! Can not invoke any more.");
        }
    }

// 抽象方法,由子类实现
 protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

AbstractClusterInvoker 的 invoke 方法主要用于列举 Invoker,以及加载 LoadBalance。最后再调用模板方法 doInvoke 进行后续操作。下面我们来看一下 Invoker 列举方法 list(Invocation) 的逻辑,如下:

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        return directory.list(invocation);
    }

2. FailoverClusterInvoker
FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。下面来看一下该类的逻辑。

  public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        //重试次数
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        //循环重试次数执行重试调用
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
            // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
            // 通过调用 list 可得到最新可用的 Invoker 列表
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                // 对 copyinvokers 进行判空检查
                checkInvokers(copyInvokers, invocation);
            }
            //通过负载均衡算法找到对应的Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            // 添加到 invoker 到 invoked 列表中
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                //执行远程调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                //返回远程执行结果
                return result;
                //如果发生异常,则重试循环执行
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

如上,FailoverClusterInvoker 的 doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。整个流程大致如此,不是很难理解。下面我们看一下 select 方法的逻辑。

 protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        // 获取调用方法名
        String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

        // 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的 
        // 调用同一个服务提供者,除非该提供者挂了再进行切换
        boolean sticky = invokers.get(0).getUrl()
                .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

        //ignore overloaded method
        // 检测 invokers 列表是否包含 stickyInvoker,如果不包含, 
        // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        //ignore concurrency problem
        // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含 
        // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供 服务。 
        // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        // availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的 
        // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
        //负载均衡算法调用
        // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
         // 此时继续调用 doSelect 选择 Invoker
        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

       // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }

如上,select 方法的主要逻辑集中在了对粘滞连接特性的支持上。首先是获取 sticky 配置,然后再检测invokers 列表中是否包含 stickyInvoker,如果不包含,则认为该 stickyInvoker 不可用,此时将其置空。这里的 invokers 列表可以看做是存活着的服务提供者列表,如果这个列表不包含 stickyInvoker,那自然而然的认为 stickyInvoker 挂了,所以置空。如果 stickyInvoker 存在于 invokers 列表中,此时要进行下一项检测 — 检测 selected 中是否包含 stickyInvoker。如果包含的话,说明 stickyInvoker 在此之前没有成功提供服务(但其仍然处于存活状态)。此时我们认为这个服务不可靠,不应该在重试期间内再次被调用,因此这个时候不会返回该 stickyInvoker。如果 selected 不包含 stickyInvoker,此时还需要进行可用性检测,比如检测服务提供者网络连通性等。当可用性检测通过,才可返回stickyInvoker,否则调用 doSelect 方法选择 Invoker。如果 sticky 为 true,此时会将 doSelect 方法选出的 Invoker 赋值给 stickyInvoker。

以上就是 select 方法的逻辑,这段逻辑看起来不是很复杂,但是信息量比较大。不搞懂 invokers 和selected 两个入参的含义,以及粘滞连接特性,这段代码是不容易看懂的。所以大家在阅读这段代码时,不要忽略了对背景知识的理解。关于 select 方法先分析这么多,继续向下分析。

    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        //负载均衡算法获取Invoker
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
        //如果当前Invoker已经被选中调用,或者无效,则重新选择一个invoker
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
              // 进行重选
                Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rInvoker != null) {
                // 如果 rinvoker 不为空,则将其赋值给 invoker
                    invoker = rInvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    // rinvoker 为空,定位 invoker 在 invokers 中的位置
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = invokers.get((index + 1) % invokers.size());
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }

doSelect 主要做了两件事,第一是通过负载均衡组件选择 Invoker。第二是,如果选出来的 Invoker 不稳定,或不可用,此时需要调用 reselect 方法进行重选。若 reselect 选出来的 Invoker 为空,此时定位invoker 在 invokers 列表中的位置 index,然后获取 index + 1 处的 invoker,这也可以看做是重选逻辑的一部分。下面我们来看一下 reselect 方法的逻辑。

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

        //Allocating one in advance, this list is certain to be used.
        List<Invoker<T>> reselectInvokers = new ArrayList<>(
                invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

        // First, try picking a invoker not in `selected`.
        // 根据 availablecheck 进行不同的处理
        for (Invoker<T> invoker : invokers) {
        // 检测可用性
            if (availablecheck && !invoker.isAvailable()) {
                continue;
            }
          // 如果 selected 列表不包含当前 invoker,则将其添加到 reselectInvokers 中
            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
       // reselectInvokers 不为空,此时通过负载均衡组件进行选择
        if (!reselectInvokers.isEmpty()) {
        // 通过负载均衡组件进行选择
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }

        // Just pick an available invoker using loadbalance policy
        // 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛 选。
         // 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集 合中
        if (selected != null) {
            for (Invoker<T> invoker : selected) {
                if ((invoker.isAvailable()) // available first
                        && !reselectInvokers.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
        // 再次进行选择,并返回选择结果
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }

        return null;
    }

reselect 方法总结下来其实只做了两件事情,第一是查找可用的 Invoker,并将其添加到reselectInvokers 集合中。第二,如果 reselectInvokers 不为空,则通过负载均衡组件再次进行选择。其中第一件事情又可进行细分,一开始,reselect 从 invokers 列表中查找有效可用的 Invoker,若未能找到,此时再到 selected 列表中继续查找。关于 reselect 方法就先分析到这,继续分析其他的 ClusterInvoker。

3. FailbackClusterInvoker
FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。下面来看一下它的实现逻辑。

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    private static final long RETRY_FAILED_PERIOD = 5;

    private final int retries;

    private final int failbackTasks;

    private volatile Timer failTimer;

    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);

        int retriesConfig = getUrl().getParameter(RETRIES_KEY, DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(FAIL_BACK_TASKS_KEY, DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }

    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    failTimer = new HashedWheelTimer(
                            new NamedThreadFactory("failback-cluster-timer", true),
                            1,
                            TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {
            checkInvokers(invokers, invocation);
            //负载均衡算法查找对应的节点
            invoker = select(loadbalance, invocation, invokers, null);
            //直接执行
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            //添加重试定时任务
            addFailed(loadbalance, invocation, invokers, invoker);
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    }

这个类主要由3个方法组成,首先是 doInvoker,该方法负责初次的远程调用。若远程调用失败,则通过 addFailed 方法将调用信息存入到 failed 中,等待定时重试。addFailed 在开始阶段会根据retryFuture 为空与否,来决定是否开启定时任务。retryFailed 方法则是包含了失败重试的逻辑,该方法会对 failed 进行遍历,然后依次对 Invoker 进行调用。调用成功则将 Invoker 从 failed 中移除,调用失败则忽略失败原因

4. FailfastClusterInvoker
FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。源码如下:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        //负载均衡算法找到一个节点信息
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            //执行远程调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
		        throw (RpcException) e;
	        }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                    "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                            + " select from all providers " + invokers + " for service " + getInterface().getName()
                            + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion()
                            + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                    e.getCause() != null ? e.getCause() : e);
        }
    }
}

5. FailsafeClusterInvoker
FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。下面分析源码

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            //负载均衡算法获取节点信息
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            //执行远程调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            //发生异常,记录日志
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            //返回空数据,客户端不会看到异常
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    }
}

6. ForkingClusterInvoker
ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。下面来看该类的实现。

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /**
     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool(
            new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            // 获取 forks 配置
            final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
            // 获取超时配置
            final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            // 如果 forks 配置不合理,则直接将 invokers 赋值给 selected
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                //循环每个节点,添加到集合中
                for (int i = 0; i < forks; i++) {
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // 选择 Invoker
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            // ----------------------✨ 分割线1 ✨---------------------- //
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            //在线程池中执行每个节点的访问
            for (final Invoker<T> invoker : selected) {
            // 为每个 Invoker 创建一个执行线程
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //每个节点执行调用
                            Result result = invoker.invoke(invocation);
                            //结果集放到队列中
                            ref.offer(result);
                        } catch (Throwable e) {
                            int value = count.incrementAndGet();
                            // 仅在 value 大于等于 selected.size() 时,才将异常对象 
                            // 放入阻塞队列中,请大家思考一下为什么要这样做。
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            // ----------------------✨ 分割线2 ✨---------------------- //
            try {
                //只要一个成功,就返回结果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                if (ret instanceof Throwable) {
                // 如果结果类型为 Throwable,则抛出异常
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                // 返回结果 return (Result) r
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }
}

ForkingClusterInvoker 的 doInvoker 方法比较长,这里通过两个分割线将整个方法划分为三个逻辑块。从方法开始到分割线1之间的代码主要是用于选出 forks 个 Invoker,为接下来的并发调用提供输入。分割线1和分割线2之间的逻辑通过线程池并发调用多个 Invoker,并将结果存储在阻塞队列中。分割线2到方法结尾之间的逻辑主要用于从阻塞队列中获取返回结果,并对返回结果类型进行判断。如果为异常类型,则直接抛出,否则返回。
以上就是ForkingClusterInvoker 的 doInvoker 方法大致过程。我们在分割线1和分割线2之间的代码上留了一个问题,问题是这样的:为什么要在 value >= selected.size() 的情况下,才将异常对象添加到阻塞队列中?这里来解答一下。原因是这样的,在并行调用多个服务提供者的情况下,只要有一个服务提供者能够成功返回结果,而其他全部失败。此时 ForkingClusterInvoker 仍应该返回成功的结果,而非抛出异常。在 value >= selected.size() 时将异常对象放入阻塞队列中,可以保证异常对象不会出现在正常结果的前面,这样可从阻塞队列中优先取出正常的结果。

7. BroadcastClusterInvoker
BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。源码如下。

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {

    /**
     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool(
            new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                //循环每个节点,添加到集合中
                for (int i = 0; i < forks; i++) {
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            //在线程池中执行每个节点的访问
            for (final Invoker<T> invoker : selected) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //每个节点执行调用
                            Result result = invoker.invoke(invocation);
                            //结果集放到队列中
                            ref.offer(result);
                        } catch (Throwable e) {
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                //只要一个成功,就返回结果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }
}

这里分析了集群容错的几种实现方式。集群容错对于 Dubbo 框架来说,是很重要的逻辑。集群模块处于服务提供者和消费者之间,对于服务消费者来说,集群可向其屏蔽服务提供者集群的情况,使其能够专心进行远程调用。除此之外,通过集群模块,我们还可以对服务之间的调用链路进行编排优化,治理服务。总的来说,对于 Dubbo 而言,集群容错相关逻辑是非常重要的。想要对 Dubbo 有比较深的理解,集群容错是必须要掌握的

1.3 集群负载均衡策略

1.3.1 负载均衡的主要作用

在这里插入图片描述
负载均衡(LoadBalance),它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。
在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。

1.3.2 内置的负载均衡策略

Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。这几个负载均衡算法代码不是很长,但是想看懂也不是很容易,需要大家对这几个算法的原理有一定了解才行。如果不是很了解,也没不用太担心。我们会在分析每个算法的源码之前,对算法原理进行简单的讲解,帮助大家建立初步的印象。

1. RandomLoadBalance
RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。
在这里插入图片描述

/**
 * random load balance.
 * 随机负载均衡算法:随机+权重
 */
public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    /***
     * 随机负载均衡算法:随机+权重
     * @param invokers
     * @param url
     * @param invocation
     * @param <T>
     * @return
     */
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Every invoker has the same weight?
        boolean sameWeight = true;
        //每个提供者权重数组,下标代表节点下标,值为节点的权重值
        int[] weights = new int[length];
        // the first invoker's weight 获取第1个服务的权重
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // 总权重(所有服务的权重叠加集合)
        int totalWeight = firstWeight;
        //循环所有节点进行权重统计
        for (int i = 1; i < length; i++) {
        	//获取每个提供的权重
            int weight = getWeight(invokers.get(i), invocation);
            // save for later use
            //保存对应节点的权重
            weights[i] = weight;
            //叠加所有权重
            totalWeight += weight;

            /***
             * 任何一个节点的权重只要和第1个的权重不同,则表示【并非所有权重都相同】
             */
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }

        /***
         * 并非所有权重都相同
         */
         // 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上
        if (totalWeight > 0 && !sameWeight) {
        	//在总权重中,随机一个数   (10==》6)
            int offset = ThreadLocalRandom.current().nextInt(totalWeight); //随机的
            // 循环多个提供者
            // 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。 
            // 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。 
            // 第一次循环,offset - 5 = 2 > 0,即 offset > 5, 
            // 表明其不会落在服务器 A 对应的区间上。 
            // 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8, 
            // 表明其会落在服务器 B 对应的区间上
            for (int i = 0; i < length; i++) {
            	//随机数 - 当前机器权重
	            //6 -5 >-
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        //如果所有节点权重都相同,则直接随机获取一个
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }

}

RandomLoadBalance 的算法思想比较简单,在经过多次请求后,能够将调用请求按照权重值进行“均 匀”分配。然 RandomLoadBalance 也存在一定的缺点,当调用次数比较少时,Random 产生的随机数可能会比较集中,此时多数请求会落到同一台服务器上。这个缺点并不是很严重,多数情况下可以忽略。RandomLoadBalance 是一个简单,高效的负载均衡实现,因此 Dubbo 选择它作为缺省实现。

2. LeastActiveLoadBalance
LeastActiveLoadBalance 翻译过来是最小活跃数负载均衡。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。关于 LeastActiveLoadBalance 的背景知识就先介绍到这里,下面开始分析源码

public class LeastActiveLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "leastactive";

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        // 最小的活跃数
        int leastActive = -1;
        // 具有相同“最小活跃数”的服务者提供者(以下用 Invoker 代称)数量
        int leastCount = 0;
        // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
        int[] leastIndexes = new int[length];
        // 记录每个Invoker的权重
        int[] weights = new int[length];
        int totalWeight = 0;
	    // 第一个最小活跃数的 Invoker 权重值,用于与其他具有相同最小活跃数的 Invoker 的权重进行对比,
	    // 以检测是否“所有具有相同最小活跃数的 Invoker 的权重”均相等
        int firstWeight = 0;
        boolean sameWeight = true;


		//遍历 invokers 列表
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
	        // 获取 Invoker 对应的活跃数
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            int afterWarmup = getWeight(invoker, invocation);
	        //获取权重
            weights[i] = afterWarmup;
	        // 发现更小的活跃数,重新开始
            if (leastActive == -1 || active < leastActive) {
	            // 使用当前活跃数 active 更新最小活跃数 leastActive
                leastActive = active;
	            // 更新 leastCount 为 1
                leastCount = 1;
	            // 记录当前下标值到 leastIndexs 中
                leastIndexes[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;
                // 当前 Invoker 的活跃数 active 与最小活跃数 leastActive 相同
            } else if (active == leastActive) {
	            // 在 leastIndexs 中记录下当前 Invoker 在 invokers 集合中的下标
                leastIndexes[leastCount++] = i;
	            // 累加权重
                totalWeight += afterWarmup;
	            // 检测当前 Invoker 的权重与 firstWeight 是否相等,
	            // 不相等则将 sameWeight 置为 false
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
	    // 当只有一个 Invoker 具有最小活跃数,此时直接返回该 Invoker 即可
        if (leastCount == 1) {
            return invokers.get(leastIndexes[0]);
        }
	    // 有多个 Invoker 具有相同的最小活跃数,但它们之间的权重不同
        if (!sameWeight && totalWeight > 0) {
	        // 随机生成一个 [0, totalWeight) 之间的数字
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
	        // 循环让随机数减去具有最小活跃数的 Invoker 的权重值,
	        // 当 offset 小于等于0时,返回相应的 Invoker
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
	            //获取权重值,并让随机数减去权重值
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
	    // 如果权重相同或权重为0时,随机返回一个 Invoker
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }
}

除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。所以准确的来说,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。举个例子说明一下,在一个服务提供者集群中,有两个性能优异的服务提供者。某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。如果两个服务提供者权重相同,此时随机选择一个即可。

3. ConsistentHashLoadBalance
一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的,首先根据 ip 或者其他的信息为缓存节点生成一个hash,并将这个 hash 投射到 [0, 2^32-1] 的圆环上。当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。大致效果如下图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。
在这里插入图片描述
下面来看看一致性 hash 在 Dubbo 中的应用。我们把上图的缓存节点替换成 Dubbo 的服务提供者,于是得到了下图:
在这里插入图片描述
这里相同颜色的节点均属于同一个服务提供者,比如 Invoker1-1,Invoker1-2,……, Invoker1-160。这样做的目的是通过引入虚拟节点,让 Invoker 在圆环上分散开来,避免数据倾斜问题。所谓数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况。比如:
在这里插入图片描述
如上,由于 Invoker-1 和 Invoker-2 在圆环上分布不均,导致系统中75%的请求都会落到 Invoker-1上,只有 25% 的请求会落到 Invoker-2 上。解决这个问题办法是引入虚拟节点,通过虚拟节点均衡各
个节点的请求量。
到这里背景知识就普及完了,接下来开始分析源码。我们先从 ConsistentHashLoadBalance 的doSelect 方法开始看起,如下

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "consistenthash";

    /**
     * Hash nodes name
     */
    public static final String HASH_NODES = "hash.nodes";

    /**
     * Hash arguments name
     */
    public static final String HASH_ARGUMENTS = "hash.arguments";

    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // 获取 invokers 原始的 hashcode
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 如果 invokers 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也 可能减少了。
        // 此时 selector.identityHashCode != identityHashCode 条件成立
        if (selector == null || selector.identityHashCode != identityHashCode) {
        // 创建新的 ConsistentHashSelector
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            // 调用 ConsistentHashSelector 的 select 方法选择 Invoker
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

如上,doSelect 方法主要做了一些前置工作,比如检测 invokers 列表是不是变动过,以及创建ConsistentHashSelector。这些工作做完后,接下来开始调用 ConsistentHashSelector 的 select 方法执行负载均衡逻辑。在分析 select 方法之前,我们先来看一下一致性 hash 选择器ConsistentHashSelector 的初始化过程,如下

   private static final class ConsistentHashSelector<T> {
        // 使用 TreeMap 存储 Invoker 虚拟节点
        private final TreeMap<Long, Invoker<T>> virtualInvokers;

        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // 获取虚拟节点数,默认为160
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            // 获取参与 hash 计算的参数下标值,默认对第一个参数进行 hash 运算
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                // 对 address + i 进行 md5 运算,得到一个长度为16的字节数组
                    byte[] digest = md5(address + i);
                    // 对 digest 部分字节进行4次 hash 运算,得到四个不同的 long 型正整数
                    for (int h = 0; h < 4; h++) {
                    // h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算 
                    // h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算
                     // h = 2, h = 3 时过程同上
                        long m = hash(digest, h);
                        // 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中, 
                        // virtualInvokers 需要提供高效的查询操作,因此选用 TreeMap 作为存 储结构
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

ConsistentHashSelector 的构造方法执行了一系列的初始化逻辑,比如从配置中获取虚拟节点数以及参与 hash 计算的参数下标,默认情况下只使用第一个参数进行 hash。需要特别说明的是,**ConsistentHashLoadBalance 的负载均衡逻辑只受参数值影响,具有相同参数值的请求将会被分配给同一个服务提供者。**ConsistentHashLoadBalance 不 关系权重,因此使用时需要注意一下。
在获取虚拟节点数和参数下标配置后,接下来要做的事情是计算虚拟节点 hash 值,并将虚拟节点存储到 TreeMap 中。到此,ConsistentHashSelector 初始化工作就完成了。接下来,我们来看看 select 方法的逻辑。

  public Invoker<T> select(Invocation invocation) {
            // 将参数转为 key
            String key = toKey(invocation.getArguments());
            // 对参数 key 进行 md5 运算
            byte[] digest = md5(key);
            // 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法, 
            // 寻找合适的 Invoker
            return selectForKey(hash(digest, 0));
        }

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        private Invoker<T> selectForKey(long hash) {
        // 到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker
            Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
            // 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null, 
            // 需要将 TreeMap 的头节点赋值给 entry
            if (entry == null) {
                entry = virtualInvokers.firstEntry();
            }
            // 返回 Invoker
            return entry.getValue();
        }

        private long hash(byte[] digest, int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            md5.update(bytes);
            return md5.digest();
        }

如上,选择的过程相对比较简单了。首先是对参数进行 md5 以及 hash 运算,得到一个 hash 值。然后再拿这个值到 TreeMap 中查找目标 Invoker 即可。

** RoundRobinLoadBalance**
LeastActiveLoadBalance 即加权轮询负载均衡,我们先来了解一下什么是加权轮询。这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。

public class RoundRobinLoadBalance extends AbstractLoadBalance {
    public static final String NAME = "roundrobin";
    
    private static final int RECYCLE_PERIOD = 60000;
    
    protected static class WeightedRoundRobin {
    	//服务权重
        private int weight;
        //当前计算权重
        private AtomicLong current = new AtomicLong(0);
        //最后更新时间
        private long lastUpdate;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }
        public long increaseCurrent() {
            return current.addAndGet(weight);
        }
        public void sel(int total) {
            current.addAndGet(-1 * total);
        }
        public long getLastUpdate() {
            return lastUpdate;
        }
        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }
// 嵌套 Map 结构,存储的数据结构示例如下: 
// { 
// "UserService.query": { 
// "url1": WeightedRoundRobin@123, 
// "url2": WeightedRoundRobin@456, 
// }, 
// "UserService.update": { 
// "url1": WeightedRoundRobin@123, 
// "url2": WeightedRoundRobin@456, 
// } 
// } 
// 最外层为服务类名 + 方法名,第二层为 url 到 WeightedRoundRobin 的映射关系。 
// 这里我们可以将 url 看成是服务提供者的 id 

    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    // 原子更新锁
    private AtomicBoolean updateLock = new AtomicBoolean();
    
    /**
     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     * 
     * @param invokers
     * @param invocation
     * @return
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        }
        return null;
    }
    
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // 获取 url 到 WeightedRoundRobin 映射表,如果为空,则创建一个新的
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        // 获取当前时间
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        // 下面这个循环主要做了这样几件事情: 
// 1. 遍历 Invoker 列表,检测当前 Invoker 是否有 
// 相应的 WeightedRoundRobin,没有则创建 
// 2. 检测 Invoker 权重是否发生了变化,若变化了, 
// 则更新 WeightedRoundRobin 的 weight 字段 
// 3. 让 current 字段加上自身权重,等价于 current += weight 
// 4. 设置 lastUpdate 字段,即 lastUpdate = now 
// 5. 寻找具有最大 current 的 Invoker,以及 Invoker 对应的 WeightedRoundRobin, 
// 暂存起来,留作后用 
// 6. 计算权重总和 

        for (Invoker<T> invoker : invokers) {
            String identifyString = invoker.getUrl().toIdentityString();
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            int weight = getWeight(invoker, invocation);
            // 检测当前 Invoker 是否有对应的 WeightedRoundRobin,没有则创建
            if (weightedRoundRobin == null) {
            // 设置 Invoker 权重
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                // 存储 url 唯一标识 identifyString 到 weightedRoundRobin 的映射关系
                map.putIfAbsent(identifyString, weightedRoundRobin);
            }
            // Invoker 权重不等于 WeightedRoundRobin 中保存的权重,说明权重变化了,此 时进行更新
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            // 让 current 加上自身权重,等价于 current += weight
            long cur = weightedRoundRobin.increaseCurrent();  //当前权重加 1
            // 设置 lastUpdate,表示近期更新过
            weightedRoundRobin.setLastUpdate(now);
            // 找出最大的 current
            if (cur > maxCurrent) {
                maxCurrent = cur;
                // 将具有最大 current 权重的 Invoker 赋值给 selectedInvoker
                selectedInvoker = invoker;  //选中的提供者
                // 将 Invoker 对应的 weightedRoundRobin 赋值给 selectedWRR,留作后用
                selectedWRR = weightedRoundRobin;
            }
            // 计算权重总和
            totalWeight += weight;
        }
        // 对 <identifyString, WeightedRoundRobin> 进行检查,过滤掉长时间未被更新的节 点。 
// 该节点可能挂了,invokers 中不包含该节点,所以该节点的 lastUpdate 长时间无法被 更新
// 若未更新时长超过阈值后,就会被移除掉,默认阈值为60秒。 

        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
                    // 拷贝
                    newMap.putAll(map);
                    // 遍历修改,即移除过期记录
                    Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, WeightedRoundRobin> item = it.next();
                        if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
                            it.remove();
                        }
                    }
                    // 更新引用
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }
        if (selectedInvoker != null) {
        // 让 current 减去权重总和,等价于 current -= totalWeight
            selectedWRR.sel(totalWeight);
            // 返回具有最大 current 的 Invoker
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }

}

轮询调用并不是简单的一个接着一个依次调用,它是根据权重的值进行循环的。

1.3.3 负载均衡总结

Dubbo 负载均衡策略提供下列四种方式:
Random LoadBalance 随机,按权重设置随机概率。 Dubbo的默认负载均衡策略
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
RoundRobin LoadBalance 轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
LeastActive LoadBalance 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
ConsistentHash LoadBalance 一致性Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变
动。

1.4 服务治理

1.4.1 服务治理的概述

服务治理主要作用是改变运行时服务的行为和选址逻辑,达到限流,权重配置等目的,主要有:标签路由,条件路由,黑白名单,动态配置,权重调节,负载均衡等功能。
在这里插入图片描述

1.4.2 执行过程

在这里插入图片描述
1、消费者,提供者启动成功,订阅zookeeper节点
2、管理平台对服务进行治理处理,向zookeeper写入节点数据
3、写入成功,通知消费者,提供者
4、根据不同的业务处理,在invoker调用时做出响应的处理

1.4.3 服务治理功能

Dubbo管理后台提供了服务的动态治理, 包括以下功能:

  1. 应用级别的服务治理:
    Dubbo 2.7 版本中增加了应用粒度的服务治理操作,对于条件路由(包括黑白名单),动态配置(包括权重,负载均衡)都可以做应用级别的配置。可以按照应用名和服务名两个维度来维护。
  2. 条件路由
    可以根据条件, 来设定消费端访问的路由配置规则。通过条件路由, 能够更好的管理服务之间的访问关系。例如: app1的消费者只能消费所有端口为20880的服务实例,app2的消费者只能消费所有端口为20881的服务实例。 配置示例:
scope: application 
force: true 
runtime: true 
enabled: true 
key: governance-conditionrouter-consumer 
conditions: 
  - application=app1 => address=*:20880 
  - application=app2 => address=*:20881

  1. 标签路由
    配置以应用作为维度,给不同的服务器打上不同名字的标签, 可以通过这种方式来实现流量隔离,灰度发布等功能。
    在这里插入图片描述

  2. 黑白名单配置
    黑白名单是条件路由的一部分,规则存储和条件路由放在一起, 可以通过服务和应用两个维度,指定黑名单和白名单。
    在这里插入图片描述
    白名单配置示例:

register.ip != 10.20.153.10,10.20.153.11 =>

黑名单配置示例:

register.ip = 10.20.153.10,10.20.153.11 =>
  1. 动态配置
    动态配置是和路由规则平行的另一类服务治理治理功能,主要作用是在不重启服务的情况下,动态改变调用行为。
  2. 权重调节
    权重调节是动态配置的子功能,主要作用是改变服务端的权重,更大的权重会有更大的几率被客户端选中作为服务提供者,从而达到流量分配的目的。
  3. 负载均衡
    负载均衡也是动态配置的子功能,主要作用是调整客户端的选址逻辑,目前可选的负载均衡策略有随机,轮询和最小活跃数。

2. 网络通信原理

在这里插入图片描述

2.1 网络通信协议

序列化就是将对象转成字节流
网络通信位于Remoting模块:

网络通信的问题:

通信协议
dubbo内置,dubbo协议 ,rmi协议,hessian协议,http协议,webservice协议,thrift协议,rest协议,grpc协议,memcached协议,redis协议等10种通讯协议。各个协议特点如下

2.1.1 dubbo协议

Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。
缺省协议,使用基于 mina 1.1.7 和 hessian 3.2.1 的 tbremoting 交互。
连接个数:单连接
连接方式:长连接
传输协议:TCP
传输方式:NIO 异步传输
序列化:Hessian 二进制序列化
适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用 dubbo 协议传输大文件或超大字符串。
适用场景:常规远程服务方法调用

2.1.2 rmi协议

RMI 协议采用 JDK 标准的 java.rmi.* 实现,采用阻塞式短连接和 JDK 标准序列化方式。
连接个数:多连接
连接方式:短连接
传输协议:TCP
传输方式:同步传输
序列化:Java 标准二进制序列化
适用范围:传入传出参数数据包大小混合,消费者与提供者个数差不多,可传文件。
适用场景:常规远程服务方法调用,与原生RMI服务互操作

2.1.3 hessian协议

Hessian 协议用于集成 Hessian 的服务,Hessian 底层采用 Http 通讯,采用 Servlet 暴露服务,Dubbo 缺省内嵌 Jetty 作为服务器实现。
Dubbo 的 Hessian 协议可以和原生 Hessian 服务互操作,即:
提供者用 Dubbo 的 Hessian 协议暴露服务,消费者直接用标准 Hessian 接口调用或者提供方用标准 Hessian 暴露服务,消费方用 Dubbo 的 Hessian 协议调用。
连接个数:多连接
连接方式:短连接
传输协议:HTTP
传输方式:同步传输
序列化:Hessian二进制序列化
适用范围:传入传出参数数据包较大,提供者比消费者个数多,提供者压力较大,可传文件。
适用场景:页面传输,文件传输,或与原生hessian服务互操作

2.1.4 http协议

基于 HTTP 表单的远程调用协议,采用 Spring 的 HttpInvoker 实现
连接个数:多连接
连接方式:短连接
传输协议:HTTP
传输方式:同步传输
序列化:表单序列化
适用范围:传入传出参数数据包大小混合,提供者比消费者个数多,可用浏览器查看,可用表单或URL传入参数,暂不支持传文件。
适用场景:需同时给应用程序和浏览器 JS 使用的服务。

2.1.5 webservice协议

可以和原生 WebService 服务互操作,即:
提供者用 Dubbo 的 WebService 协议暴露服务,消费者直接用标准 WebService 接口调用,或者提供方用标准WebService 暴露服务,消费方用 Dubbo 的 WebService 协议调用。
连接个数:多连接
连接方式:短连接
传输协议:HTTP
传输方式:同步传输
序列化:SOAP 文本序列化(http + xml)
适用场景:系统集成,跨语言调用

2.1.6 thrift协议

当前 dubbo 支持 [1]的 thrift 协议是对 thrift 原生协议 [2] 的扩展,在原生协议的基础上添加了一些额外的头信息,比如 service name,magic number 等。

2.1.7 rest协议

基于标准的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的简写)实现的REST调用支持

2.1.8 grpc协议

Dubbo 自 2.7.5 版本开始支持 gRPC 协议,对于计划使用 HTTP/2 通信,或者想利用 gRPC 带来的Stream、反压、Reactive 编程等能力的开发者来说, 都可以考虑启用 gRPC 协议。
为期望使用 gRPC 协议的用户带来服务治理能力,方便接入 Dubbo 体系
用户可以使用 Dubbo 风格的,基于接口的编程风格来定义和使用远程服务

2.1.9 memcached协议

基于 memcached实现的 RPC 协议

2.1.10 redis协议

基于 Redis 实现的 RPC 协议

2.2 序列化实现剖析

序列化就是将对象转成字节流,用于网络传输,以及将字节流转为对象,用于在收到字节流数据后还原成对象。序列化的优势有很多,例如安全性更好、可跨平台等。我们知道dubbo基于netty进行网络通讯,在 NettyClient.doOpen() 方法中可以看到Netty的相关类

 /***
     * 服务创建
     * @throws Throwable
     */
    @Override
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);

        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        //添加Handler
        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                //编解码器
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        //编、解码器
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        //业务处理器
                        .addLast("handler", nettyClientHandler);
                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if(socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }

然后去看NettyCodecAdapter 类最后进入ExchangeCodec类的encodeRequest方法,如下:

 @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            //请求编码
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            //响应编码
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

 protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    	//获取序列化实现方式
        Serialization serialization = getSerialization(channel);
        // header.  16字节
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.  存储2个字节魔法数
        Bytes.short2bytes(MAGIC, header);

        // set request and serialization flag.
        //第三个字节标识序列化id、请求信息等,对应16-23位
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // set request id.
        //请求ID
        Bytes.long2bytes(req.getId(), header, 4);

        // encode request data.
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        //对数据进行序列化操作
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        //序列化数据
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        int len = bos.writtenBytes();
        //检查请求数据实体长度
        checkPayload(channel, len);
        //数据长度,占4个字节
        Bytes.int2bytes(len, header, 12);

        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

是的,就是Serialization接口,默认是Hessian2Serialization序列化接口。
在这里插入图片描述
Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,
protostuff其中默认hessian2。其中java、compactedjava、nativejava属于原生java的序列化。

这些序列化方式的性能多数都显著优于 hessian2 (甚至包括尚未成熟的dubbo序列化)。所以我们可以为 dubbo 引入 Kryo 和 FST 这两种高效 Java 来优化 dubbo 的序列化。
使用Kryo和FST非常简单,只需要在dubbo RPC的XML配置中添加一个属性即可:

<dubbo:protocol name="dubbo" serialization="kryo"/>

2.3 网络通信深入解析

2.3.1 dubbo中数据格式

解决socket中数据粘包拆包问题,一般有三种方式

定长的协议是指协议内容的长度是固定的,比如协议byte长度是50,当从网络上读取50个 byte后,就进行decode解码操作。定长协议在读取或者写入时,效率比较高,因为数据缓存的大小基本都确定了,就好比数组一样,缺陷就是适应性不足,以RPC场景为例,很难估计出定长的长度是多少。

相比定长协议,如果能够定义一个特殊字符作为每个协议单元结束的标示,就能够以变长的方式进行通信,从而在数据传输和高效之间取得平衡,比如用特殊字符 \n 。特殊结束符方式的问题是过于简单的思考了协议传输的过程,对于一个协议单元必须要全部读入才能够进行处理,除此之外必须要防止用户传输的数据不能同结束符相同,否则就会出现紊乱。

这种一般是自定义协议,会以定长加不定长的部分组成,其中定长的部分需要描述不定长的内容长度。
dubbo就是使用这种形式的数据传输格式

Dubbo 框架定义了私有的RPC协议,其中请求和响应协议的具体内容我们使用表格来展示。
在这里插入图片描述
Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型
(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如
方法名称,参数列表等。下面简单列举一下消息头的内容。
在这里插入图片描述

2.3.2 消费方发送请求

(1)发送请求
为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。

proxy0#sayHello(String) 
—> InvokerInvocationHandler#invoke(Object, Method, Object[]) 
—> MockClusterInvoker#invoke(Invocation) 
—> AbstractClusterInvoker#invoke(Invocation) 
—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, 
LoadBalance)
—> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用 
—> ListenerInvokerWrapper#invoke(Invocation) 
—> AbstractInvoker#invoke(Invocation) 
—> DubboInvoker#doInvoke(Invocation) 
—> ReferenceCountExchangeClient#request(Object, int)—> HeaderExchangeClient#request(Object, int) 
—> HeaderExchangeChannel#request(Object, int) 
—> AbstractPeer#send(Object) 
—> AbstractClient#send(Object, boolean) 
—> NettyChannel#send(Object, boolean) 
—> NioClientSocketChannel#write(Object)

dubbo消费方,自动生成代码对象如下

public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
	// 方法数组
	public static Method[] methods;
	private InvocationHandler handler;

	public proxy0(InvocationHandler invocationHandler) {
		this.handler = invocationHandler;
	}

	public proxy0() {
	}

	@Override
	public String sayHello(String string) {
		// 将参数存储到 Object 数组中
		Object[] arrobject = new Object[]{string};
		// 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
		try {
			Object object = this.handler.invoke(this, methods[0], arrobject);
			return (String) object;
		} catch (Throwable throwable) {
			throw new RuntimeException(throwable);
		}
		// 返回调用结果
	}

InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker内部封装了服务降级逻辑。下面简单看一下:

 @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        // 获取 mock 配置值
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法, 
        // 比如 FailoverClusterInvoker
            //no mock
            // force:xxx 直接执行 mock 逻辑,不发起远程调用
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
            // 调用失败,执行 mock 逻辑
                if (e.isBiz()) {
                    throw e;
                }

                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                // 调用失败,执行 mock 逻辑
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }

分析 DubboInvoker。

 @Override
    public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
        // 设置 Invoker
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
        // 设置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
             // 添加 contextAttachments 到 RpcInvocation#attachment 变量中
            invocation.addAttachments(contextAttachments);
        }

        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            if (e.isBiz()) {
                return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
    }

上面的代码来自 AbstractInvoker 类,其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下。

 /***
     * 发起远程请求
     */
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        //将目标方法以及版本号作为参数放入到Invocation中
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
        //获得客户端连接
        ExchangeClient currentClient;  //初始化invoker的时候,构建的一个远程通信连接
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
        //通过取模获得其中一个连接
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
        //表示当前的方法是否存在返回值
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            //isOneway 为 true,表示“单向”通信
            if (isOneway) { //异步无返回值 boolean isSent = getUrl()
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {  //存在返回值
            //是否采用异步
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                //设置数据监听器(表示此对象赋值之后,监听执行的业务逻辑)
                //此时对asyncRpcResult赋值,asyncRpcResult是Future对象可以通过它的get方法获取返回结果
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                //对asyncRpcResult赋值后,它的get方法能获取值(AsyncRpcResult ->return this.get().recreate();)
                responseFuture.whenComplete((obj, t) -> {
                    //设置对应的值
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                    	//对asyncRpcResult内容赋值
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

最终进入到HeaderExchangeChannel#request方法,拼装Request并将请求发送出去

 @Override
    public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        // 创建请求对象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
        //NettyClient
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

(2)请求编码
在netty启动时,我们设置了编解码器,其中通过ExchangeCodec完成编解码工作如下

/**
 * ExchangeCodec.
 * 编解码器
 */
public class ExchangeCodec extends TelnetCodec {

    // header length.
    protected static final int HEADER_LENGTH = 16;
    // magic header.
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    // message flag.
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);



   @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            //请求编码
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            //响应编码
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

 /***
     * 请求编码
     * @param channel
     * @param buffer
     * @param req
     * @throws IOException
     */
    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    	//获取序列化实现方式
        Serialization serialization = getSerialization(channel);
        // header.  16字节
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.  存储2个字节魔法数
        Bytes.short2bytes(MAGIC, header);

        // set request and serialization flag.
        //第三个字节标识序列化id、请求信息等,对应16-23位
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // set request id.
        //请求ID   8个字节,从第4个字节开始设置
        Bytes.long2bytes(req.getId(), header, 4);

        // encode request data.
        // 获取 buffer 当前的写位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,为消息头预留 16 个字节的空间
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 创建序列化器,比如 Hessian2ObjectOutput
        //对数据进行序列化操作
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        //序列化数据
        if (req.isEvent()) {
        // 对事件数据进行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
        // 对请求数据进行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        // 获取写入的字节数,也就是消息体长度
        int len = bos.writtenBytes();
        //检查请求数据实体长度
        checkPayload(channel, len);
        //数据长度,占4个字节
        // 将消息体长度写入到消息头中
        Bytes.int2bytes(len, header, 12);

        // write
        // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
        buffer.writerIndex(savedWriteIndex);
        // 从 savedWriteIndex 下标处写入消息头
        buffer.writeBytes(header); // write header.
        // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。本节的最后,我们再来看一下 Request 对象的 data 字段序列化过程,也就是 encodeRequestData 方法的逻辑,如下:

   @Override
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;
        // 依次序列化 dubbo version、path、version
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(PATH_KEY));
        out.writeUTF(inv.getAttachment(VERSION_KEY));
        // 序列化调用方法名
        out.writeUTF(inv.getMethodName());
        // 将参数类型转换为字符串,并进行序列化
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null) {
            for (int i = 0; i < args.length; i++) {
            // 对运行时参数进行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        }
        // 序列化 attachments
        out.writeObject(inv.getAttachments());
    }

至此,关于服务消费方发送请求的过程就分析完了,接下来我们来看一下服务提供方是如何接收请求的。

2.3.3 提供方接收请求

(1) 请求解码
这里直接分析请求数据的解码逻辑,忽略中间过程,如下

 /***
     * 解码
     * @param channel
     * @param buffer
     * @return
     * @throws IOException
     */
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        // 创建消息头字节数组
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        // 读取消息头数据
        buffer.readBytes(header);
        // 调用重载方法进行后续解码工作
        return decode(channel, buffer, readable, header);
    }
    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 魔法数校验
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // 通过 telnet 命令行发送的数据包不包含消息头,所以这里 
            // 调用 TelnetCodec 的 decode 方法对数据包进行解码
            return super.decode(channel, buffer, readable, header);
        }
        // check length.
        // 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // 获取数据长度
        int len = Bytes.bytes2int(header, 12);
        // 检测消息体长度是否超出限制,超出则抛出异常
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        // 检测可读的字节数是否小于实际的字节数
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            // 继续进行解码工作  
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody 方法进行后续的解码工作,ExchangeCodec 中实现了 decodeBody 方法,但因其子类 DubboCodec 覆写了该方法,所以在运行时 DubboCodec 中的 decodeBody 方法会被调用。下面我们来看一下该方法的代码

 @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        // get request id.
        // 获取请求编号
        long id = Bytes.bytes2long(header, 4);
        // 检测消息类型,若下面的条件成立,表明消息类型为 Response
        // 通过逻辑与运算得到调用类型,0 - Response,1 - Request
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            // 创建 Response 对象
            Response res = new Response(id);
            // 检测事件标志位
            if ((flag & FLAG_EVENT) != 0) {
                // 设置心跳事件
                res.setEvent(true);
            }
            // get status.
            // 获取响应状态
            byte status = header[3];
            // 设置响应状态
            res.setStatus(status);
            try {
                // 如果响应状态为 OK,表明调用过程正常
                if (status == Response.OK) {
                    Object data;
                    if (res.isHeartbeat()) {
                        // 反序列化心跳数据,已废弃
                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                        data = decodeHeartbeatData(channel, in);
                    } else if (res.isEvent()) {
                        // 反序列化事件数据
                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                        data = decodeEventData(channel, in);
                    } else {
                        DecodeableRpcResult result;
                        // 根据 url 参数决定是否在 IO 线程上执行解码逻辑
                        if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                            // 创建 DecodeableRpcResult 对象
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            result.decode();
                        } else {
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } else {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    res.setErrorMessage(in.readUTF());
                }
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            return res;
        } else {
            //请求解码
            // decode request.
            // 创建 Request 对象
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            // 通过逻辑与运算得到通信方式,并设置到 Request 对象中
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            // 通过位运算检测数据包是否为事件类型
            if ((flag & FLAG_EVENT) != 0) {
            // 设置心跳事件到 Request 对象中
                req.setEvent(true);
            }
            try {
                Object data;
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                if (req.isHeartbeat()) {
                    // 对心跳包进行解码,该方法已被标注为废弃
                    data = decodeHeartbeatData(channel, in);
                } else if (req.isEvent()) {
                    // 对事件数据进行解码
                    data = decodeEventData(channel, in);
                } else {
                    DecodeableRpcInvocation inv;
                    // 根据 url 参数判断是否在 IO 线程上对消息体进行解码
                    if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                        // 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
                        // 调用方法名、attachment、以及调用参数解析出来
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                        // 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                // 设置 data 到 Request 对象中
                req.setData(data);
            } catch (Throwable t) {
            // 若解码过程中出现异常,则将 broken 字段设为 true,
            // 并将异常对象设置到 Reqeust 对象中
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }

            return req;
        }
    }

如上,decodeBody 对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用DecodeableRpcInvocation 的 decode 方法进行后续的解码工作。此工作完成后,可将调用方法名、attachment、以及调用参数解析出来。

(2)调用服务
解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个
对象,并将这个对象继续向下传递。整个调用栈如下:

NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent) 
—> AbstractPeer#received(Channel, Object) 
—> MultiMessageHandler#received(Channel, Object) 
—> HeartbeatHandler#received(Channel, Object) 
—> AllChannelHandler#received(Channel, Object) 
—> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑 

这里我们直接分析调用栈中的分析第一个和最后一个调用方法逻辑。如下:
考虑到篇幅,以及很多中间调用的逻辑并非十分重要,所以这里就不对调用栈中的每个方法都进行分析
了。这里我们直接分析最后一个调用方法逻辑。如下:

 @Override
    public void run() {
    // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
            // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
        // 其他消息类型通过 switch 进行处理
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }

如上,请求和响应消息出现频率明显比其他类型消息高,所以这里对该类型的消息进行了针对性判断。ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler


public class DecodeHandler extends AbstractChannelHandlerDelegate {

    private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class);

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

 @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
        // 对 Decodeable 接口实现类对象进行解码
            decode(message);
        }

        if (message instanceof Request) {
        // 对 Request 的 data 字段进行解码
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
        // 对 Request 的 result 字段进行解码
            decode(((Response) message).getResult());
        }
        // 执行后续逻辑
        handler.received(channel, message);
    }


 private void decode(Object message) {
 // Decodeable 接口目前有两个实现类,
 // 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message instanceof Decodeable) {
            try {
            // 执行解码逻辑
                ((Decodeable) message).decode();
                if (log.isDebugEnabled()) {
                    log.debug("Decode decodeable message " + message.getClass().getName());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode

DecodeHandler 主要是包含了一些解码逻辑,完全解码后的 Request 对象会继续向后传递

public class DubboProtocol extends AbstractProtocol {


 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
            // 获取 Invoker 实例
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            // 通过 Invoker 调用具体的服务
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.completionFuture().thenApply(Function.identity());
        }

服务全部暴露完成之后保存到exporterMap中。这里就是通过serviceKey获取exporter之后获取Invoker,并通过 Invoker 的 invoke 方法调用服务逻辑

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

 public AbstractProxyInvoker(T proxy, Class<T> type, URL url) {
        if (proxy == null) {
            throw new IllegalArgumentException("proxy == null");
        }
        if (type == null) {
            throw new IllegalArgumentException("interface == null");
        }
        if (!type.isInstance(proxy)) {
            throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type);
        }
        this.proxy = proxy;
        this.type = type;
        this.url = url;
    }

 protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

如上,doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过JavassistProxyFactory 创建的,创建逻辑如下:

  @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 创建匿名类对象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                   // 调用 invokeMethod 方法进行后续的调用
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。

/** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */ 
public class Wrapper0 extends Wrapper implements ClassGenerator.DC { 
public static String[] pns; 
public static Map pts; 
public static String[] mns; 
public static String[] dmns; 
public static Class[] mts0; 
// 省略其他方法 
public Object invokeMethod(Object object, String string, Class[] arrclass, 
Object[] arrobject) throws InvocationTargetException { 
DemoService demoService; 
try {
// 类型转换 
demoService = (DemoService)object; 
}
catch (Throwable throwable) { 
throw new IllegalArgumentException(throwable); 
}
try {
// 根据方法名调用指定的方法 
if ("sayHello".equals(string) && arrclass.length == 1) { 
return demoService.sayHello((String)arrobject[0]); 
} 
}
catch (Throwable throwable) { 
throw new InvocationTargetException(throwable); 
}
throw new NoSuchMethodException(new StringBuffer().append("Not found 
method \"").append(string).append("\" in class 
com.alibaba.dubbo.demo.DemoService.").toString()); 
} 
}

到这里,整个服务调用过程就分析完了。最后把调用过程贴出来,如下:

ChannelEventRunnable#run() 
—> DecodeHandler#received(Channel, Object) 
—> HeaderExchangeHandler#received(Channel, Object) 
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) 
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) 
—> Filter#invoke(Invoker, Invocation) 
—> AbstractProxyInvoker#invoke(Invocation) 
—> Wrapper0#invokeMethod(Object, String, Class[], Object[]) 
—> DemoServiceImpl#sayHello(String) 

2.3.4 提供方返回调用结果

服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回,这里就不在重复分析了。本节我们仅需关注 Response 对象的编码过程即可

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            Serialization serialization = getSerialization(channel);
            // 创建消息头字节数组
            byte[] header = new byte[HEADER_LENGTH];
            // 设置魔数
            Bytes.short2bytes(MAGIC, header);
            // 设置序列化器编号
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) {
                header[2] |= FLAG_EVENT;
            }
            // 获取响应状态
            byte status = res.getStatus();
            // 设置响应状态
            header[3] = status;
            // 设置请求编号
            Bytes.long2bytes(res.getId(), header, 4);

            // 更新 writerIndex,为消息头预留 16 个字节的空间
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            // encode response data or error message.
            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    // 对心跳响应结果进行序列化,已废弃
                    encodeHeartbeatData(channel, out, res.getResult());
                } else {
                    // 对调用结果进行序列化
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else {
                // 对错误信息进行序列化
                out.writeUTF(res.getErrorMessage());
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            // 获取写入的字节数,也就是消息体长度
            int len = bos.writtenBytes();
            checkPayload(channel, len);
            // 将消息体长度写入到消息头中
            Bytes.int2bytes(len, header, 12);
            // write
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write header.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // clear buffer
            buffer.writerIndex(savedWriteIndex);
            // send error message to Consumer, otherwise, Consumer will wait till timeout.
            if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
                Response r = new Response(res.getId(), res.getVersion());
                r.setStatus(Response.BAD_RESPONSE);

                if (t instanceof ExceedPayloadLimitException) {
                    logger.warn(t.getMessage(), t);
                    try {
                        r.setErrorMessage(t.getMessage());
                        channel.send(r);
                        return;
                    } catch (RemotingException e) {
                        logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
                    }
                } else {
                    // FIXME log error message in Codec and handle in caught() of IoHanndler?
                    logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
                    try {
                        r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                        channel.send(r);
                        return;
                    } catch (RemotingException e) {
                        logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
                    }
                }
            }

            // Rethrow exception
            if (t instanceof IOException) {
                throw (IOException) t;
            } else if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            } else {
                throw new RuntimeException(t.getMessage(), t);
            }
        }
    }


@Override
    protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        Result result = (Result) data;
        // currently, the version value in Response records the version of Request
        // 检测当前协议版本是否支持带有 attachment 集合的 Response 对象
        boolean attach = Version.isSupportResponseAttachment(version);
        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                // 序列化响应类型
                out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
            } else {
                // 调用结果非空
                // 序列化响应类型
                out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
                // 序列化调用结果
                out.writeObject(ret);
            }
        } else {
            // 序列化响应类型
            out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
            // 序列化异常对象
            out.writeObject(th);
        }

        if (attach) {
            // returns current version of Response to consumer side.
            result.getAttachments().put(DUBBO_VERSION_KEY, Version.getProtocolVersion());
            out.writeObject(result.getAttachments());
        }
    }

以上就是 Response 对象编码的过程,和前面分析的 Request 对象编码过程很相似。如果大家能看Request 对象的编码逻辑,那么这里的 Response 对象的编码逻辑也不难理解,就不多说了。接下来我们再来分析双向通信的最后一环 —— 服务消费方接收调用结果。

2.3.5 消费方接收调用结果

服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。这个过程和服务提供方接收请求的过程是一样的,因此这里就不重复分析了
(1)响应数据解码
响应数据解码逻辑主要的逻辑封装在 DubboCodec 中,我们直接分析这个类的代码。如下:

@Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        //获取消息类型
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        // get request id.
        // 获取请求编号
        long id = Bytes.bytes2long(header, 4);
        // 检测消息类型,若下面的条件成立,表明消息类型为 Response
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            // 创建 Response 对象
            Response res = new Response(id);
            // 检测事件标志位
            if ((flag & FLAG_EVENT) != 0) {
                // 设置心跳事件
                res.setEvent(true);
            }
            // get status.
            // 获取响应状态
            byte status = header[3];
            // 设置响应状态
            res.setStatus(status);
            try {
                // 如果响应状态为 OK,表明调用过程正常
                if (status == Response.OK) {
                    Object data;
                    if (res.isHeartbeat()) {
                        // 反序列化心跳数据,已废弃
                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                        data = decodeHeartbeatData(channel, in);
                    } else if (res.isEvent()) {
                        // 反序列化事件数据
                        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                        data = decodeEventData(channel, in);
                    } else {
                        DecodeableRpcResult result;
                        // 根据 url 参数决定是否在 IO 线程上执行解码逻辑
                        if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                            // 创建 DecodeableRpcResult 对象
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            result.decode();
                        } else {
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } else {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    res.setErrorMessage(in.readUTF());
                }
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            return res;
        } else {
            //请求解码
            // decode request.
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(true);
            }
            try {
                Object data;
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                if (req.isHeartbeat()) {
                    // 对心跳包进行解码,该方法已被标注为废弃
                    data = decodeHeartbeatData(channel, in);
                } else if (req.isEvent()) {
                    // 对事件数据进行解码
                    data = decodeEventData(channel, in);
                } else {
                    DecodeableRpcInvocation inv;
                    // 根据 url 参数判断是否在 IO 线程上对消息体进行解码
                    if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                        // 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
                        // 调用方法名、attachment、以及调用参数解析出来
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                        // 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }

            return req;
        }
    }

以上就是响应数据的解码过程,上面逻辑看起来是不是似曾相识。对的,我们在前面章节分析过DubboCodec 的 decodeBody 方法中关于请求数据的解码过程,该过程和响应数据的解码过程很相似。下面,我们继续分析调用结果的反序列化过程

public class DecodeableRpcResult extends AppResponse implements Codec, Decodeable {

    private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class);

    private Channel channel;

    private byte serializationType;

    private InputStream inputStream;

    private Response response;

    private Invocation invocation;

    private volatile boolean hasDecoded;

    public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
        Assert.notNull(channel, "channel == null");
        Assert.notNull(response, "response == null");
        Assert.notNull(is, "inputStream == null");
        this.channel = channel;
        this.response = response;
        this.inputStream = is;
        this.invocation = invocation;
        this.serializationType = id;
    }

    @Override
    public void encode(Channel channel, OutputStream output, Object message) throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);
		// 反序列化响应类型
        byte flag = in.readByte();
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE:
                break;
            case DubboCodec.RESPONSE_VALUE:
                handleValue(in);
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION:
                handleException(in);
                break;
	            // 返回值为空,且携带了 attachments 集合
            case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
                handleAttachment(in);
                break;
                //返回值不为空,且携带了 attachments 集合
            case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
                handleValue(in);
                handleAttachment(in);
                break;
	        // 异常对象不为空,且携带了 attachments 集合
            case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
                handleException(in);
                handleAttachment(in);
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', but received: " + flag);
        }
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
        return this;
    }

正常调用下,线程会进入 RESPONSE_VALUE_WITH_ATTACHMENTS 分支中。然后线程会从 invocation变量(大家探索一下 invocation 变量的由来)中获取返回值类型,接着对调用结果进行反序列化,并将序列化后的结果存储起来。最后对 attachments 集合进行反序列化,并存到指定字段中

2.3.6 异步转同步

Dubbo发送数据至服务方后,在通信层面是异步的,通信线程并不会等待结果数据返回。而我们在使用Dubbo进行RPC调用缺省就是同步的,这其中就涉及到了异步转同步的操作。
而在2.7.x版本中,这种自实现的异步转同步操作进行了修改。新的 DefaultFuture 继承了CompletableFuture ,新的 doReceived(Response res) 方法如下:

private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
    }

通过 CompletableFuture#complete 方法来设置异步的返回结果,且删除旧的 get() 方法,使用CompletableFuture#get() 方法:

public T get() throws InterruptedException, ExecutionException { 
Object r; 
return reportGet((r = result) == null ? waitingGet(true) : r); 
}

使用 CompletableFuture 完成了异步转同步的操作。

2.3.7 异步多线程数据一致

这里简单说明一下。一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 Future 对象,不出错。答案是通过调用编号。Future 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture对象,然后再将 Response 对象设置到 DefaultFuture 对象中。这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:
在这里插入图片描述

private DefaultFuture(Channel channel, Request request, int timeout) { 
this.channel = channel; 
this.request = request; 
this.id = request.getId(); 
this.timeout = timeout > 0 ? timeout : 
channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); 
// put into waiting map. 
FUTURES.put(id, this); 
CHANNELS.put(id, channel); 
}

2.3.8 心跳检查

Dubbo采用双向心跳的方式检测Client端与Server端的连通性。
我们再来看看 Dubbo 是如何设计应用层心跳的。Dubbo 的心跳是双向心跳,客户端会给服务端发送心跳,反之,服务端也会向客户端发送心跳。

1. 创建定时器

public class HeaderExchangeClient implements ExchangeClient {

    private final Client client;
    private final ExchangeChannel channel;

    private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
            new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);
    private HeartbeatTimerTask heartBeatTimerTask;
    private ReconnectTimerTask reconnectTimerTask;

    public HeaderExchangeClient(Client client, boolean startTimer) {
		Assert.notNull(client, "Client can't be null");
		this.client = client;
		this.channel = new HeaderExchangeChannel(client);

		if (startTimer) {
			URL url = client.getUrl();
			//开启心跳失败之后处理重连,断连的逻辑定时任务
			startReconnectTask(url);
			//开启发送心跳请求定时任务
			startHeartBeatTask(url);
		}
	}

Dubbo 在 HeaderExchangeClient 初始化时开启了两个定时任务
startReconnectTask 主要用于定时发送心跳请求
startHeartBeatTask 主要用于心跳失败之后处理重连,断连的逻辑

2. 发送心跳请求
详细解析下心跳检测定时任务的逻辑 HeartbeatTimerTask#doTask :

  @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long lastWrite = lastWrite(channel);
            if ((lastRead != null && now() - lastRead > heartbeat)
                    || (lastWrite != null && now() - lastWrite > heartbeat)) {
                Request req = new Request();
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(true);
                req.setEvent(Request.HEARTBEAT_EVENT);
                channel.send(req);
                if (logger.isDebugEnabled()) {
                    logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period: "
                            + heartbeat + "ms");
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
        }
    }

前面已经介绍过,Dubbo 采取的是双向心跳设计,即服务端会向客户端发送心跳,客户端也会向服务端发送心跳,接收的一方更新 lastRead 字段,发送的一方更新 lastWrite 字段,超过心跳间隙的时间,便发送心跳请求给对端。这里的 lastRead/lastWrite 同样会被同一个通道上的普通调用更新,通过更新这两个字段,实现了只在连接空闲时才会真正发送空闲报文的机制,符合我们一开始科普的做法。

3. 处理重连和断连

  @Override
    protected void doTask(Channel channel) {
        try {
            Long lastRead = lastRead(channel);
            Long now = now();

            // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection
            if (!channel.isConnected()) {
                try {
                    logger.info("Initial connection to " + channel);
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error("Fail to connect to " + channel, e);
                }
            // check pong at client
            } else if (lastRead != null && now - lastRead > idleTimeout) {
                logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
                        + idleTimeout + "ms");
                try {
                    ((Client) channel).reconnect();
                } catch (Exception e) {
                    logger.error(channel + "reconnect failed during idle time.", e);
                }
            }
        } catch (Throwable t) {
            logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
        }
    }

第二个定时器则负责根据客户端、服务端类型来对连接做不同的处理,当超过设置的心跳总时间之后,客户端选择的是重新连接,服务端则是选择直接断开连接。这样的考虑是合理的,客户端调用是强依赖可用连接的,而服务端可以等待客户端重新建立连接。
Dubbo 对于建立的每一个连接,同时在客户端和服务端开启了 2 个定时器,一个用于定时发送心跳,一个用于定时重连、断连,执行的频率均为各自检测周期的 1/3。定时发送心跳的任务负责在连接空闲时,向对端发送心跳包。定时重连、断连的任务负责检测 lastRead 是否在超时周期内仍未被更新,如果判定为超时,客户端处理的逻辑是重连,服务端则采取断连的措施。

3.流程总结

3.1 消费方创建NettyClient

消费方创建NettyClient
ReferenceBean
getObject()->ReferenceConfig.get()
ReferenceConfig
get()
init()->NettyClient创建
createProxy(map):创建代理
创建Invoker
ProtocolListenerWrapper->RegistryProtocol.refer()
ProtocolListenerWrapper->DubboProtocol.protocolBindingRefer()
创建代理
RegistryProtocol
refer()
获取注册中心信息
doRefer()
订阅服务directory.subscribe()
合并directory获取Invoker->cluster.join(directory)
doRefer()
DubboProtocol
1.protocolBindingRefer()
创建DubboInvoker
new DubboInvoker(serviceType, url, getClients(url), invokers)
2.getClients(url)->创建Netty客户端->initClient(url)
3.initClient(url)->创建Netty客户端
->Exchangers.connect()
->HeaderExchanger.connect()
->Transporters.connect()
->NettyTransporter.connect()
->NettyClient.NettyClient()
->AbstractClient.AbstractClient()
->NettyClient.doOpen()

3.2 消费方代理创建

消费方代理创建
ReferenceConfig.createProxy()->创建代理
->AbstractProxyFactory.getProxy()
->JavassistProxyFactory.getProxy()
InvokerInvocationHandler.invoke():代理流程
invoke():代理流程
invoker.invoke():方法调用->MockClusterInvoker.invoke()

3.3 消费方发送数据

消费方发送数据
InvokerInvocationHandler.invoke()
MockClusterInvoker
invoke()->AbstractClusterInvoker.invoke()
->FailoverClusterInvoker.doInvoke()
->invoker=select()
->doSelect()
->loadbalance.select():负载均衡查找Invoker
->发起远程调用
InvokerWrapper.invoke()
ConsumerContextFilter.invoke()
AbstractInvoker.invoke()
DubboInvoker.doInvoke()
ReferenceCountExchangeClient.request()
AbstractPeer.send()
NettyChannel.send()
NioClientSocketChannel.write()

3.4 消费方编、解码器

消费方编、解码器
NettyClient.doOpen():Netty客户端创建
bootstrap.handler()
编解码器:NettyCodecAdapter adapter = new NettyCodecAdapter()
NettyCodecAdapter.InternalEncoder:编解码器
Codec2 codec->DubboCountCodec->ExchangeCodec
ExchangeCodec-Hessian2
encode():编码
encodeResponse():响应编码
encodeRequest():请求编码
编码结构:16字节
1)前2个字节:存储魔法数头部,数据是否有效校验
2)第3个字节:存储数据包类型(0 - Response, 1 - Request)
3)第4-8字节:设置唯一请求标识符
4)第5个字节:调用方式、事件标识、序列化器编号
5)第6个字节:状态,20 - OK 30 - CLIENT_TIMEOUT…
6)第32 ~ 95位:请求编号,共8字节,运行时生成
7)第96 ~ 127位:消息体长度,运行时计算
decode():解码->DubboCodec.decodeBody()
响应解码:
1):校验魔法数
2):数据长度检查
3):调用decodeBody()解码
1):数据包类型
2):获取请求编号
3):检测消息类型
4):创建 Response 对象、检测事件标志位、获取响应状态、设置响应状态
5):反序列化
6):反序列化获取事件数据decodeEventData()
请求解码:
1:根据ID创建Request
2:CodecSupport.deserialize反序列化
3:设置Request对象的数据

3.5 服务端接收请求

服务端接收请求
NettyServer.doOpen()开启Netty服务
解码器:addLast(“decoder”, adapter.getDecoder())
->NettyCodecAdapter->ExchangeCodec.decode()
addLast(“handler”, nettyServerHandler):
->NettyServerHandler
程序流程:1)解码器解码
2)业务处理器执行业务处理流程
NettyCodecAdapter
调用解码器解码
将解码数据下沉到业务处理器
业务处理
NettyServerHandler
channelRead()读取解码器解码的结果
AbstractPeer.received()
MultiMessageHandler.received()
HeartbeatHandler.received()
AllChannelHandler.received()
线程执行->DecodeHandler.received()
HeaderExchangeHandler.received()
handleRequest():处理请求
ExchangeHandlerAdapter.reply()
DubboProtocol
获取:invoker
通过invoker执行本地调用

3.6 服务端返回消息处理

服务端返回消息处理
ExchangeCodec.encodeResponse():响应结果编码
DubboCodec.encodeResponseData():响应数据序列化

3.7 消费方接收调用结果

消费方接收调用结果
NettyClient创建代码
解码器处理完成后,调用业务组件NettyClientHandler
NettyClientHandler
channelRead()
AbstractPeer.received()
MultiMessageHandler.received()
HeartbeatHandler.received()
AllChannelHandler.received()
线程->ChannelEventRunnable.run()
DecodeHandler.received()
DecodeableRpcResult.decode()

3.8 异步转同步

异步转同步
客户端发起调用:InvokerInvocationHandler.invoke()->AsyncRpcResult
DubboInvoker.doInvoke()
AsyncRpcResult.recreate()->get()阻塞获取值

3.9 异步多线程数据一致

异步多线程数据一致
DubboInvoker.doInvoke()
1:currentClient.request(inv, timeout)
2:HeaderExchangeChannel.request():创建efaultFuture.newFuture(),拥有唯一的一个ID
DefaultFuture:
->响应结果封装Map<Long, DefaultFuture> FUTURES
->Map<Long, Channel> CHANNELS

3.10 核心对象

核心对象
Netty客户端创建:NettyClient
发送数据请求
InvokerInvocationHandler.invoke()发起调用
DubboInvoker.doInvoke()执行远程调用
NettyChannel执行调用
NioClientSocketChannel.write()数据发送

标签:调用,通信协议,invokers,源码,集群,Invoker,invocation,new,channel
来源: https://blog.csdn.net/m0_46690280/article/details/116043950