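Eureka (4): The service discovery abstraction in the Spring Cloud ecosystem and how EurekaDiscoveryClient works
Author: Internet
1. The core of Spring Cloud's service discovery abstraction:
The main interface is DiscoveryClient, together with the annotation @EnableDiscoveryClient.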
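To make the idea of a discovery abstraction concrete, here is a minimal sketch. SketchDiscoveryClient and InMemoryDiscoveryClient are hypothetical names invented for illustration; they are not the real Spring Cloud interface, only a simplified stand-in showing that callers depend on an interface while the registry behind it can be swapped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified discovery abstraction (not the Spring Cloud interface).
interface SketchDiscoveryClient {
    List<String> getServices();

    List<String> getInstances(String serviceId);
}

// In-memory implementation used only to illustrate the abstraction.
class InMemoryDiscoveryClient implements SketchDiscoveryClient {
    private final Map<String, List<String>> registry = new ConcurrentHashMap<>();

    // Adds one instance URI under the given service id.
    void add(String serviceId, String uri) {
        registry.computeIfAbsent(serviceId, k -> new CopyOnWriteArrayList<>()).add(uri);
    }

    @Override
    public List<String> getServices() {
        return new ArrayList<>(registry.keySet());
    }

    @Override
    public List<String> getInstances(String serviceId) {
        // Unknown services yield an empty list rather than null.
        return new ArrayList<>(registry.getOrDefault(serviceId, List.of()));
    }
}
```

An Eureka-backed implementation would answer the same two questions from the locally cached registry instead of an in-memory map.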
2. How EurekaDiscoveryClient works:
EurekaDiscoveryClient implements the DiscoveryClient interface defined by Spring Cloud. Its core fields are Netflix's EurekaClient and Netflix's EurekaClientConfig.
2.1. How EurekaDiscoveryClient is auto-configured. EurekaDiscoveryClientConfiguration contains the following source:
@Bean
@ConditionalOnMissingBean
public EurekaDiscoveryClient discoveryClient(EurekaClient client,
EurekaClientConfig clientConfig) {
// Instantiate an EurekaDiscoveryClient from a Netflix EurekaClient plus the client configuration class
return new EurekaDiscoveryClient(client, clientConfig);
}
So how are Netflix's EurekaClient and EurekaClientConfig themselves auto-configured? The EurekaClientAutoConfiguration class contains the following source:
It registers an EurekaClientConfigBean bean. EurekaClientConfigBean implements the EurekaClientConfig interface, so it is also a Netflix EurekaClientConfig instance.
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class,
search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
return new EurekaClientConfigBean();
}
Non-refreshable EurekaClient: this registers a Netflix EurekaClient bean, a CloudEurekaClient instance by default, that does not support dynamic refresh. The eureka.client.refresh.enable property controls which variant is used.
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
There is also a configuration for a Netflix EurekaClient instance that does support dynamic refresh:
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a
// problem
// when shutdown is called on the CloudEurekaClient where the
// ApplicationInfoManager bean is
// requested but wont be allowed because we are shutting down. To avoid this
// we use the
// object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
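By default the refreshable CloudEurekaClient is the one configured, because eureka.client.refresh.enable defaults to true.
That completes the auto-configuration of the EurekaDiscoveryClient instance.
2.2. How the Netflix EurekaClient inside EurekaDiscoveryClient is initialized. This is worth covering because EurekaClient does a lot of work during initialization, as shown next: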
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
Let's look at the CloudEurekaClient constructor:
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
Then the parent class constructors:
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
this(applicationInfoManager, config, args, ResolverUtils::randomize);
}
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
@Override
public synchronized BackupRegistry get() {
if (backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
} catch (InstantiationException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (IllegalAccessException e) {
logger.error("Error instantiating BackupRegistry.", e);
} catch (ClassNotFoundException e) {
logger.error("Error instantiating BackupRegistry.", e);
}
}
if (backupRegistryInstance == null) {
logger.warn("Using default backup registry implementation which does not do anything.");
backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return backupRegistryInstance;
}
}, randomizer);
}
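Next, the this(...) constructor they delegate to. It is buried fairly deep, and it performs the core work: the optional registration at startup, initializing the heartbeat scheduled task, initializing the task that refreshes the local service-list cache, and so on.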
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// If the registry should be fetched from the server
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// If this instance should register with the Eureka server
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
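// If the client neither fetches the registry from the Eureka server nor registers with it, no heartbeat, cache refresh, or other scheduled tasks are started.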
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
return; // no need to setup up an network tasks and we are done
}
// Otherwise (fetching, registering, or both are enabled), initialize the executors used by the scheduled tasks.
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 1. Create a scheduled executor (a scheduled thread pool)
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 2. Create the heartbeat executor
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 3. Create the cache refresh executor. The cache is the data the client fetched from the server; it is re-fetched periodically to keep it fresh.
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry()) {
try {
boolean primaryFetchRegistryResult = fetchRegistry(false);
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// If registration is enforced during initialization, register now; if registration fails, startup fails.
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
// Finally, initialize the scheduled tasks, e.g. cluster resolvers, heartbeat, and the fetch that refreshes the cache.
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
}
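The heartbeat and cache refresh executors above are built with a SynchronousQueue, which the source comments call "direct handoff". The sketch below reproduces that shape with the JDK only. DirectHandoffSketch and newExecutor are hypothetical names, and a plain lambda ThreadFactory stands in for Guava's ThreadFactoryBuilder as an assumption.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DirectHandoffSketch {
    // Builds a pool of daemon threads backed by a SynchronousQueue:
    // a submitted task is handed straight to a thread instead of being queued.
    static ThreadPoolExecutor newExecutor(String namePrefix, int maxThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, namePrefix + counter.getAndIncrement());
            t.setDaemon(true); // daemon threads do not block JVM exit
            return t;
        };
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(), factory);
    }
}
```

Because a SynchronousQueue has no capacity, a task is rejected once all maxThreads are busy, so the pool never accumulates a backlog of stale heartbeats or refreshes.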
The scheduled tasks are initialized as follows:
private void initScheduledTasks() {
// 1. If the registry should be fetched from the server
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// 2. Read the configured registry fetch interval (30 seconds by default).
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 3. Wrap the cache refresh work in a TimedSupervisorTask built from the scheduler and the cache refresh executor, so that retries after a failure back off.
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// 4. Schedule the first cache refresh; the task then reschedules itself (every 30 seconds by default).
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 5. If this instance should register itself with the server
if (clientConfig.shouldRegisterWithEureka()) {
// 6. Read the lease renewal interval (30 seconds by default)
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
// 7. Build the heartbeat task from the scheduler and the heartbeat executor; like the cache refresh, it backs off on retry.
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
// 8. Schedule the heartbeat task.
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// 9. Build the instance info replicator, which pushes instance info changes to the server. Its interval also defaults to 30 seconds.
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 10. Build a status change listener that is notified when this application's status changes.
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 11. If on-demand updates on status change are enabled, register the listener built above with applicationInfoManager.
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 12. Start the scheduled replication of instance info to the server.
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
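The retry backoff mentioned for TimedSupervisorTask can be modeled with a small function. This is a simplified model under stated assumptions, not the library's code: it assumes a timed-out run doubles the delay, capped at the base interval multiplied by expBackOffBound, and that a successful run resets the delay to the base interval. BackoffSketch and nextDelay are hypothetical names.

```java
class BackoffSketch {
    // Returns the delay, in milliseconds, to wait before the next run.
    // Assumption: timeouts double the delay up to baseDelayMs * expBackOffBound;
    // a successful run goes back to the base interval.
    static long nextDelay(long baseDelayMs, long currentDelayMs, int expBackOffBound, boolean timedOut) {
        long maxDelayMs = baseDelayMs * expBackOffBound;
        if (timedOut) {
            return Math.min(maxDelayMs, currentDelayMs * 2);
        }
        return baseDelayMs;
    }
}
```

With a 30 second base interval and a bound of 10, repeated timeouts would stretch the delay to 60, 120, 240 and finally 300 seconds in this model.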
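3. Once everything has been instantiated and initialized, the client starts working. How? spring-cloud-netflix-eureka-client provides EurekaAutoServiceRegistration, an implementation of the SmartLifecycle lifecycle interface that also implements AutoServiceRegistration.
AutoServiceRegistration is the interface Spring Cloud provides for automatic service registration.
The method of interest in EurekaAutoServiceRegistration is start():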
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
// Register through the service registry, which here is EurekaServiceRegistry
this.serviceRegistry.register(this.registration);
this.context.publishEvent(new InstanceRegisteredEvent<>(this,
this.registration.getInstanceConfig()));
this.running.set(true);
}
}
Now the registry's register method, in EurekaServiceRegistry. This class implements Spring Cloud's ServiceRegistry interface:
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status "
+ reg.getInstanceConfig().getInitialStatus());
}
// First set the InstanceInfo status held by ApplicationInfoManager to the initial status, UP (InstanceStatus.UP).
// Before this call the status is STARTING (InstanceStatus.STARTING).
// Setting the status triggers the notify method of the StatusChangeListener.
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// If a health check handler is available, register it with the CloudEurekaClient. By default there is none.
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
Here is ApplicationInfoManager's setInstanceStatus(InstanceStatus status) method:
public synchronized void setInstanceStatus(InstanceStatus status) {
// 1. Map to the new status; in the call stack above that is UP.
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
// 2. Set the new status and get the previous one, i.e. STARTING
InstanceStatus prev = instanceInfo.setStatus(next);
// 3. Notify every status change listener that the status went from STARTING to UP
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
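The set-then-notify pattern in setInstanceStatus can be sketched in isolation. StatusHolderSketch is a hypothetical class written for illustration; it models only the idea that listeners are called with the previous and the new status when the value actually changes, and that one failing listener does not stop the others.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

class StatusHolderSketch {
    enum Status { STARTING, UP, DOWN }

    private Status status = Status.STARTING;
    // Listeners keyed by id, each receiving (previous, next).
    private final Map<String, BiConsumer<Status, Status>> listeners = new ConcurrentHashMap<>();

    void registerListener(String id, BiConsumer<Status, Status> listener) {
        listeners.put(id, listener);
    }

    // Sets the status and notifies listeners only when the value actually changed.
    synchronized void setStatus(Status next) {
        if (next == null || next == status) {
            return;
        }
        Status prev = status;
        status = next;
        for (BiConsumer<Status, Status> listener : listeners.values()) {
            try {
                listener.accept(prev, next);
            } catch (RuntimeException e) {
                // A failing listener must not prevent the remaining ones from running.
            }
        }
    }
}
```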
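Recall that the parent class of CloudEurekaClient builds an anonymous status change listener during construction (in initScheduledTasks()). The core code that builds the StatusChangeListener and registers it with ApplicationInfoManager is repeated here: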
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
So execution naturally reaches this anonymous object's notify method:
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
// If the new status or the previous status is DOWN, log at warn level; otherwise log at info level.
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
// Then use the instance info replicator to push the status change to the server.
instanceInfoReplicator.onDemandUpdate();
}
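That leads to InstanceInfoReplicator, which implements Runnable and is itself a task.
The core of onDemandUpdate() is to submit a task to the scheduler that runs InstanceInfoReplicator's run method right away, subject to a rate limiter.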
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
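onDemandUpdate() is guarded by rateLimiter.acquire(burstSize, allowedRatePerMinute). The sketch below is a generic token bucket, not Eureka's RateLimiter implementation; TokenBucketSketch and tryAcquire are hypothetical names, and the clock is passed in explicitly so the behavior is easy to follow.

```java
class TokenBucketSketch {
    private final int burstSize;        // maximum number of stored permits
    private final double refillPerMs;   // permits added per millisecond
    private double tokens;
    private long lastRefillMs;

    TokenBucketSketch(int burstSize, int permitsPerMinute, long nowMs) {
        this.burstSize = burstSize;
        this.refillPerMs = permitsPerMinute / 60_000.0;
        this.tokens = burstSize;        // start with a full bucket
        this.lastRefillMs = nowMs;
    }

    // Refills according to elapsed time, then takes one permit if available.
    synchronized boolean tryAcquire(long nowMs) {
        tokens = Math.min(burstSize, tokens + (nowMs - lastRefillMs) * refillPerMs);
        lastRefillMs = nowMs;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

With a burst size of 2, two back-to-back status changes pass immediately, while a third is dropped until enough time has elapsed, which matches the "Ignoring onDemand update due to rate limiter" branch above.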
Which brings us to run():
public void run() {
try {
// 1. Refresh the instance info: mainly, use the HealthCheckHandler to re-read this instance's status and update InstanceInfo.
discoveryClient.refreshInstanceInfo();
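// 2. Read the time at which instanceInfo became dirty. Dirty means the instance info has changed but the server
// has not been told yet. A null value means nothing has changed, so nothing is sent in this run.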
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 3. Register through the discovery client; this is the core step.
discoveryClient.register();
// 4. After registering, clear the dirty flag.
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// 5. Whether or not registration succeeded, schedule the next periodic replication of this instance to the server (every 30 seconds by default).
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
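The dirty timestamp used by run() can be modeled as follows. DirtyFlagSketch is a hypothetical class, and the rule that the flag is cleared only when no newer change arrived after the timestamp that was sent is my reading of how unsetIsDirty(dirtyTimestamp) is meant to behave, so treat it as an assumption.

```java
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    // Marks the info dirty at the given time and returns that time.
    synchronized long setDirty(long nowMs) {
        dirty = true;
        lastDirtyTimestamp = nowMs;
        return nowMs;
    }

    // Returns the dirty timestamp, or null when nothing is pending.
    synchronized Long dirtyTimestamp() {
        if (dirty) {
            return Long.valueOf(lastDirtyTimestamp);
        }
        return null;
    }

    // Clears the flag only if no newer change arrived after the timestamp that was sent.
    synchronized void unsetDirty(long sentTimestamp) {
        if (lastDirtyTimestamp <= sentTimestamp) {
            dirty = false;
        }
    }
}
```

Passing the timestamp back protects against a race: if the instance changes again while the registration request is in flight, the flag stays set and the next run sends the newer state.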
Next is discoveryClient's register() method; discoveryClient here is the CloudEurekaClient:
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
// Register through the registration client held by the Eureka transport, passing the instanceInfo to register.
// By default AbstractJerseyEurekaHttpClient builds and sends the HTTP request.
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
Which leads to AbstractJerseyEurekaHttpClient's register(InstanceInfo info):
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
// Build the request URL, e.g. http://localhost:9090/eureka/apps/{AppName in upper case}
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
// Send the request and read the response. This calls the addInstance(...) endpoint of the server's ApplicationResource, which completes the registration.
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
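4. That covers service registration; next is the heartbeat. When CloudEurekaClient is initialized, its parent class constructor builds the heartbeat task. Here is the source again: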
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
The heartbeat work itself lives in the HeartbeatThread task:
private class HeartbeatThread implements Runnable {
public void run() {
// Send a renewal; on success, record the time of the last successful renewal
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
Renewal is implemented as follows:
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// Send the renewal request. A 200 response means the renewal succeeded; a 404 triggers re-registration.
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
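The decision renew() makes from the HTTP status code can be reduced to a small function. RenewSketch and handleHeartbeatStatus are hypothetical names; the re-registration step is passed in as a callback so the sketch stays free of any HTTP code.

```java
import java.util.function.BooleanSupplier;

class RenewSketch {
    // Maps a heartbeat status code to the renewal outcome.
    // 404 means the server no longer knows this instance, so the result
    // is whatever the re-registration attempt returns.
    static boolean handleHeartbeatStatus(int statusCode, BooleanSupplier reRegister) {
        if (statusCode == 404) {
            return reRegister.getAsBoolean();
        }
        return statusCode == 200;
    }
}
```

Any other status, such as a 500, counts as a failed renewal without attempting re-registration.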
Sending the heartbeat is implemented as follows:
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
// Build the heartbeat HTTP request, e.g.
// url=http://localhost:9090/eureka/apps/CONSUMER/DESKTOP-IHK6B18:consumer:8080?status=UP&lastDirtyTimestamp=1608777683806
// This calls the renewLease(...) REST endpoint of the server's InstanceResource.
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity() &&
!HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
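To show how the heartbeat URL from the comment above is put together, here is a sketch that builds it with java.net.URI instead of the Jersey client. HeartbeatUrlSketch and heartbeatUri are hypothetical names, and the sketch assumes the values need no URL escaping.

```java
import java.net.URI;

class HeartbeatUrlSketch {
    // Builds {serviceUrl}apps/{appName}/{instanceId}?status=...&lastDirtyTimestamp=...
    // Assumption: none of the values contain characters that require escaping.
    static URI heartbeatUri(String serviceUrl, String appName, String instanceId,
                            String status, long lastDirtyTimestamp) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return URI.create(base + "apps/" + appName + "/" + instanceId
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp);
    }
}
```

The request itself is a PUT with no body; the instance state travels entirely in the path and query string.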
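5. With the heartbeat covered, let's analyze how the local cache is refreshed. By default the Eureka client first fetches the full service list and caches it locally, then uses a scheduled task to pull the latest data and update the local cache. That scheduled task is the cache refresh task, which, as seen during CloudEurekaClient initialization, is built and started from the parent class constructor. Here is the source: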
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
The cache refresh logic is in CacheRefreshThread:
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
The registry refresh is implemented as follows:
@VisibleForTesting
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized (instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," +
" ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
// 1. The key step: fetch the service list from the Eureka server.
boolean success = fetchRegistry(remoteRegionsModified);
// 2. If the fetch succeeded, update the recorded registry size. How the registry data itself is updated is inside fetchRegistry.
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
allAppsHashCodes);
}
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
Which leads to fetchRegistry(boolean forceFullRegistryFetch):
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
// 1. Get the locally cached applications
Applications applications = getApplications();
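// 2. Do a full fetch if delta is disabled, a single VIP address is configured, a full fetch is forced, the local cache is null or empty, or its version is -1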
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 3. Fetch and cache the full registry.
getAndStoreFullRegistry();
} else {
// 4. Otherwise fetch the delta and apply it to the local cache.
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
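The condition that chooses between a full fetch and a delta fetch can be written as a pure function, which makes the individual triggers easier to see. FetchDecisionSketch and shouldFetchFull are hypothetical names; cachedAppCount is null when there is no local cache at all.

```java
class FetchDecisionSketch {
    // Returns true when a full registry fetch is needed, false when a delta is enough.
    static boolean shouldFetchFull(boolean deltaDisabled, String singleVipAddress, boolean forceFull,
                                   Integer cachedAppCount, long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == null      // no local cache yet
                || cachedAppCount == 0         // cache exists but is empty, e.g. on first fetch
                || cachedVersion == -1;        // cached data cannot be used as a delta base
    }
}
```

In this model the very first fetch after startup is always a full fetch, because the local cache starts out empty; later refreshes use the delta path unless one of the other triggers applies.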
First, the full fetch-and-store implementation:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
// 1. Fetch all registered applications; this calls the ApplicationsResource.getContainers(...) REST endpoint on the server.
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
// 2. If the request succeeded, take the registered applications from the response.
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
// 3. If there are no registered applications, log an error
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
// 4. Otherwise store them in the local cache, provided no other thread has updated the cache in the meantime.
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
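The compareAndSet on fetchRegistryGeneration is what keeps a slow fetch from overwriting newer data. The sketch below isolates that guard; GenerationGuardSketch is a hypothetical class, and a String stands in for the Applications object.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> cache = new AtomicReference<>("");

    long currentGeneration() {
        return generation.get();
    }

    String cached() {
        return cache.get();
    }

    // Stores the fetched value only if the generation is still the one
    // observed before the fetch started; otherwise another update won the race.
    boolean storeIfUnchanged(long observedGeneration, String fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            cache.set(fetched);
            return true;
        }
        return false;
    }
}
```

A caller reads currentGeneration() before the network call and passes it to storeIfUnchanged afterwards, mirroring how currentUpdateGeneration is captured at the top of getAndStoreFullRegistry().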
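The delta path follows much the same flow, so its source is not shown here.
6. Service registration, replication of status changes to the server, the heartbeat, and local cache refresh have all been covered. Next is one way a service ends up in the DOWN status. It is again in EurekaAutoServiceRegistration, which, as mentioned above, implements SmartLifecycle, so this time we look at its stop() method. Note that stop() only means the EurekaAutoServiceRegistration bean has stopped working; it does not mean the whole application has shut down. So the instance is only marked DOWN, and the operation is to push the DOWN status to the server. The actual deregistration happens in CloudEurekaClient's shutdown() method. The source of stop():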
@Override
public void stop() {
this.serviceRegistry.deregister(this.registration);
this.running.set(false);
}
The code that marks the service DOWN when the EurekaAutoServiceRegistration bean stops is:
@Override
public void deregister(EurekaRegistration reg) {
if (reg.getApplicationInfoManager().getInfo() != null) {
if (log.isInfoEnabled()) {
log.info("Unregistering application "
+ reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status DOWN");
}
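// Set the instance status to DOWN. This again triggers the StatusChangeListener, which again reaches the replicator's run() method to refresh the instance status and register, only this time with status DOWN.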
reg.getApplicationInfoManager()
.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
// shutdown of eureka client should happen with EurekaRegistration.close()
// auto registration will create a bean which will be properly disposed
// manual registrations will need to call close()
}
}
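InstanceInfoReplicator's run method was covered in the registration section above. This time the status change to DOWN has marked the instance info dirty, so it is replicated to the server, i.e. the instance registers again.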
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
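The interplay between the dirty flag and run() can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the Netflix classes: SketchInstance and SketchReplicator are hypothetical names, the status is a plain string, and "registering" just appends to a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for InstanceInfo: only a status and a dirty timestamp. */
class SketchInstance {
    private String status = "UP";
    private Long dirtyTimestamp; // null means there is nothing to replicate

    synchronized void setStatus(String newStatus) {
        if (!status.equals(newStatus)) {
            status = newStatus;
            dirtyTimestamp = System.currentTimeMillis(); // a real change marks the info dirty
        }
    }

    synchronized String getStatus() {
        return status;
    }

    synchronized Long isDirtyWithTime() {
        return dirtyTimestamp;
    }

    /** Clear the flag only if no newer change happened after seenTimestamp. */
    synchronized void unsetIsDirty(long seenTimestamp) {
        if (dirtyTimestamp != null && dirtyTimestamp <= seenTimestamp) {
            dirtyTimestamp = null;
        }
    }
}

/** Hypothetical stand-in for the replicator: one pass of run() without scheduling. */
class SketchReplicator {
    final List<String> sentToServer = new ArrayList<>();
    private final SketchInstance instance;

    SketchReplicator(SketchInstance instance) {
        this.instance = instance;
    }

    void runOnce() {
        Long dirty = instance.isDirtyWithTime();
        if (dirty != null) {
            sentToServer.add(instance.getStatus()); // stands in for register()
            instance.unsetIsDirty(dirty);
        }
    }
}
```

A pass over a clean instance sends nothing; after setStatus("DOWN") the next pass sends DOWN once, and later passes are no-ops until the status changes again.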
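7. Service deregistration was mentioned briefly above: the instance is taken offline when the CloudEurekaClient bean is destroyed. The destroyMethod attribute is set where the CloudEurekaClient bean is declared:
@Bean(destroyMethod = "shutdown") // declares the destroy method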
@ConditionalOnMissingBean(value = EurekaClient.class,
search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
// If we use the proxy of the ApplicationInfoManager we could run into a
// problem
// when shutdown is called on the CloudEurekaClient where the
// ApplicationInfoManager bean is
// requested but wont be allowed because we are shutting down. To avoid this
// we use the
// object directly.
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
config, this.optionalArgs, this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
This brings us to the destroy method:
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
// Remove the previously registered status change listener
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// Cancel the scheduled tasks
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
// Send the deregistration (cancel) request to the server
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
Monitors.unregisterObject(this);
logger.info("Completed shut down of DiscoveryClient");
}
}
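The isShutdown.compareAndSet(false, true) guard makes shutdown() safe to call more than once, which matters because it can be reached both through @PreDestroy and through the bean's destroyMethod. Below is a minimal sketch of that pattern; OneShotShutdown is a hypothetical class, and the real work is replaced by recording step names in the order the source above performs them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of a shutdown sequence that runs at most once. */
class OneShotShutdown {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final boolean unregisterOnShutdown;
    final List<String> steps = new ArrayList<>();

    OneShotShutdown(boolean unregisterOnShutdown) {
        this.unregisterOnShutdown = unregisterOnShutdown;
    }

    synchronized void shutdown() {
        // Only the first caller flips false -> true; later calls skip the body.
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("remove status listener");
            steps.add("cancel scheduled tasks");
            if (unregisterOnShutdown) {
                steps.add("set status DOWN");
                steps.add("send cancel request");
            }
            steps.add("close transport");
        }
    }
}
```

Calling shutdown() twice on the same object records the steps only once, and constructing it with false skips the two deregistration steps, mirroring the shouldUnregisterOnShutdown() check.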
The deregistration request is implemented as follows:
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
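// Call the server's cancel endpoint: a DELETE request to "apps/" + appName + '/' + id,
// which is handled by the REST method InstanceResource.cancelLease(...) on the server.
// This takes the instance offline. That is the graceful path. After an abrupt shutdown
// (for example a killed process) the instance is removed by the server's scheduled
// eviction task instead (by default it runs every 60s and evicts instances that have
// not renewed their lease within 90s).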
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
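The two ways an instance leaves the registry, an explicit cancel and a timed eviction, can be contrasted with a toy server-side model. This is a sketch under stated assumptions and not Eureka's actual lease logic: SketchLeaseRegistry is a hypothetical class, time is passed in explicitly, and the 90-second lease duration is the default quoted above.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical, simplified lease registry keyed by "apps/{appName}/{id}". */
class SketchLeaseRegistry {
    static final long LEASE_DURATION_MS = 90_000L; // assumed default lease duration

    private final Map<String, Long> lastRenewal = new HashMap<>();

    private static String key(String appName, String id) {
        return "apps/" + appName + "/" + id;
    }

    /** Register or renew: both simply record the time of the last contact. */
    void renew(String appName, String id, long nowMs) {
        lastRenewal.put(key(appName, id), nowMs);
    }

    /** Graceful path: the client asked to be removed. */
    boolean cancel(String appName, String id) {
        return lastRenewal.remove(key(appName, id)) != null;
    }

    /** Abrupt path: drop every lease that was not renewed within the lease duration. */
    int evict(long nowMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, Long>> it = lastRenewal.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > LEASE_DURATION_MS) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    boolean contains(String appName, String id) {
        return lastRenewal.containsKey(key(appName, id));
    }
}
```

A cancelled instance disappears immediately, while an instance that simply stops renewing stays visible until an eviction pass runs after its lease has expired.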
8. How Ribbon integrates with EurekaClient:
How does Ribbon get hold of the service list that EurekaClient caches locally? We know that EurekaClient caches the service data in the localRegionApps field of DiscoveryClient:
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
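This cache is refreshed by a scheduled task, so how does Ribbon read it? Ribbon uses Netflix's ServerList interface to obtain and update the list of service providers, so does Eureka supply an implementation of it? Looking at the ServerList class hierarchy, Eureka is not the only one: the ZooKeeper and Consul integrations also implement it to fetch service providers.
At the spring-cloud level the implementation class is org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList. Its source is as follows: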
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
// The server list that does the real work is DiscoveryEnabledNIWSServerList from ribbon-eureka
private ServerList<DiscoveryEnabledServer> list;
private final RibbonProperties ribbon;
private boolean approximateZoneFromHostname;
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(
this.list.getInitialListOfServers());
return servers;
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(
this.list.getUpdatedListOfServers());
return servers;
}
. . .
}
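DomainExtractingServerList is a decorator: it delegates both lookups to the wrapped list and post-processes the result in setZones. The sketch below shows only that shape. All types in it are hypothetical stand-ins rather than Ribbon's, and the zone rule (everything after the first dot of the host name) is an illustrative assumption, since the body of setZones is elided in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal server type; not Ribbon's DiscoveryEnabledServer. */
class SketchServer {
    final String host;
    String zone = "unknown";

    SketchServer(String host) {
        this.host = host;
    }
}

/** Hypothetical stand-in for Ribbon's ServerList interface. */
interface SketchServerList {
    List<SketchServer> getInitialListOfServers();

    List<SketchServer> getUpdatedListOfServers();
}

/** Fixed list used as the delegate in this sketch. */
class FixedServerList implements SketchServerList {
    private final String[] hosts;

    FixedServerList(String... hosts) {
        this.hosts = hosts;
    }

    public List<SketchServer> getInitialListOfServers() {
        List<SketchServer> servers = new ArrayList<>();
        for (String h : hosts) {
            servers.add(new SketchServer(h));
        }
        return servers;
    }

    public List<SketchServer> getUpdatedListOfServers() {
        return getInitialListOfServers();
    }
}

/** Decorator: delegates the lookup, then fills in a zone for every server. */
class ZoneTaggingServerList implements SketchServerList {
    private final SketchServerList delegate;

    ZoneTaggingServerList(SketchServerList delegate) {
        this.delegate = delegate;
    }

    public List<SketchServer> getInitialListOfServers() {
        return setZones(delegate.getInitialListOfServers());
    }

    public List<SketchServer> getUpdatedListOfServers() {
        return setZones(delegate.getUpdatedListOfServers());
    }

    // Illustrative rule only: treat everything after the first dot as the zone.
    private List<SketchServer> setZones(List<SketchServer> servers) {
        for (SketchServer s : servers) {
            int dot = s.host.indexOf('.');
            if (dot >= 0 && dot < s.host.length() - 1) {
                s.zone = s.host.substring(dot + 1);
            }
        }
        return servers;
    }
}
```

Because the decorator implements the same interface as its delegate, the caller cannot tell the two apart, which is how spring-cloud can slot its own behaviour in front of the ribbon-eureka list.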
So we move on to DiscoveryEnabledNIWSServerList and focus on getUpdatedListOfServers(), its implementation of the ServerList interface method:
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
return this.obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList();
if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
// 1. Obtain an EurekaClient
EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
if (this.vipAddresses != null) {
String[] var3 = this.vipAddresses.split(",");
int var4 = var3.length;
for(int var5 = 0; var5 < var4; ++var5) {
String vipAddress = var3[var5];
// 2. Fetch the instance list; this is the core step
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
Iterator var8 = listOfInstanceInfo.iterator();
while(var8.hasNext()) {
InstanceInfo ii = (InstanceInfo)var8.next();
if (ii.getStatus().equals(InstanceStatus.UP)) {
if (this.shouldUseOverridePort) {
if (logger.isDebugEnabled()) {
logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
}
InstanceInfo copy = new InstanceInfo(ii);
if (this.isSecure) {
ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
} else {
ii = (new Builder(copy)).setPort(this.overridePort).build();
}
}
DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
break;
}
}
}
// 3. Return the server list
return serverList;
} else {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList();
}
}
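Stripped of the decompiler variable names and the port override, the loop above does three things: split the comma-separated VIP addresses, keep only instances whose status is UP, and optionally stop at the first VIP address that yields any server. A minimal sketch of just that logic follows; SketchInstanceInfo and UpInstanceSelector are hypothetical names, and a plain Map stands in for the EurekaClient lookup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for InstanceInfo: host name plus status string. */
class SketchInstanceInfo {
    final String host;
    final String status;

    SketchInstanceInfo(String host, String status) {
        this.host = host;
        this.status = status;
    }
}

class UpInstanceSelector {
    /**
     * Returns the hosts of all UP instances for the given comma-separated VIP
     * addresses. When prioritizeFirstMatch is true, later VIP addresses are
     * ignored as soon as one address has produced at least one server.
     */
    static List<String> select(Map<String, List<SketchInstanceInfo>> registry,
                               String vipAddresses,
                               boolean prioritizeFirstMatch) {
        List<String> servers = new ArrayList<>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vip : vipAddresses.split(",")) {
            List<SketchInstanceInfo> instances =
                    registry.getOrDefault(vip, Collections.<SketchInstanceInfo>emptyList());
            for (SketchInstanceInfo ii : instances) {
                if ("UP".equals(ii.status)) {
                    servers.add(ii.host); // DOWN and other states are skipped
                }
            }
            if (!servers.isEmpty() && prioritizeFirstMatch) {
                break;
            }
        }
        return servers;
    }
}
```

This is also why an instance that was only marked DOWN by stop() stops receiving Ribbon traffic even though it is still registered: it is filtered out on the client side.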
The implementation that looks up instances by VIP address:
@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
@Nullable String region) {
if (vipAddress == null) {
throw new IllegalArgumentException(
"Supplied VIP Address cannot be null");
}
Applications applications;
if (instanceRegionChecker.isLocalRegion(region)) {
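// Here it is: the service data cached in the current EurekaClient is read directly.
// The EurekaClient here is the CloudEurekaClient configured earlier, so the service
// list returned is the data that was pulled from the server and cached.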
applications = this.localRegionApps.get();
} else {
applications = remoteRegionVsApps.get(region);
if (null == applications) {
logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
+ "address {}.", region, vipAddress);
return Collections.emptyList();
}
}
if (!secure) {
return applications.getInstancesByVirtualHostName(vipAddress);
} else {
return applications.getInstancesBySecureVirtualHostName(vipAddress);
}
}
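The lookup never touches the network: it reads whichever snapshot the refresh task last stored. The sketch below models that with an AtomicReference for the local region and a map for remote regions. SketchRegionCache is a hypothetical class, a snapshot is reduced to a map from VIP address to host names, and treating a null region as the local one is an assumption made for this sketch.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of a region-aware, snapshot-based registry cache. */
class SketchRegionCache {
    private final String localRegion;
    private final AtomicReference<Map<String, List<String>>> localRegionApps =
            new AtomicReference<>(Collections.<String, List<String>>emptyMap());
    private final Map<String, Map<String, List<String>>> remoteRegionVsApps =
            new ConcurrentHashMap<>();

    SketchRegionCache(String localRegion) {
        this.localRegion = localRegion;
    }

    /** Called by the (not modelled) refresh task: swap in a whole new snapshot. */
    void refreshLocal(Map<String, List<String>> snapshot) {
        localRegionApps.set(snapshot);
    }

    void refreshRemote(String region, Map<String, List<String>> snapshot) {
        remoteRegionVsApps.put(region, snapshot);
    }

    List<String> getInstancesByVipAddress(String vipAddress, String region) {
        if (vipAddress == null) {
            throw new IllegalArgumentException("Supplied VIP Address cannot be null");
        }
        Map<String, List<String>> apps;
        if (region == null || region.equals(localRegion)) {
            apps = localRegionApps.get(); // read the cached snapshot, no remote call
        } else {
            apps = remoteRegionVsApps.get(region);
            if (apps == null) {
                return Collections.emptyList(); // unknown region
            }
        }
        List<String> hosts = apps.get(vipAddress);
        return hosts == null ? Collections.<String>emptyList() : hosts;
    }
}
```

Readers always see a complete snapshot, either the old one or the new one, because the refresh replaces the reference in a single step instead of mutating the map in place.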
That concludes the analysis of how the whole eureka-client works.
Source: https://blog.csdn.net/qq_34978129/article/details/111544848