其他分享
首页 > 其他分享> > 服务治理 Eureka

服务治理 Eureka

作者:互联网

Spring Cloud Eureka 是 Spring Cloud Netflix 微服务套件中的一部分,它基于 Netflix 做了二次封装,主要负责完成微服务架构中的服务治理功能。   服务治理是微服务架构中最为核心和基础的模块,主要用来实现各个微服务实例的自动化注册与发现。   对于小型系统而言,整个生态体系中可能只有几个或者十几个服务,此时,通过手工静态配置,我们就可以维护好服务实例清单,管理好服务之间的接口调用,以及服务的升级部署等工作。但是对于中大型系统,尤其是业务功能复杂的公司,需要管理的服务实例数量可能会非常多,此时,静态配置就变得越来越难以维护。   另外,随着业务的发展,我们服务的集群规模、服务位置、命名也可能发生变化,这时候静态配置就是件非常头疼的事情。整个架构体系就是‘牵一发而动全身’的状态。   服务治理框架的实现都是围绕着服务注册服务发现机制来完成对微服务应用实例的自动化管理。

服务注册

在服务治理框架中,通常都会构建一个服务注册中心,每个服务实例向注册中心登记自己提供的服务,将主机、端口号、版本号、通信协议等一些附加信息作为元数据告知注册中心。注册中心会按照服务名分类组织服务清单,并且会以心跳的方式监测清单中的服务实例是否可用,若不可用,则从服务清单中剔除。

服务发现

在微服务中,每个服务都可能存在多个实例,服务之间的调用不再通过指定具体的实例地址来实现,而是通过向服务名发起请求调用实现。因此,服务调用方在调用服务提供方提供的接口时,需要从注册中心获取服务提供方的所有服务实例信息,然后从中选择一个服务实例调用。   服务治理组件 Eureka 的基础架构中有三个核心要素:

基础架构

当架构中存在多个服务注册中心时,通过配置让它们相互注册,组成高可用集群。  

服务提供者

 

服务注册

服务提供者在启动时,会通过 REST 请求的方式,将自己注册到服务注册中心(Eureka Server)中,同时带上了一些服务自身的元数据信息。Eureka Server 接收到这个 REST 请求后,将服务的元数据存储在一个双层的 Map 结构中:其中第一层的 key 是服务名,第二层的 key 是具体的服务名。

服务同步

为了保证服务的高可用性,一般会搭建多个服务注册中心,让它们相互注册,构成一个高可用的服务注册中心集群。 此时,由于服务注册中心之间互相同步服务,所以无论服务提供者注册到任何一个注册中心,该注册中心都会将注册请求转发到服务集群中相连的其他服务注册中心,从而实现注册中心之间的服务同步。

服务续约

服务提供者在向服务注册中心注册完服务后,会维护一个‘心跳’用来持续告诉服务注册中心:‘我还活着’,以防止服务注册中心在做‘剔除任务’时将该服务实例从服务列表中剔除。 服务提供者为对应实例维护‘心跳’的行为称为服务续约(Renew)。 在Eureka中有两个与服务续约相关的重要属性,可以通过配置文件进行修改:
eureka.instance.lease-renewal-interval-in-seconds=30        #心跳时间间隔,默认为 30 秒
eureka.instance.lease-expiration-duration-in-seconds=90        #服务实例的有效时间,默认为 90 秒

 

服务消费者

 

获取服务

服务消费者在启动的时候,会向服务注册中心发送一个 REST 请求,来获取上面注册的服务清单。 为了性能考虑,Eureka Server 会维护一份只读的缓存服务清单,并且这份清单会每隔 30 秒更新一次,剔除其中的实效服务。 获取服务是服务消费的基础,所以必须通过配置确保服务消费者能够从注册中心获取服务清单:
eureka.client.fetch-registry=true        #从服务注册中心拉取服务,默认为 true
eureka.client.egistry-fetch-interval-seconds=30        #缓存清单更新时间,默认为 30 秒

