编程语言
首页 > 编程语言> > Eureka Server源码解析回顾

Eureka Server源码解析回顾

作者:互联网

一 概述

Eureka Server作为一个开箱即用的服务注册中心,提供了满足与Eureka Client交互需求的功能:

Eureka Server同时也是一个Eureka Client,在不禁止Eureka Server的客户端行为的时候,它会向它配置文件中的其他Eureka Server进行拉取注册表,服务注册和发送心跳等操作。

二 服务实例注册表

InstanceRegistry是Eureka Server中注册表管理的核心接口。在InstanceRegistry的类图中可以发现存在两个InstanceRegistry。一个为InstanceRegistry接口,一个为InstanceRegistry类。

InstanceRegistry接口是Eureka Server注册表的最核心接口,其职责是在内存中管理注册到Eureka Server中的服务信息。它分别继承了LeaseManager接口和LookupService接口。

LeanseManager接口对注册到Eureka Server中的服务实例租约进行管理,包括服务注册,服务下线,服务租约更新以及服务剔除等操作。

public interface LeaseManager<T> {
    void register(T var1, int var2, boolean var3);

    boolean cancel(String var1, String var2, boolean var3);

    boolean renew(String var1, String var2, boolean var3);

    void evict();
}

LeaseManager中管理的对象是Lease,Lease代表一个Eureka Client服务实例信息的租约,它提供了对其内特有的类的时间有效性操作。Lease持有的类是代表服务实例信息的InstanceInfo。Lease中定义了租约的操作类型,分别是注册,下线,更新,同时提供了对租约中时间属性的各项操作。租约默认有效时长(duration)为90秒。

LookupService提供了对服务实例进行检索的功能。

public interface LookupService<T> {

    Application getApplication(String appName);

    Applications getApplications();

    List<InstanceInfo> getInstancesById(String id);

    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

三 服务注册

Eureka Client在发起注册时会将自身的服务实例元数据封装在InstanceInfo中,然后将InstanceInfo发送到Eureka Server。Eureka Server在接收到Eureka Client发送的InstanceInfo后将会尝试将其放到本地注册表中以供Eureka Client进行服务发现。

服务注册的主要实现位于AbstractInstanceRegistry的registry方法中

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
        public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        //获取锁
        read.lock();

        //这里的registry是ConcurrentHashMap<String,Map<String,Lease<InstanceInfo>>>
          registry,根据appName对服务实例集群进行分类
        try {
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());

            REGISTER.increment(isReplication);

            if (gMap == null) {

                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
        //这里有一个比较严谨的操作,防止在添加新的服务实例集群租约时把已由的其他线程添加的集群租约覆盖掉,如果存在该键值,直接返回已存在的值,否则添加该键值对,返回null。
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);

                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
           
            //根据instanceId获取实例的租约
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());

            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();

                //如果该实例的租约已经存在,比较最后更新时间戳的大小,取最大值的注册信息为有效
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
              
                    registrant = existingLease.getHolder();
                }
            } else {
                //如果租约不存在,这时一个新的注册实例
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        //自我保护机制
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
            }
            //创建新的租约
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                //如果租约存在,继承租约的服务上线初始时间
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            //保存租约
            gMap.put(registrant.getId(), lease);
            //添加最近注册队列
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
           
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
              
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                 
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());

            if (overriddenStatusFromMap != null) {
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            //根据覆盖状态规则得到服务实例的最终状态,并设置服务实例的当前状态
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            //如果服务实例状态为UP,设置租约的服务上线时间,只有第一次设置有效
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            //添加最近租约变更记录队列,标识ActionType为ADDED
            //这将用于Eureka Client增量式获取注册表信息
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            //设置服务实例信息更新时间
            registrant.setLastUpdatedTimestamp();
            //设置response缓存过期,这将用于Eureka Client全量获取注册表信息
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        } finally {
            //释放读锁
            read.unlock();
        }
    }
}

根据源码可知,在AbstractInstanceRegister类的register方法中,服务实例的InstanceInfo保存在Lease中,Lease在AbstractInstanceRegistry中统一通过ConcurrentHashMap保存在内存中。在服务注册过程中,会先获取一个读锁,防止其他线程对registry注册表进行数据操作,避免数据的不一致。然后从registry查询对应的InstanceInfo租约是否已经存在注册表中,根据appName划分服务集群,使用InstanceId唯一标记服务实例。

如果租约存在,比较两个租约中的InstanceInfo的最后更新时间lastDirtyTimestamp,保留时间戳大的服务实例信息InstanceInfo。如果租约不存在,意味这是一次全新的服务注册,将会进行自我保护的统计,创建新的租约保存InstanceInfo。接着将租约放到resgitry注册表中。

