其他分享
首页 > 其他分享> > Eureka(4)--->spring-cloud生态中对服务发下的抽象以及eurekaDiscoveryClient的实现原理

Eureka(4)--->spring-cloud生态中对服务发下的抽象以及eurekaDiscoveryClient的实现原理

作者:互联网

1、spring-cloud生态中对服务发下的抽象核心如下:

      

      主要发接口就是 DiscoveryClient 以及注解 @EnableDiscoveryClient

 

2、EurekaDiscoveryClient的实现原理:

      

      EurekaDiscoveryClient 实现了spring-cloud生态中定义的 DiscoveryClient 接口,EurekaDiscoveryClient的核心属性就是Netflix的EurekaClient、Netflix的EurekaClientConfig。

      2.1、自动装配EurekaDiscoveryClient的原理解析,在 EurekaDiscoveryClientConfiguration 中如下源码:

	@Bean
	@ConditionalOnMissingBean
	public EurekaDiscoveryClient discoveryClient(EurekaClient client,
			EurekaClientConfig clientConfig) {
        使用一个Netflix的EurekaClient + 客户端配置类来实例化一个EurekaDiscoveryClient
		return new EurekaDiscoveryClient(client, clientConfig);
	}

           那么问题来了这个 Netflix的EurekaClient、Netflix的EurekaClientConfig是如何自动装配的呢????我们来到  EurekaClientAutoConfiguration 自动配置类中发现如下源码:

	配置一个EurekaClientConfigBean 的Bean, EurekaClientConfigBean 实现了EurekaClientConfig接口,因此也是一个Netflix的EurekaClientConfig实例。
    @Bean
	@ConditionalOnMissingBean(value = EurekaClientConfig.class,
			search = SearchStrategy.CURRENT)
	public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
		return new EurekaClientConfigBean();
	}


不需要动态刷新的EurekaClient:配置一个Netflix的EurekaClient Bean, 默认使用CloudEurekaClient实例,这个Bean是不需要动态刷新的。通过eureka.client.refresh.enable这个配置像可以进行设置。
	@Bean(destroyMethod = "shutdown")
	@ConditionalOnMissingBean(value = EurekaClient.class,
			search = SearchStrategy.CURRENT)
	public EurekaClient eurekaClient(ApplicationInfoManager manager,
			EurekaClientConfig config) {
		return new CloudEurekaClient(manager, config, this.optionalArgs,
				this.context);
	}