服务调用

服务消费者在获取到服务清单后,通过服务名就可以获取到对应的所有服务实例和实例的元数据信息。拥有这些信息后,服务消费者就可以根据需求选择具体的调用实例。 在 Ribbon 中,默认采用轮询方式调用服务,从而实现客户端负载均衡。当然,它也提供了其他均衡策略。 在 Eureka 中,有 Region 和 Zone 的概念,一个 Region 对应多个 Zone。每个服务实例都会被注册到一个 Zone 中,所以每个服务实例会对应一个 Region 和一个 Zone。在服务调用的时候,会优先访问同一个 Zone 中的服务提供方,如果找不到,再访问其他的 Zone。

服务下线

在系统运行过程中,必然会存在关闭或者重启某个服务实例的情况,此时我们肯定不希望服务消费者向已关闭的服务发送请求。因此,需要将该服务实例从服务清单中剔除。 当服务正常关闭时,会向服务注册中心(Eureka Server)发送一个服务下线的 REST 请求。注册中心接收到该请求后,会将该服务实例置为下线(DOWN)状态,并把下线请求传播出去。

服务注册中心

失效剔除

有些时候,服务注册中心不一定会正常下线,可能因为内存溢出、网络故障等原因,使得服务不能正常工作。此时,服务实例无法提供服务,但是注册中心并没有收到服务下线的通知。 为了及时将服务列表中的故障服务实例剔除,Eureka Server 在启动的时候会创建一个定时任务,默认每隔一定时间(默认 60 秒)将当前列表中超时(默认 90 秒)而未续约的服务实例剔除出去。

自我保护

服务实例注册到服务注册中心 Eureka Server 中后,会维护一个心跳,告诉注册中心自己还活着。Eureka Server 在运行期间,会统计心跳失败的比例在 15 分钟内是否低于 85%(在单机调试的时候很容易满足,实际生产环境通常是网络不稳定导致)。如果出现低于的情况,Eureka Server 会将当前的服务实例保护起来,让这些实例不会过期。 在服务实例被保护的期间,如果服务实例出现了问题,那么消费者会拿到实际上已不存在的服务实例,在服务调用时会出现调用失败的情况。所以客户端必须要有容错机制,比如使用请求重试、断路器等机制。 由于本地开发时,非常容易触发注册中心的保护机制,因此,可以通过配置关闭 Eureka Server 的保护机制:
eureka.server.enable-self-preservation=false        #关闭注册中心的服务保护机制

服务治理原理分析

Eureka 服务治理是微服务中的基础组件,那么它是如何实现的呢?