之后将进行一系列缓存操作并根据覆盖状态规则覆盖设置服务实例的状态,缓存操作包括将InstanceInfo加入用于统计Eureka Client增量式获取注册表信息的recentlyChangedQueue和失效responseCache中对应的缓存。最后设置服务实例租约的上线时间用于计算租约的有效是封建,释放读锁并完成服务注册。

四 接受服务心跳

在Eureka Client完成服务注册之后,它需要定时向Eureka Server发送心跳请求(默认30秒一次),维持自己在Eureka Server中租约的有效性。

Eureka Server处理心跳请求的核心逻辑位于AbstractIntanceRegistry中的renew方法中。renew方法是对Eureka Client位于注册表中的租约的续约操作,不像register方法需要服务实例信息,仅仅是根据服务实例的服务名和服务实例id即可更新对一个你租约的有效时间。

public boolean renew(String appName, String id, boolean isReplication) {
       
        RENEW.increment(isReplication);
        //根据appName获取服务集群的租约集合
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        //租约不存在,直接返回false
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                //根据覆盖状态规则得到服务实例的最终状态
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    //如果得到的服务实例最后状态时UNKNOWN,取消续约
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { 
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            //统计每分钟续租的次数,用于自我保护
            renewsLastMin.increment();
            //更新租约中的有效时间
            leaseToRenew.renew();
            return true;
        }
    }

在renew方法中,不关注InstanceInfo,仅关注租约本身以及租约的服务实例状态。如果更具服务实例的appName和instanceInfoId查询服务实例的租约,并且根据getOverriddenInstanceStatus方法得到的instanceStatus不为InstanceStatus.UNKNOWN,那么跟新租约中的有效时间,即更新租约Lease中的lastUpdateTimestamp,达到续约的目的;如果租约不存在,那么返回续租失败的结果。

五 服务剔除

当Eureka Client在注册后,既没有续约,由于服务崩溃或者网络异常等原因也没有下线,那么服务的状态就处于不可知的状态,不能保证能够从该服务实例中获取到回馈,所以需要将该服务剔除。AbstractInstanceRegistry中的evict方法定时清理这些不稳定的服务,该方法会批量将注册表中所有过期租约剔除。

    @Override
    public void evict() {
        evict(0l);
    }

    public void evict(long additionalLeaseMs) {
        
        //自我保护相关,如果出现该状态,不允许剔除服务。
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        
        //遍历注册表register,一次性获取所有的过期租约。
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
        
        //计算最大允许剔除的租约的数量,获取注册表租约总数
        int registrySize = (int) getLocalRegistrySize();
        //计算注册表租约的阈值,与自我保护相关
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        //计算剔除租约的数量
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {

            Random random = new Random(System.currentTimeMillis());
            //逐个随机剔除
            for (int i = 0; i < toEvict; i++) {
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                //逐个剔除
                internalCancel(appName, id, false);
            }
        }
    }

服务剔除将会遍历registry注册表,找出其中所有的过期租约,然后根据配置文件中续租百分比阈值和当前注册表的租约总数量计算出最大允许的剔除租约的数量(当前注册表中租约总数减去当前注册表租约表阈值),分批次剔除过期的服务实例租约。对过期的服务实例租约调用internalCancel服务下线的方法将其从注册表中清除掉。

    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        //获取读锁,防止被其他线程进行修改
        read.lock();
        try {
            CANCEL.increment(isReplication);
            //根据appName获取服务实例的集群
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            //移除服务实例的租约
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
            //将服务实例信息添加到最近下线服务实例统计队列
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
     
            }
            //租约不存在,返回false
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                return false;
            } else {
                //设置租约的下线时间
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    //添加最近租约变更记录队列,标识ActionType为DELETED
                    //这将用于Eureka Client增量式获取注册表信息
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                //设置response缓存过期
                invalidateCache(appName, vip, svip);
            }
        } finally {
            //释放读锁
            read.unlock();
        }

        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }
        //下线成功
        return true;
    }

internalCancel方法与register方法的行为过程很类似,首先通过registery根据服务名和服务实例id查询关于服务实例的租约Lease是否存在,统计最近请求下线的服务实例用于Eureka Server主页展示。如果租约不存在,返回下线失败;如果租约存在,从registery注册表中移除,设置租约的下线时间,同时在最近租约变更记录队列中添加新的下线记录,以用于Eureka Client的增量式获取注册表信息,最后设置response缓存过期。

internalCancel方法中国同样通过读锁保证registery注册表中数据的一致性,避免脏读。

源码阅读至此,并未完全理解通透,待续!

标签:服务,实例,Eureka,源码,注册表,租约,Server,registrant
来源: https://blog.csdn.net/calm_encode/article/details/112728932