编程语言
首页 > 编程语言> > Spring Cloud源码分析——Ribbon客户端负载均衡

Spring Cloud源码分析——Ribbon客户端负载均衡

作者:互联网

年前聊了Eureka和Zookeeper的区别,然后微服务架构系列就鸽了三个多月,一直沉迷逛B站,无法自拔。最近公司复工,工作状态慢慢恢复(又是元气满满地划水)。本文从以下3个方面进行分析(参考了翟永超[程序猿DD])的《Spring Cloud微服务实战》

 

  1. LoadBalancerInterceptor拦截器对RestTemplate的请求拦截;
  2. RibbonLoadBalancerClient实际接口实现;
  3. 负载均衡策略

1、LoadBalancerInterceptor源码

 

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
            LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
            final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null,
                "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName,
                this.requestFactory.createRequest(request, body, execution));
    }

}

可以看出,该拦截器注入了LoadBalancerClient实例,当一个被@LoadBalanced修饰的RestTemplate对象发起Http请求,会被LoadBalancerInterceptor中的intercept函数拦截。该函数会通过getHost()获取Http请求的服务名,恰巧我们使用的RestTemplate对象采用服务名作为Host,接着loadBalancer查找到对应服务名的服务,调用execute函数对该服务发起请求。

2、RibbonLoadBalancerClient实现

 

/**
     * New: Execute a request by selecting server using a 'key'. The hint will have to be
     * the last parameter to not mess with the `execute(serviceId, ServiceInstance,
     * request)` method. This somewhat breaks the fluent coding style when using a lambda
     * to define the LoadBalancerRequest.
     * @param <T> returned request execution result type
     * @param serviceId id of the service to execute the request to
     * @param request to be executed
     * @param hint used to choose appropriate {@link Server} instance
     * @return request execution result
     * @throws IOException executing the request may result in an {@link IOException}
     */
    public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
            throws IOException {
        ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
        Server server = getServer(loadBalancer, hint);
        if (server == null) {
            throw new IllegalStateException("No instances available for " + serviceId);
        }
        RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));

        return execute(serviceId, ribbonServer, request);
    }

经过LoadBalancerInterceptor拦截器后,调用LoadBalancerClient的execute函数去发起对应服务的请求。(LoadBalancerClient只是个抽象的负载均衡接口,RibbonLoadBalancerClient则是该接口的具体实现)
execute函数的作用,如官方所说:通过‘key’找到对应的服务并执行请求。
从源码中可以看出,execute函数具体实现首先是定义一个传入serviceId的loadBalancer对象,再getServer获取对应的具体服务,最后通过ribbonServer整合一系列服务信息发起请求。
其中getServer()是关键操作,来看看对应的源码:

 

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }

显然,需要再深入看下loadBalancer 。

 

public interface ILoadBalancer {

    /**
     * Initial list of servers.
     * This API also serves to add additional ones at a later time
     * The same logical server (host:port) could essentially be added multiple times
     * (helpful in cases where you want to give more "weightage" perhaps ..)
     * 
     * @param newServers new servers to add
     */
    public void addServers(List<Server> newServers);
    
    /**
     * Choose a server from load balancer.
     * 
     * @param key An object that the load balancer may use to determine which server to return. null if 
     *         the load balancer does not use this parameter.
     * @return server chosen
     */
    public Server chooseServer(Object key);
    
    /**
     * To be called by the clients of the load balancer to notify that a Server is down
     * else, the LB will think its still Alive until the next Ping cycle - potentially
     * (assuming that the LB Impl does a ping)
     * 
     * @param server Server to mark as down
     */
    public void markServerDown(Server server);
    
    /**
     * @deprecated 2016-01-20 This method is deprecated in favor of the
     * cleaner {@link #getReachableServers} (equivalent to availableOnly=true)
     * and {@link #getAllServers} API (equivalent to availableOnly=false).
     *
     * Get the current list of servers.
     *
     * @param availableOnly if true, only live and available servers should be returned
     */
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    /**
     * @return Only the servers that are up and reachable.
     */
    public List<Server> getReachableServers();

    /**
     * @return All known servers, both reachable and unreachable.
     */
    public List<Server> getAllServers();
}

ILoadBalancer定义了客户端负载均衡器的一系列抽象操作接口,从官方说明看出:

来看看具体实现BaseLoadBalancer,

 

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
    
        logger.debug("LoadBalancer [{}]:  initialized", name);
        
        this.name = name;
        this.ping = ping;
        this.pingStrategy = pingStrategy;
        setRule(rule);
        setupPingTask();
        lbStats = stats;
        init();
    }

默认构造函数ping设为null,rule策略默认设为轮询(RoundRobin)。该构造函数除了基本的赋值之外,主要是setRule(设置负载均衡策略)和setupPingTask(启动ping心跳任务)。

 

void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }

setupPingTask逻辑主要是定义ShutdownEnabledTimer实例来执行一个10秒间隔的schedule。timer定时器还定义了个PingTask任务

 

class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }

官方注释中,TimerTask会在自定义的时间间隔内检查服务实例列表中每个服务实例的运行状态。
再看看PingTask 任务里runPinger方法的关键逻辑:

 

                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);

从源码可以看出,PingTask运行runPinger方法,根据pingerStrategy.pingServers(ping, allServers)来获取服务的可用性,然后对比前后服务的状态,如果状态一致,则不去EurekaClient(一般用Eureka作为注册中心,可换成其他注册中心)获取注册列表;否则,则调用notifyServerStatusChangeListener通知EurekaClient更新或重新拉取。

简单总结下完整的过程:
RibbonLoadBalancerClient(负载均衡客户端)初始化(调用execute),通过ILoadBalance从Eureka注册中心获取服务注册列表,同时以10s为间隔往EurekaClient发送ping,来保证服务的可用性,如果服务前后发生改变,则ILoadBalance重新从Eureka注册中心获取。RibbonLoadBalancerClient拿到服务注册列表之后,再根据IRule具体的策略,去获取对应的服务实例。

3、负载均衡策略

前面讲到RibbonLoadBalancerClient获取具体服务实例的过程,这里就需要了解下负载均衡策略。众所周知,使用负载均衡的好处主要有:当一台或多台机器宕机之后,剩余的机器可以保证服务正常运行;分担机器运行的压力,防止某一高峰机器CPU负载过高。
常见的策略有:随机(Random)、轮询(RoundRobin)、一致性哈希(ConsistentHash)、哈希(Hash)、加权(Weighted)

 

public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

 

private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

轮询算法其实就一句(current + 1) % modulo,每次都取下一台服务器。

 

protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

ThreadLocalRandom获取随机数即可

Ribbon的源码分析大概就这样,后面可能会不定期更新,有兴趣的朋友可以继续深入了解下,有啥问题也可以在评论中一起讨论下。
最后有件很重要的事,那就是麻烦点赞关注赞赏,谢谢(๑•̀ㅂ•́)و✧



本文首发于java黑洞网,csdn同步更新

标签:return,Spring,server,源码,Cloud,服务,null,public,loadBalancer
来源: https://blog.csdn.net/weixin_39363245/article/details/115011239