对于服务注册中心、服务提供者、服务消费者这三个主要元素来说,后两者(也就是 Eureka 客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处理客户端发送的请求。所以,我们可以从 Eureka 客户端开始,看看它是如何完成这些主动通信行为的。

开发过程中,要将一个普通的 Spring Boot 应用注册到 Eureka Server,需要做两件事:

  1. 在应用主类上添加 @EnableDiscoveryClient 注解;
  2. 在配置文件 application.properties 或者 application.yml 中配置 Eureka Server 的地址信息;

从 @EnableDiscoveryClient 的命名上可以看出,这是一个与服务发现机制有关的注解。所以我们可以查看下它的源码,从它开始进行分析:

@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

// 如果设置为 true,当前服务会自动注册到 Eureka Server boolean autoRegister() default true; }

 在 @EnableDiscoveryClient 中只有一个 autoRegister 属性,它会开启 Eureka Client 的自动注册功能。在后面的处理逻辑中,程序会获取 autoRegister 的属性值,如果为 true,就向 Eureka Server 发送一个 REST 请求,将当前服务注册到服务中心。而在此之前,它首先需要获取到 Eureka Server 的地址。

那么 Eureka Client 是如何获取到 Eureka Server 的地址信息呢?

这里就用到了我们将 Spring Boot 应用注册到 Eureka Server 时,做的第二件事:在配置文件中配置服务注册中心的地址信息。在服务注册时,Eureka Client 会读取配置文件,从中获取到我们配置的 Eureka Server 地址,然后将自己注册进去。获取 Eureka Server 地址的逻辑原本在 com.netflix.discovery.DiscoveryClient 的 getServiceUrlsFromConfig 函数中,但是在 SR5 版本中,这个函数已经被废弃掉,并 @link 到了替代类 com.netflix.discovery.endpoint.EndpointUtils 中,下面是 EndpointUtils 中的 getServiceUrlsFromConfig 函数代码,它负责从配置文件中读取服务注册中心地址:

public static final String DEFAULT_REGION = "default";  // 服务实例的默认Region
public static final String DEFAULT_ZONE = "default;  // 服务实例的默认Zone

public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
    List<String> orderedUrls = new ArrayList<String>();
    String region = getRegion(clientConfig);  // 获取服务实例对应的 Region
    String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());  // 获取服务实例对应的 Zone
    if (availZones == null || availZones.length == 0) {
        availZones = new String[1];
        availZones[0] = DEFAULT_ZONE;
    }
    int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);  // 
    List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);
    if (serviceUrls != null) {
        orderedUrls.addAll(serviceUrls);
    }
    int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
    while (currentOffset != myZoneOffset) {
        serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
        if (serviceUrls != null) {
            orderedUrls.addAll(serviceUrls);
        }
        if (currentOffset == (availZones.length - 1)) {
            currentOffset = 0;
        } else {
            currentOffset++;
        }
    }

    if (orderedUrls.size() < 1) {
        throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
    }
    return orderedUrls;
}

Eureka 中有一个区(Region)和域(Zone)的概念,Region 和 Zone 是一对多的关系:

在使用 Ribbon 来实现服务调用时,通过对 Zone 的配置可以在负载均衡时实现区域亲和特性:Ribbon 默认策略是优先访问和客户端处于同一个 Zone 的服务端实例,只有当同一个 Zone 中没有可用的服务端实例时,才会访问其他 Zone 中的实例。所以通过 Zone 属性,配合实际部署的物理结构,我们可以有效的设计出对区域性故障的容错集群。

eureka.client.region=east-1
eureka.client.availability-zones=east-zone, south-zone

Eureka Client 在获取服务注册中心 Eureka Server 的地址信息时,会先通过 org.springframework.cloud.netflix.eureka.EurekaClientConfigBean(负责读取配置文件)读取 Client 的 region 属性,

public static String getRegion(EurekaClientConfig clientConfig) {
    String region = clientConfig.getRegion();  // 通过 EurekaClientConfigBean 获取配置文件中的 eureka.client.region 属性值
    if (region == null) {  // 如果服务实例没有配置 eureka.client.region 属性,则使用默认的 Region 值:default
        region = DEFAULT_REGION;
    }
    region = region.trim().toLowerCase();
    return region;
}

读取到服务实例对应的 Region 信息后,应用程序会根据 region 值获取对应的 Zone 数据。

public String[] getAvailabilityZones(String region) {
    String value = this.availabilityZones.get(region);
    if (value == null) {
        value = DEFAULT_ZONE;
    }
    return value.split(",");
}

在获取了 Region 和 Zone 的信息后,才开始真正加载 Eureka Server 的地址。Eureka 会根据传入的参数按照一定算法确定加载哪一个 Zone 中的 serviceUrls。

int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);

org.springframework.cloud.netflix.eureka.EurekaClientConfigBean 是 EurekaClientConfig 接口的实现,用于加载配置文件的内容。在该类中,我们可以看到获取 Eureka Server 地址信息的具体实现逻辑:

