springCloud-Eureka—服务注册与服务续约(二)
作者:互联网
系列文章目录
springCloud实践
springCloud实践之浅谈Feign原理
springCloud-Eureka(一)
springCloud-Eureka—服务注册与服务续约(二)
springCloud-Eureka—服务注销与剔除(三)
springCloud-Eureka—服务获取(四)
springCloud-Eureka—服务同步(五)
文章目录
前言
前一篇文章我们介绍了eureka的特点、适用场景以及数据的存储结构,本文继上文结合源码讲解eureka的服务注册、续约机制
一、服务注册机制
服务提供者、服务消费者以及注册中心自己,启动后都会向注册中心注册服务(配置了注册)
注册中心服务接收到register请求后:
- 保存服务信息,将服务信息保存到registry中;
- 更新队列,将此事件添加到更新队列中,供eureka Client增量同步服务信息使用;
- 清空二级缓存,即readWriteCacheMap,用于保证数据一致性;
- 更新阈值,供剔除服务使用;
- 同步服务信息,将此事件同步至其他的Eureka Server节点。
我应用的是netflix版spring-cloud-starter-netflix-eureka-server,它整合了spring-cloud-netflix-eureka-client
入口:包引入后,通过META-INF/spring.factories里配置的
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration完成自动注入+注册
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration
// 构造实例信息并注入
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
return new EurekaServiceRegistry();
}
//通过上面这个注解的自动配置先构建实例信息,然后@AutoConfigureAfter进入下一步
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
public class EurekaDiscoveryClientConfiguration{
class Marker {}
@Bean
public Marker eurekaDiscoverClientMarker() {
return new Marker();
}
@Configuration
@ConditionalOnClass(RefreshScopeRefreshedEvent.class)
protected static class EurekaClientConfigurationRefresher {
@Autowired(required = false)
private EurekaClient eurekaClient;
@Autowired(required = false)
private EurekaAutoServiceRegistration autoRegistration;
//这个类完成了服务的真正注册
@EventListener(RefreshScopeRefreshedEvent.class)
public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
//This will force the creation of the EurkaClient bean if not already created
//to make sure the client will be reregistered after a refresh event
if(eurekaClient != null) {
eurekaClient.getApplications();
}
if (autoRegistration != null) {
// register in case meta data changed
this.autoRegistration.stop();
this.autoRegistration.start();
}
}
}
}
//EurekaAutoServiceRegistration 类
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {
/**
* 省略部分代码
*/
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
//完成注册
this.serviceRegistry.register(this.registration);
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
}
//EurekaRegistration类
@Bean
@org.springframework.cloud.context.config.annotation.RefreshScope
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(
value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
CloudEurekaInstanceConfig instanceConfig,
ApplicationInfoManager applicationInfoManager, @Autowired(
required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
// 注册方法跟进去看到的是代理类$Proxy,代理的目标对象是
// interface com.netflix.discovery.EurekaClient
return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
.with(eurekaClient).with(healthCheckHandler).build();
}
// 查看EurekaClient代码
@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {
这里可以看到引入的是DiscoveryClient类
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
//客户端路径唯一标识符:xx-service/120.0.0.1:8080
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
//获取远程eureka服务注册列表的次数
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
//判断配置是否需要获取远程eureka服务注册列表
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
//判断是否需要注册到远程eureka服务
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
//客户端配置为既不注册也不查询数据
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
//# 开始定义定时任务调度器 scheduler
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//通过定时调度每隔30s,处理心跳
//线程池的核心线程数为1,最大线程数默认为2(HeartbeatExecutorThreadPoolSize)
//使用同步队列:SynchronousQueue,每次提交都要阻塞等待处理
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//通过定时调度每隔30s,处理eureka提供的服务列表缓存
//线程池的核心线程数为1,最大线程数默认为2(HeartbeatExecutorThreadPoolSize)
//使用同步队列:SynchronousQueue,每次提交都要阻塞等待处理
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//初始化一个eureka请求传输器
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
//如果配置需要注册为eureka的服务,并且初始化强制注册,则进行注册
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
//最后,初始化调度任务(例如,集群解析器、心跳、instanceInfo复制器、fetch)
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
/**重点,开始启动定时任务调度器 scheduler
* Initializes all scheduled tasks.
* (例如,集群解析器、心跳、instanceInfo复制器、fetch)
*/
private void initScheduledTasks() {
//根据客户端配置是否需要获取eureka的服务列表,需要的话就开启注册表缓存刷新
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
//注册表缓存刷新定时器
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//注册表缓存刷新任务,超时时间和执行时间30s
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
//注册表缓存定时调度,30s一次
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//根据客户端配置是否需要注册到eureka,需要的话就开启心跳续约
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
//心跳续约定时器,超时时间和执行时间30s
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
//心跳续约定时调度,30s执行一次
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
//instanceInfo复制,实力信息复制定时器
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
//状态更新监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
//这里是开启了在需要的时候更新状态变化的开关才会添加监听器,此处当开关开启时,状态发生变化,会立即收到通知,调用onDemandUpdate方法
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//启动周期性实例信息复制到远程定时器,默认延迟40s执行
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
//取消调度和任务
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
/**
* Register with the eureka service by making the appropriate REST call.
* 通过发出rest调用向eureka服务注册
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
二、服务心跳续约
/**
* The heartbeat task that renews the lease in the given intervals.
*/
private class HeartbeatThread implements Runnable {
public void run() {
//心跳续约租期
if (renew()) {
//如果心跳正常,更新发起上次心跳的时间戳为当前时间
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
/**通过发送rest调用更新eureka服务
* Renew with the eureka service by making the appropriate REST call
*/
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//客户端请求eureka服务器,维持客户端与eureka服务器的心跳
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
//如果eureka服务器未发现当前客户端,则进行注册
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
//客户端发起注册eureka
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
标签:clientConfig,服务,springCloud,null,eureka,instanceInfo,new,logger,Eureka 来源: https://blog.csdn.net/weixin_43622097/article/details/116015889