所以还有一个需要动态刷新的Netflix 的EurekaClient实例的配置:
	@Bean(destroyMethod = "shutdown")
	@ConditionalOnMissingBean(value = EurekaClient.class,
			search = SearchStrategy.CURRENT)
	@org.springframework.cloud.context.config.annotation.RefreshScope
	@Lazy
	public EurekaClient eurekaClient(ApplicationInfoManager manager,
			EurekaClientConfig config, EurekaInstanceConfig instance,
				@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
			// If we use the proxy of the ApplicationInfoManager we could run into a
			// problem
			// when shutdown is called on the CloudEurekaClient where the
			// ApplicationInfoManager bean is
			// requested but wont be allowed because we are shutting down. To avoid this
			// we use the
			// object directly.
			ApplicationInfoManager appManager;
			if (AopUtils.isAopProxy(manager)) {
				appManager = ProxyUtils.getTargetObject(manager);
			}
			else {
				appManager = manager;
			}
			CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
					config, this.optionalArgs, this.context);
			cloudEurekaClient.registerHealthCheck(healthCheckHandler);
			return cloudEurekaClient;
	}
    


           默认是配置动态刷新的 CloudEurekaClient,因为 eureka.client.refresh.enable 默认是true。

           以上就完成了EurekaDiscoveryClient实例的自动装配。

 

   2.2、EurekaDiscoveryClient 实例中的Netflix的EurekaClient实例初始化原理解析,为什么来说这个呢?那是因为 EurekaClient 初始化的时候做了很多事情,接下来我们来揭晓:

		CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
					config, this.optionalArgs, this.context);

            我么来到 CloudEurekaClient 的构造函数中:

	public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
			EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
			ApplicationEventPublisher publisher) {
		super(applicationInfoManager, config, args);
		this.applicationInfoManager = applicationInfoManager;
		this.publisher = publisher;
		this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
				"eurekaTransport");
		ReflectionUtils.makeAccessible(this.eurekaTransportField);
	}

            来到父类中的构造函数:


    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
        this(applicationInfoManager, config, args, ResolverUtils::randomize);
    }

    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
            private volatile BackupRegistry backupRegistryInstance;

            @Override
            public synchronized BackupRegistry get() {
                if (backupRegistryInstance == null) {
                    String backupRegistryClassName = config.getBackupRegistryImpl();
                    if (null != backupRegistryClassName) {
                        try {
                            backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                            logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                        } catch (InstantiationException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (IllegalAccessException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        } catch (ClassNotFoundException e) {
                            logger.error("Error instantiating BackupRegistry.", e);
                        }
                    }

                    if (backupRegistryInstance == null) {
                        logger.warn("Using default backup registry implementation which does not do anything.");
                        backupRegistryInstance = new NotImplementedRegistryImpl();
                    }
                }

                return backupRegistryInstance;
            }
        }, randomizer);
    }

          再来到this(...)方法中:藏得够深吧,这个方法里面做了注册、心跳定时任务初始化,本地服务列表缓存更新任务初始化等核心操作。

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }

        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());

        fetchRegistryGeneration = new AtomicLong(0);

        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

        如果需要从服务端拉取服务
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        如果需要注册到eureka 服务端
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }

        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

        如果不从eureka服务端拉取数据,也注册到eureka服务端,那就不开启心跳、本地缓存刷新等定时任务。
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            initRegistrySize = this.getApplications().size();
            registrySize = initRegistrySize;
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, initRegistrySize);

            return;  // no need to setup up an network tasks and we are done
        }

        如果既需要从eureka服务端拉取数据,也需要注册到服务端,那么就初始化相关定时任务。
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            1、创建一个策略类型的执行器
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());

            2、创建一个心跳执行器
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            3、创建一个缓存刷新执行器,缓存指的是,客户端从服务端获取的数据进行缓存,然后会定时去重新获取进行刷新。
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);

            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry()) {
            try {
                boolean primaryFetchRegistryResult = fetchRegistry(false);
                if (!primaryFetchRegistryResult) {
                    logger.info("Initial registry fetch from primary servers failed");
                }
                boolean backupFetchRegistryResult = true;
                if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                    backupFetchRegistryResult = false;
                    logger.info("Initial registry fetch from backup servers failed");
                }
                if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                    throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
                }
            } catch (Throwable th) {
                logger.error("Fetch registry error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }

        如果需要在初始化阶段进行服务注册,那就进行注册,但是如果注册失败,服务将启动失败。
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {
                logger.error("Registration error at startup: {}", th.getMessage());
                throw new IllegalStateException(th);
            }
        }

        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch

        最后 初始化定时任务,例如:集群解析、心跳、拉取数据更新缓存等定时调度。
        initScheduledTasks();

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);
    }

            初始化定时调度的实现如下:

    private void initScheduledTasks() {

        1、如果需要从服务端拉取数据
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            2、先获取配置的 客户端拉取服务数据的间格时间(默认30秒)。
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();

            3、使用调度器+缓存刷新执行器来构建一个缓存刷新任务TimedSupervisorTask,之所以这样做的原因是失败重试的时候 做衰减重试。
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );

           4、 启动缓存刷新定时任务,默认30s执行一次。
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        5、如果需要将自己注册到服务端
        if (clientConfig.shouldRegisterWithEureka()) {

            6、获取服务续约的间隔时间(默认30秒)
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            7、使用调度器 + 心跳执行器 来构建一个心跳定时任务,原理与缓存刷新一直,做重试衰减。
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );

            8、启动心跳定时任务。
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            9、构建一个实例信息复制器,作用就是在实例信息发生改变的时候,进行服务端的数据刷新。默认也是30秒。
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            10、构建一个状态改变监听器,当当前应用状态发生改变的时候进行通知。
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            11、如果配置了应该在状态改变的时候进行服务端更新,那就将上面构建的状态改变监听器注册到applicationInfoManager中。
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            } 


           12、启动实例信息赋值到服务端的定时调度。 
          instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

 