public static final String DEFAULT_ZONE = "defaultZone";
public List<String> getEurekaServerServiceUrls(String myZone) {
    String serviceUrls = this.serviceUrl.get(myZone);
    if (serviceUrls == null || serviceUrls.isEmpty()) {
        serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
    }
    if (!StringUtils.isEmpty(serviceUrls)) {
        final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
        List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
        for (String eurekaServiceUrl : serviceUrlsSplit) {
            if (!endsWithSlash(eurekaServiceUrl)) {
                eurekaServiceUrl += "/";
            }
            eurekaServiceUrls.add(eurekaServiceUrl.trim());
        }
        return eurekaServiceUrls;
    }
return new ArrayList<>(); }
public static String[] commaDelimitedListToStringArray(@Nullable String str) {  // 根据逗号分割
return delimitedListToStringArray(str, ",");
}

eureka.client.serviceUrl.defaultZone 可以配置多个值,值之间需要通过逗号分隔。

服务注册

应用在启动时,会将自己注册到 Eureka Servery 中,这部分功能是如何实现的呢?

在 DiscoveryClient 的构造函数中,调用了一个初始化函数 initScheduledTasks。在这个函数中,程序会判断应用是否开启了向注册中心注册的功能,如果开启了,就会向 Eureka Server 发送一个 REST 请求。

private void initScheduledTasks() {
    ...
    if (clientConfig.shouldRegisterWithEureka()) {  // 根据 eureka.client.register-with-eureka 属性值,判断是否要向 Eureka Server 发送注册请求
        ...
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); 
        ...
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

在 initScheduledTasks 函数中,有一个 if 语句,当 eureka.client.register-with-eureka 的属性值为 true 时,会创建一个 InstanceInfoReplicator 类的实例,它会执行一个定时任务,而这个定时任务的工作内容就是 InstanceInfoReplicator  中的 run 函数中的逻辑:

public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();  // 调用 DiscoveryClient 中的 register 函数,这也是服务注册真正触发的地方
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

在定时任务 InstanceInfoReplicator 的 run 函数中,我们看到有一行代码调用了 DiscoveryClient 的 register 函数,这里会触发 Eureka Client 的注册逻辑,向 Eureka Server 发送注册请求。

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);  // 发送一个 REST 注册请求,将服务实例信息发送给 Eureka Server
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
   ...return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

 

服务获取与服务续约

在上面查看 DiscoveryClient 类的 initScheduledTasks 函数时,我们发现它里面有两个定时任务,分别用于服务获取服务续约。其中,服务续约和服务注册在同一个 if 逻辑中,只要应用会被注册到 Eureka Server 中,它就会维护一个 “心跳”(创建一个定时任务),定时向 Eureka Server 发送 REST 请求,告诉 Eureka Server 自己还活着。

if (clientConfig.shouldRegisterWithEureka()) {
    int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    scheduler.schedule(  // 创建一个定时任务,定时向 Eureka Server 发送心跳
            new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()  // 服务续约的具体逻辑在线程类 HeartbeatThread 中
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);
}

服务续约的实现比较简单,它会向 Eureka Server 发送一个 REST 请求,告诉 Eureka Server 自己还活着。如果请求成功,就将

private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {  // 发送 “心跳” 续约,如果续约成功,则更新存活记录为当前时间
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); // 发送一个 HTTP 请求,进行服务续约if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment();long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }

服务获取的实现会复杂一些,会根据是否是第一次获取发起不同的 REST 请求和响应的处理。