3、当该实例化+初始化的类都好了以后,那就开始工作了,怎么工作的呢?spring-cloud-eureka-client中提供了一个生命周期的接口的实现类 EurekaAutoServiceRegistration ,类图如下:

        

        其中 AutoServiceRegistration 是spring-cloud生态中提供的服务自动注册的接口。

        我们主要看看EurekaAutoServiceRegistration这个类中的start()方法,源码如下:

	@Override
	public void start() {
		// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
		if (this.port.get() != 0) {
			if (this.registration.getNonSecurePort() == 0) {
				this.registration.setNonSecurePort(this.port.get());
			}

			if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
				this.registration.setSecurePort(this.port.get());
			}
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

            使用服务注册器注册,这里的注册器就是EurekaServiceRegistry
			this.serviceRegistry.register(this.registration);

			this.context.publishEvent(new InstanceRegisteredEvent<>(this,
					this.registration.getInstanceConfig()));
			this.running.set(true);
		}
	}

           来到服务注册器的注册方法,在 EurekaServiceRegistry ,这个类也实现了spring-cloud中的服务注册接口 ServiceRegistry,源码如下:

@Override
	public void register(EurekaRegistration reg) {
		maybeInitializeClient(reg);

		if (log.isInfoEnabled()) {
			log.info("Registering application "
					+ reg.getApplicationInfoManager().getInfo().getAppName()
					+ " with eureka with status "
					+ reg.getInstanceConfig().getInitialStatus());
		}

        先设置ApplicationInfoManager中的当前实例InstanceInfo的状态为初始状态UP, 也就是
InstanceStatus.UP。在这里设置之前,InstanceInfo的状态是STARTING,也就是
InstanceStatus.STARTING。这里设置实例状态的方法会触发状态改变监听器StatusChangeListener的notify
方法。
		reg.getApplicationInfoManager()
				.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

        如果有健康检查处理器,那就将其注册到CloudEurekaClient 中,默认情况下是没有健康检查处理器的。
		reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
				.getEurekaClient().registerHealthCheck(healthCheckHandler));
	}

           我们来看看 ApplicationInfoManager 的 setInstanceStatus(InstanceStatus status)方法:

    public synchronized void setInstanceStatus(InstanceStatus status) {

        1、转换为最新状态,从上面调用栈来说,就是UP状态。
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }

        2、获取前面一次的状态,也就是STARTING
        InstanceStatus prev = instanceInfo.setStatus(next);

        3、通知所有的状态改变监听器,状态有STARTING --> UP
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }

           如果还记得 CloudEurekaClient 在构造函数中构建了一个匿名内部的状态改变监听器的话,这里就简单多了,我们把CloudEurekaClient的父类的构造函数中构建 StatusChangeEvent 以及将监听器注册到ApplicationInfoManager中的核心代码进行展示:

                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

                           那么理所当然会来到这个匿名对象的notify方法咯:

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {

                    如果最新状态为DOWN 或者 前一次状态为DOWN,就答应警告日志,否则做info日志。 
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }

                    然后使用实例信息复制器进行服务端状态更新通知。
                    instanceInfoReplicator.onDemandUpdate();
                }

                       顺利成章来到 InstanceInfoReplicator 这个类,这个类实现了 Runnable 接口,本生就是一个Task:

    核心实现就是手动开启一条线程去执行InstanceInfoReplicator的run方法。
    public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
    
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
    
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

            理所当然来到run()方法:

    public void run() {
        try {

            1、先刷新实例信息,主要就是使用HealthCheckHandler再次获取当前实例的状态并更新实例信息InstanceInfo的状态信息。
            discoveryClient.refreshInstanceInfo();

            2、设置当前instanceInfo的变脏的时间,变脏的含义就是当前实例的信息以及发生改变,还没
通知服务端,所以叫脏的。如果已经在脏的状态的话,在下一次定时复制实例信息到服务端的时候再进行复制。
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {

                3、使用服务发现客户端进行注册,此处核心。
                discoveryClient.register();
                4、注册后将脏实例设置为不脏。
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {

           4、 不管注册成功还是失败,都会开启间歇性复制当前实例到服务端的定时任务,默认30秒执行一次。
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

            理所当然我们来到discoveryClient的registry()方法:这里的discoveryClient是CloudEurekaClient:

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {

            使用eureka的传输实例中的注册器客户端发起注册,参数是需要注册的实例信息instanceInfo,
默认是使用AbstractJerseyEurekaHttpClient这个客户但进行http请求的构建与发送。
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

             理所当然来到 AbstractJerseyEurekaHttpClient的register(InstanceInfo info) : 

    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {

            构建请求URL=http://localhost:9090/eureka/apps/{AppName转大写}
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);

            请求服务端并得到响应,这里就会调用服务端ApplicationResource提供的addInstance(...)接口。这就完成了服务注册。
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

 

   

4、服务注册讲完了,接下来我们来讲解心跳:在初始化CloudEurekaClient的时候,在其父类的构造函数里面构建了心跳的任务,我们在看看源码:

            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);

           心跳所做的事情在 HeartbeatThread 这个Task里面,源码如下:

    private class HeartbeatThread implements Runnable {

        public void run() {

            就是发起续约,续约成功的话就修改最后续约成功的时间
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

           续约的实现如下:

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {

            发送续约请求,只要响应200就表示续约成功。
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

           发送心跳的实现:

    @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {

            构建心跳的http请求,url=http://localhost:9090/eureka/apps/CONSUMER/DESKTOP-
IHK6B18:consumer:8080?status=UP&lastDirtyTimestamp=1608777683806,这样就会调用服务端
InstanceResource的renewLease(...)的rest接口。

            WebResource webResource = jerseyClient.resource(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            response = requestBuilder.put(ClientResponse.class);
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity() &&
                    !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
                eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
            }
            return eurekaResponseBuilder.build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

 

5、心跳讲解完成,我们来分析本地缓存刷新的原理,默认eurekaClient会全量获取服务列表,并缓存在本地,然后使用定时任务去拉取最新数据然后更新本地缓存,这个就是使用缓存更新任务来实现的,我们在CloudEurekaClient初始化的时候知道了,在其父类的构造函数中构建并启动了这个任务,我们来看看源码:

            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);

             缓存刷新业务在 CacheRefreshThread 中,源码如下:

    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

            刷新服务列表实现如下:

    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }

            1、主要看这里,拉取服务列表,从eureka 服务端。
            boolean success = fetchRegistry(remoteRegionsModified);
            2、如果拉取成功,那就修改当前记录的服务数量,那服务数据怎么更新的呢??,我们进入到fetchRegistry一探究竟。
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

            if (logger.isDebugEnabled()) {
                StringBuilder allAppsHashCodes = new StringBuilder();
                allAppsHashCodes.append("Local region apps hashcode: ");
                allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                allAppsHashCodes.append(", is fetching remote regions? ");
                allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                    allAppsHashCodes.append(", Remote region: ");
                    allAppsHashCodes.append(entry.getKey());
                    allAppsHashCodes.append(" , apps hashcode: ");
                    allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                }
                logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                        allAppsHashCodes);
            }
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }

            理所当然来到 fetchRegistry(boolean forceFullRegistryFetch):

 private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            1、先获取本地缓存的服务
            Applications applications = getApplications();

            2、如果禁用了增量拉取 || 强制全量拉取(默认true) || 本地缓存的服务是空 || 
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));

                3、获取并缓存全量的服务数据。
                getAndStoreFullRegistry();
            } else {

                4、否则就获取增量服务数据然后更新到本地缓存中。
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

            我们先来看看全量获取并缓存的实现:

    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                  1、获取所有的应用注册服务数据,这里就会调用服务端ApplicationsResource.getContainers(...)的rest接口。
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());

        2、如果获取成功,将注册的应用数据取出。
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        3、如果注册的应用数据是空的,那就记录error日志
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        4、如果注册的应用数据不是空的,那就设置到本地缓存中。
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }

             增量的也差不多是这个流程,我们就不查看源码了。

 

 6、服务注册、状态改变复制到服务端、心跳、本地缓存更新都进行了讲解,接下来我们来讲解服务的一种DOWN状态,还是在 EurekaAutoServiceRegistration这个类里面,上面说了,它实现了 SmartLifecycle 接口,因此我们这次看它的stop()方法,源码如下:注意这里只是说EurekaAutoServiceRegistration这个Bean停止工作了,但是不代表真个应用shutdown了,所以这里只是实例状态DOWN,因此操作就是更新当前实例的DOWN状态到服务端。而真正的下线在 CloudEurekaClient 的 shutdown()方法中:

	@Override
	public void stop() {
		this.serviceRegistry.deregister(this.registration);
		this.running.set(false);
	}

            服务由于EurekaAutoServiceRegistration Bean停止工作而状态为DOWN的源码如下:

	@Override
	public void deregister(EurekaRegistration reg) {
		if (reg.getApplicationInfoManager().getInfo() != null) {

			if (log.isInfoEnabled()) {
				log.info("Unregistering application "
						+ reg.getApplicationInfoManager().getInfo().getAppName()
						+ " with eureka with status DOWN");
			}

            设置实例的状态为DOWN状态,然后还是会触发StatusChangeListener,然后还是调用到实例复制器的run()方法,进行实例状态刷新然后注册,只不过这个时候状态是DOWN。
			reg.getApplicationInfoManager()
					.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);

			// shutdown of eureka client should happen with EurekaRegistration.close()
			// auto registration will create a bean which will be properly disposed
			// manual registrations will need to call close()
		}
	}