if (clientConfig.shouldFetchRegistry()) {  // 如果配置文件中 eureka.client.fetch-registry=true 属性允许客户端从 Eureka Server 中获取服务实例,则创建一个定时任务,定时获取实例数据
    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();  // 获取配置文件中的 eureka.client.registry-fetch-interval-seconds 属性(单位:秒),每隔多长时间从 Eureka Server 中获取一次实例数据
    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    scheduler.schedule(  // 创建一个定时任务,定期获取服务实例数据
            new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

服务拉取时,会创建一个 CacheRefreshThread 任务,间隔一定时间周期执行。这里间隔的周期就是 eureka.client.registry-fetch-interval-seconds 属性值,单位为秒。

void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

 

服务注册中心处理

通过上面的分析,可以看到 Eureka Client 与 Eureka Server 的交互都是通过 REST 请求进行的。对于 Eureka Client 的各种请求的处理逻辑都定义在  com.netflix.eureka.resources 包下,以 “服务注册” 请求的处理逻辑为例,它对应的处理逻辑在 ApplicationResource.class 中。


private final PeerAwareInstanceRegistry registry;

@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // 校验服务实例是否包含所有必须的属性信息,比如:服务名、IP地址等 ... registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }

在对注册的服务实例信息进行一堆校验后,会调用 org.springframework.cloud.netflix.eureka.server.InstanceRegistry 中的 register 函数进行服务注册。InstanceRegistry 中的 register 函数中主要做了两件事:

  1. 调用 publishEvent 函数,将新服务注册的事件传播出去;
  2. 调用 com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl 中的 register 函数完成实例注册;
public void register(final InstanceInfo info, final boolean isReplication) {
    this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}

private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
    this.log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
    this.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
}

 PeerAwareInstanceRegistryImpl中的 register 函数也不是服务注册逻辑真正实现的地方,这里它会调用父类(com.netflix.eureka.registry.AbstractInstanceRegistry )的 register 实现:

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

AbstractInstanceRegistry 会将新的服务实例信息保存在一个 ConcurrentHashMap 容器 registry 中,registry 是一个两层的 Map 结构,第一层的 key 存储服务名(InstanceInfo 中的 appName 属性);第二层的 key 存储实例名(InstanceInfo 中的 instanceId 属性)。

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }

服务端的请求和接收处理逻辑类似,这里就不再对服务续约和服务获取等逻辑做具体说明。

总结

如果想将一个 Spring Boot 应用注册到 Eureka 服务注册中心,只需要做两件事:

  1. 在应用主类上添加 @EnableDiscoveryClient 注解;
  2. 在配置文件 application.properties 或者 application.yml 中配置 Eureka Server 的地址信息;

Eureka 中有 Region 和 Zone 的概念,一个 Region 可以对应多个 Zone。Ribbon 在实现服务调用时,默认策略是优先访问和客户端处于同一个 Zone 的服务端实例,只有当同一个 Zone 中没有可用的服务端实例时,才会访问其他 Zone 中的实例。所以通过 Zone 属性,配合实际部署的物理结构,我们可以有效的设计出对区域性故障的容错集群。

Eureka Client 在启动服务时,会向 Eureka Server 发送注册请求,同时会创建两个定时任务,一个定时任务用于发送 “心跳”,完成服务续约功能;另一个定时任务用于读取服务实例列表。

Eureka Server 收到服务实例的注册请求后,会将新的服务实例信息保存在一个双层的 Map 中(这是一个 ConcurrentHashMap,且添加元素时,会使用 synchronize 关键字进行同步控制):

如果要搭建一个高可用的 Eureka 服务注册中心,可以启动多个 Eureka Server 实例,通过配置文件(application.yml)配置,将每个 Eureka Server 实例作为一个客户端注册到其他 Eureka Server 实例中,形成一个相互注册的网状结构。同时,每个 Eureka Client 服务实例需要配置一个 Eureka Server 地址列表,包含所有的 Eureka Server 实例地址。

Eureka Server 遵循 CAP 定理,与 Zookeep 不同,它选择的是 AP,也就是可用性和分区容错性。在 Eureka 高可用架构中,只要有一个 Eureka Server 实例能提供服务,服务注册和管理功能就可以正常使用,当其他故障 Eureka Server 重启后,会从当前的 Eureka Server 中同步(通过 REST 请求)服务实例信息。

 

 

标签:服务,Server,实例,治理,注册,Eureka,registrant
来源: https://www.cnblogs.com/juxiemushu/p/12504510.html