InstanceInfoReplicator的run方法,上面服务注册的时候也是讲解过:这个时候由于实例的状态是DOWN,所
以会发起实例信息复制到服务端也就是重新注册。
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

             

    7、服务的下线前面提了一嘴,也就是当 CloudEurekaClient 这个Bean被销毁的时候会进行服务下线,在配置 CloudEurekaClient 这个Bean的时候定义的其 destroyMethod属性:

		@Bean(destroyMethod = "shutdown")  定义了销毁的方法
		@ConditionalOnMissingBean(value = EurekaClient.class,
				search = SearchStrategy.CURRENT)
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@Lazy
		public EurekaClient eurekaClient(ApplicationInfoManager manager,
				EurekaClientConfig config, EurekaInstanceConfig instance,
				@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
			// If we use the proxy of the ApplicationInfoManager we could run into a
			// problem
			// when shutdown is called on the CloudEurekaClient where the
			// ApplicationInfoManager bean is
			// requested but wont be allowed because we are shutting down. To avoid this
			// we use the
			// object directly.
			ApplicationInfoManager appManager;
			if (AopUtils.isAopProxy(manager)) {
				appManager = ProxyUtils.getTargetObject(manager);
			}
			else {
				appManager = manager;
			}
			CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
					config, this.optionalArgs, this.context);
			cloudEurekaClient.registerHealthCheck(healthCheckHandler);
			return cloudEurekaClient;
		}

                  来到销毁的方法:

    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            删除之前注册的状态改变监听器
            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            取消定时任务
            cancelScheduledTasks();

            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka()
                    && clientConfig.shouldUnregisterOnShutdown()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);

                向服务端发起实例下线请求
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            Monitors.unregisterObject(this);

            logger.info("Completed shut down of DiscoveryClient");
        }
    }

            下线请求实现:

    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");

                调用服务端的服务取消接口,这里将会调用服务端的"apps/" + appName + '/' + id请求,
请求类型为DELETE,也就是调用服务端InstanceResource.cancelLease(...)rest接口。这样就将服务进行了下
线。当然这是正常的下线方式,暴力下线将是使用服务端的剔除的定时任务(默认60s剔除90s内没有续约的实例)来实现的
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }

 

8、ribbon 如何与eurekaClient进行整合的:

      ribbon如何让获取到eurekaClient缓存在本地的服务列表数据呢? 我们知道eurekaClient会将服务数据缓存在 DiscoveryClient 的 localRegionApps的实例中:

    private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();

      这个缓存会使用定时任务进行刷新,那么ribbon是如何来获取这个本地缓存的呢?我们知道在ribbon中使用了Netflix 中的ServerList 中提供了这个接口来更新服务提供者数据,那么eureka里面是不是提供了相关的实现呢? 我们来看看ServerList 类图:不止eureka做了实现,zookeeper、consul相关的集成也实现了相关的服务提供者获取。

                        

                     在spring-cloud层面实现类是 org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList 源码入下:

public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    真正的服务列表实现类是ribbon-eureka 中的DiscoveryEnabledNIWSServerList
	private ServerList<DiscoveryEnabledServer> list;

	private final RibbonProperties ribbon;

	private boolean approximateZoneFromHostname;

	public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
			IClientConfig clientConfig, boolean approximateZoneFromHostname) {
		this.list = list;
		this.ribbon = RibbonProperties.from(clientConfig);
		this.approximateZoneFromHostname = approximateZoneFromHostname;
	}

	@Override
	public List<DiscoveryEnabledServer> getInitialListOfServers() {
		List<DiscoveryEnabledServer> servers = setZones(
				this.list.getInitialListOfServers());
		return servers;
	}

	@Override
	public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
		List<DiscoveryEnabledServer> servers = setZones(
				this.list.getUpdatedListOfServers());
		return servers;
	}

    . . .
}

      因此我们来到 DiscoveryEnabledNIWSServerList中:我们主要看实现的ServerList接口的 getUpdatedListOfServers()方法:


    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        return this.obtainServersViaDiscovery();
    }

          

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList();
        if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {

           1、 获取一个eurekaClient
            EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
            if (this.vipAddresses != null) {
                String[] var3 = this.vipAddresses.split(",");
                int var4 = var3.length;

                for(int var5 = 0; var5 < var4; ++var5) {
                    String vipAddress = var3[var5];

                    2、获取实例列表数据,此处核心
                    List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
                    Iterator var8 = listOfInstanceInfo.iterator();

                    while(var8.hasNext()) {
                        InstanceInfo ii = (InstanceInfo)var8.next();
                        if (ii.getStatus().equals(InstanceStatus.UP)) {
                            if (this.shouldUseOverridePort) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
                                }

                                InstanceInfo copy = new InstanceInfo(ii);
                                if (this.isSecure) {
                                    ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
                                } else {
                                    ii = (new Builder(copy)).setPort(this.overridePort).build();
                                }
                            }

                            DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
                            serverList.add(des);
                        }
                    }

                    if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
                        break;
                    }
                }
            }

            3、返回服务列表数据
            return serverList;
        } else {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList();
        }
    }

            获取实例数据通过VipAddress的实现:

    @Override
    public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
                                                       @Nullable String region) {
        if (vipAddress == null) {
            throw new IllegalArgumentException(
                    "Supplied VIP Address cannot be null");
        }
        Applications applications;
        if (instanceRegionChecker.isLocalRegion(region)) {

            看见没,在这里,直接获取当前EurekaClient中缓存的服务数据,毫无疑问此处的
EurekaClient就是之前配置的CloudEurekaClient,那么此处获取的服务列表数据也肯定是从服务端拉取后缓
存的数据。
            applications = this.localRegionApps.get();
        } else {
            applications = remoteRegionVsApps.get(region);
            if (null == applications) {
                logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
                        + "address {}.", region, vipAddress);
                return Collections.emptyList();
            }
        }

        if (!secure) {
            return applications.getInstancesByVirtualHostName(vipAddress);
        } else {
            return applications.getInstancesBySecureVirtualHostName(vipAddress);

        }

    }

 

以上就是真个eureka-client的原理分析。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:info,clientConfig,spring,Eureka,---,new,logger,null,public
来源: https://blog.csdn.net/qq_34978129/article/details/111544848