spring cloud 源码解析
作者:互联网
一 Eureka
1. 需要掌握的一些基础知识
“Applications”:注册在Eureka Server上的应用集合。-- 对应多个**Application**
“Application”:具体的一个应用(eureka-provider)。-- 对应多个"InstanceInfo"(localhost:8070,
localhost:8071, localhost:8072)
"InstanceInfo":应用实例。 IP + Port
源代码:
Applications.java
1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package com.netflix.discovery.shared; 7 8 import com.fasterxml.jackson.annotation.JsonCreator; 9 import com.fasterxml.jackson.annotation.JsonIgnore; 10 import com.fasterxml.jackson.annotation.JsonProperty; 11 import com.fasterxml.jackson.annotation.JsonRootName; 12 import com.netflix.appinfo.InstanceInfo; 13 import com.netflix.appinfo.InstanceInfo.ActionType; 14 import com.netflix.appinfo.InstanceInfo.InstanceStatus; 15 import com.netflix.discovery.EurekaClientConfig; 16 import com.netflix.discovery.InstanceRegionChecker; 17 import com.netflix.discovery.provider.Serializer; 18 import com.thoughtworks.xstream.annotations.XStreamAlias; 19 import com.thoughtworks.xstream.annotations.XStreamImplicit; 20 import java.util.AbstractQueue; 21 import java.util.ArrayList; 22 import java.util.Collections; 23 import java.util.HashSet; 24 import java.util.Iterator; 25 import java.util.List; 26 import java.util.Locale; 27 import java.util.Map; 28 import java.util.Set; 29 import java.util.TreeMap; 30 import java.util.Map.Entry; 31 import java.util.concurrent.ConcurrentHashMap; 32 import java.util.concurrent.ConcurrentLinkedQueue; 33 import java.util.concurrent.atomic.AtomicInteger; 34 import java.util.concurrent.atomic.AtomicLong; 35 import java.util.concurrent.atomic.AtomicReference; 36 import javax.annotation.Nullable; 37 import org.slf4j.Logger; 38 import org.slf4j.LoggerFactory; 39 40 @Serializer("com.netflix.discovery.converters.EntityBodyConverter") 41 @XStreamAlias("applications") 42 @JsonRootName("applications") 43 public class Applications { 44 private static final String APP_INSTANCEID_DELIMITER = "$$"; 45 private static final Logger logger = LoggerFactory.getLogger(Applications.class); 46 private static final String STATUS_DELIMITER = "_"; 47 private Long versionDelta = -1L; 48 @XStreamImplicit 49 private AbstractQueue<Application> applications = new ConcurrentLinkedQueue(); 50 private Map<String, Application> appNameApplicationMap = new ConcurrentHashMap(); 51 private Map<String, AbstractQueue<InstanceInfo>> virtualHostNameAppMap = new ConcurrentHashMap(); 52 private Map<String, AbstractQueue<InstanceInfo>> secureVirtualHostNameAppMap = new ConcurrentHashMap(); 53 private Map<String, AtomicLong> virtualHostNameIndexMap = new ConcurrentHashMap(); 54 private Map<String, AtomicLong> secureVirtualHostNameIndexMap = new ConcurrentHashMap(); 55 private Map<String, AtomicReference<List<InstanceInfo>>> shuffleVirtualHostNameMap = new ConcurrentHashMap(); 56 private Map<String, AtomicReference<List<InstanceInfo>>> shuffledSecureVirtualHostNameMap = new ConcurrentHashMap(); 57 private String appsHashCode; 58 59 public Applications() { 60 } 61 62 @JsonCreator 63 public Applications(@JsonProperty("appsHashCode") String appsHashCode, @JsonProperty("versionDelta") Long versionDelta, @JsonProperty("application") List<Application> registeredApplications) { 64 Iterator var4 = registeredApplications.iterator(); 65 66 while(var4.hasNext()) { 67 Application app = (Application)var4.next(); 68 this.addApplication(app); 69 } 70 71 this.appsHashCode = appsHashCode; 72 this.versionDelta = versionDelta; 73 } 74 75 public Applications(List<Application> apps) { 76 this.applications.addAll(apps); 77 } 78 79 public void addApplication(Application app) { 80 this.appNameApplicationMap.put(app.getName().toUpperCase(Locale.ROOT), app); 81 this.addInstancesToVIPMaps(app); 82 this.applications.add(app); 83 } 84 85 @JsonProperty("application") 86 public List<Application> getRegisteredApplications() { 87 List<Application> list = new ArrayList(); 88 list.addAll(this.applications); 89 return list; 90 } 91 92 public Application getRegisteredApplications(String appName) { 93 return (Application)this.appNameApplicationMap.get(appName.toUpperCase(Locale.ROOT)); 94 } 95 96 public List<InstanceInfo> getInstancesByVirtualHostName(String virtualHostName) { 97 AtomicReference<List<InstanceInfo>> ref = (AtomicReference)this.shuffleVirtualHostNameMap.get(virtualHostName.toUpperCase(Locale.ROOT)); 98 return (List)(ref != null && ref.get() != null ? (List)ref.get() : new ArrayList()); 99 } 100 101 public List<InstanceInfo> getInstancesBySecureVirtualHostName(String secureVirtualHostName) { 102 AtomicReference<List<InstanceInfo>> ref = (AtomicReference)this.shuffledSecureVirtualHostNameMap.get(secureVirtualHostName.toUpperCase(Locale.ROOT)); 103 return (List)(ref != null && ref.get() != null ? (List)ref.get() : new ArrayList()); 104 } 105 106 public int size() { 107 int result = 0; 108 109 Application application; 110 for(Iterator var2 = this.applications.iterator(); var2.hasNext(); result += application.size()) { 111 application = (Application)var2.next(); 112 } 113 114 return result; 115 } 116 117 /** @deprecated */ 118 @Deprecated 119 public void setVersion(Long version) { 120 this.versionDelta = version; 121 } 122 123 /** @deprecated */ 124 @Deprecated 125 @JsonIgnore 126 public Long getVersion() { 127 return this.versionDelta; 128 } 129 130 public void setAppsHashCode(String hashCode) { 131 this.appsHashCode = hashCode; 132 } 133 134 @JsonIgnore 135 public String getAppsHashCode() { 136 return this.appsHashCode; 137 } 138 139 @JsonIgnore 140 public String getReconcileHashCode() { 141 TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap(); 142 this.populateInstanceCountMap(instanceCountMap); 143 return getReconcileHashCode(instanceCountMap); 144 } 145 146 public void populateInstanceCountMap(TreeMap<String, AtomicInteger> instanceCountMap) { 147 Iterator var2 = this.getRegisteredApplications().iterator(); 148 149 while(var2.hasNext()) { 150 Application app = (Application)var2.next(); 151 152 AtomicInteger instanceCount; 153 for(Iterator var4 = app.getInstancesAsIsFromEureka().iterator(); var4.hasNext(); instanceCount.incrementAndGet()) { 154 InstanceInfo info = (InstanceInfo)var4.next(); 155 instanceCount = (AtomicInteger)instanceCountMap.get(info.getStatus().name()); 156 if (instanceCount == null) { 157 instanceCount = new AtomicInteger(0); 158 instanceCountMap.put(info.getStatus().name(), instanceCount); 159 } 160 } 161 } 162 163 } 164 165 public static String getReconcileHashCode(TreeMap<String, AtomicInteger> instanceCountMap) { 166 String reconcileHashCode = ""; 167 168 Entry mapEntry; 169 for(Iterator var2 = instanceCountMap.entrySet().iterator(); var2.hasNext(); reconcileHashCode = reconcileHashCode + (String)mapEntry.getKey() + "_" + ((AtomicInteger)mapEntry.getValue()).get() + "_") { 170 mapEntry = (Entry)var2.next(); 171 } 172 173 return reconcileHashCode; 174 } 175 176 public Map<String, List<String>> getReconcileMapDiff(Applications apps) { 177 Map<String, List<String>> diffMap = new TreeMap(); 178 Set<Applications.Pair> allInstanceAppInstanceIds = new HashSet(); 179 Iterator var4 = apps.getRegisteredApplications().iterator(); 180 181 while(true) { 182 Application thisApp; 183 while(var4.hasNext()) { 184 Application otherApp = (Application)var4.next(); 185 thisApp = this.getRegisteredApplications(otherApp.getName()); 186 if (thisApp == null) { 187 logger.warn("Application not found in local cache : {}", otherApp.getName()); 188 } else { 189 Iterator var7 = thisApp.getInstancesAsIsFromEureka().iterator(); 190 191 InstanceInfo otherInstanceInfo; 192 while(var7.hasNext()) { 193 otherInstanceInfo = (InstanceInfo)var7.next(); 194 allInstanceAppInstanceIds.add(new Applications.Pair(thisApp.getName(), otherInstanceInfo.getId())); 195 } 196 197 for(var7 = otherApp.getInstancesAsIsFromEureka().iterator(); var7.hasNext(); allInstanceAppInstanceIds.remove(new Applications.Pair(otherApp.getName(), otherInstanceInfo.getId()))) { 198 otherInstanceInfo = (InstanceInfo)var7.next(); 199 InstanceInfo thisInstanceInfo = thisApp.getByInstanceId(otherInstanceInfo.getId()); 200 Object diffList; 201 if (thisInstanceInfo == null) { 202 diffList = (List)diffMap.get(ActionType.DELETED.name()); 203 if (diffList == null) { 204 diffList = new ArrayList(); 205 diffMap.put(ActionType.DELETED.name(), diffList); 206 } 207 208 ((List)diffList).add(otherInstanceInfo.getId()); 209 } else if (!thisInstanceInfo.getStatus().name().equalsIgnoreCase(otherInstanceInfo.getStatus().name())) { 210 diffList = (List)diffMap.get(ActionType.MODIFIED.name()); 211 if (diffList == null) { 212 diffList = new ArrayList(); 213 diffMap.put(ActionType.MODIFIED.name(), diffList); 214 } 215 216 ((List)diffList).add(thisInstanceInfo.getId() + "$$" + thisInstanceInfo.getStatus().name() + "$$" + otherInstanceInfo.getStatus().name()); 217 } 218 } 219 } 220 } 221 222 var4 = allInstanceAppInstanceIds.iterator(); 223 224 while(var4.hasNext()) { 225 Applications.Pair pair = (Applications.Pair)var4.next(); 226 thisApp = new Application(pair.getItem1()); 227 InstanceInfo thisInstanceInfo = thisApp.getByInstanceId(pair.getItem2()); 228 if (thisInstanceInfo != null) { 229 List<String> diffList = (List)diffMap.get(ActionType.ADDED.name()); 230 if (diffList == null) { 231 diffList = new ArrayList(); 232 diffMap.put(ActionType.ADDED.name(), diffList); 233 } 234 235 ((List)diffList).add(thisInstanceInfo.getId()); 236 } 237 } 238 239 return diffMap; 240 } 241 } 242 243 public void shuffleInstances(boolean filterUpInstances) { 244 this.shuffleInstances(filterUpInstances, false, (Map)null, (EurekaClientConfig)null, (InstanceRegionChecker)null); 245 } 246 247 public void shuffleAndIndexInstances(Map<String, Applications> remoteRegionsRegistry, EurekaClientConfig clientConfig, InstanceRegionChecker instanceRegionChecker) { 248 this.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances(), true, remoteRegionsRegistry, clientConfig, instanceRegionChecker); 249 } 250 251 private void shuffleInstances(boolean filterUpInstances, boolean indexByRemoteRegions, @Nullable Map<String, Applications> remoteRegionsRegistry, @Nullable EurekaClientConfig clientConfig, @Nullable InstanceRegionChecker instanceRegionChecker) { 252 this.virtualHostNameAppMap.clear(); 253 this.secureVirtualHostNameAppMap.clear(); 254 255 Application application; 256 for(Iterator var6 = this.appNameApplicationMap.values().iterator(); var6.hasNext(); this.addInstancesToVIPMaps(application)) { 257 application = (Application)var6.next(); 258 if (indexByRemoteRegions) { 259 application.shuffleAndStoreInstances(remoteRegionsRegistry, clientConfig, instanceRegionChecker); 260 } else { 261 application.shuffleAndStoreInstances(filterUpInstances); 262 } 263 } 264 265 this.shuffleAndFilterInstances(this.virtualHostNameAppMap, this.shuffleVirtualHostNameMap, this.virtualHostNameIndexMap, filterUpInstances); 266 this.shuffleAndFilterInstances(this.secureVirtualHostNameAppMap, this.shuffledSecureVirtualHostNameMap, this.secureVirtualHostNameIndexMap, filterUpInstances); 267 } 268 269 public AtomicLong getNextIndex(String virtualHostname, boolean secure) { 270 return secure ? (AtomicLong)this.secureVirtualHostNameIndexMap.get(virtualHostname) : (AtomicLong)this.virtualHostNameIndexMap.get(virtualHostname); 271 } 272 273 private void shuffleAndFilterInstances(Map<String, AbstractQueue<InstanceInfo>> srcMap, Map<String, AtomicReference<List<InstanceInfo>>> destMap, Map<String, AtomicLong> vipIndexMap, boolean filterUpInstances) { 274 Iterator var5 = srcMap.entrySet().iterator(); 275 276 while(var5.hasNext()) { 277 Entry<String, AbstractQueue<InstanceInfo>> entries = (Entry)var5.next(); 278 AbstractQueue<InstanceInfo> instanceInfoQueue = (AbstractQueue)entries.getValue(); 279 List<InstanceInfo> l = new ArrayList(instanceInfoQueue); 280 if (filterUpInstances) { 281 Iterator it = l.iterator(); 282 283 while(it.hasNext()) { 284 InstanceInfo instanceInfo = (InstanceInfo)it.next(); 285 if (!InstanceStatus.UP.equals(instanceInfo.getStatus())) { 286 it.remove(); 287 } 288 } 289 } 290 291 Collections.shuffle(l); 292 AtomicReference<List<InstanceInfo>> instanceInfoList = (AtomicReference)destMap.get(entries.getKey()); 293 if (instanceInfoList == null) { 294 instanceInfoList = new AtomicReference(l); 295 destMap.put(entries.getKey(), instanceInfoList); 296 } 297 298 instanceInfoList.set(l); 299 vipIndexMap.put(entries.getKey(), new AtomicLong(0L)); 300 } 301 302 Set<String> srcVips = srcMap.keySet(); 303 Set<String> destVips = destMap.keySet(); 304 destVips.retainAll(srcVips); 305 } 306 307 private void addInstanceToMap(InstanceInfo info, String vipAddresses, Map<String, AbstractQueue<InstanceInfo>> vipMap) { 308 if (vipAddresses != null) { 309 String[] vipAddressArray = vipAddresses.split(","); 310 String[] var5 = vipAddressArray; 311 int var6 = vipAddressArray.length; 312 313 for(int var7 = 0; var7 < var6; ++var7) { 314 String vipAddress = var5[var7]; 315 String vipName = vipAddress.toUpperCase(Locale.ROOT); 316 AbstractQueue<InstanceInfo> instanceInfoList = (AbstractQueue)vipMap.get(vipName); 317 if (instanceInfoList == null) { 318 instanceInfoList = new ConcurrentLinkedQueue(); 319 vipMap.put(vipName, instanceInfoList); 320 } 321 322 ((AbstractQueue)instanceInfoList).add(info); 323 } 324 } 325 326 } 327 328 private void addInstancesToVIPMaps(Application app) { 329 Iterator var2 = app.getInstances().iterator(); 330 331 while(true) { 332 InstanceInfo info; 333 String vipAddresses; 334 String secureVipAddresses; 335 do { 336 if (!var2.hasNext()) { 337 return; 338 } 339 340 info = (InstanceInfo)var2.next(); 341 vipAddresses = info.getVIPAddress(); 342 secureVipAddresses = info.getSecureVipAddress(); 343 } while(vipAddresses == null && secureVipAddresses == null); 344 345 this.addInstanceToMap(info, vipAddresses, this.virtualHostNameAppMap); 346 this.addInstanceToMap(info, secureVipAddresses, this.secureVirtualHostNameAppMap); 347 } 348 } 349 350 private static final class Pair { 351 private final String item1; 352 private final String item2; 353 354 public Pair(String item1, String item2) { 355 this.item1 = item1; 356 this.item2 = item2; 357 } 358 359 public int hashCode() { 360 int prime = true; 361 int result = 1; 362 int result = 31 * result + (this.item1 == null ? 0 : this.item1.hashCode()); 363 result = 31 * result + (this.item2 == null ? 0 : this.item2.hashCode()); 364 return result; 365 } 366 367 public boolean equals(Object obj) { 368 if (this == obj) { 369 return true; 370 } else if (obj == null) { 371 return false; 372 } else if (this.getClass() != obj.getClass()) { 373 return false; 374 } else { 375 Applications.Pair other = (Applications.Pair)obj; 376 if (this.item1 == null) { 377 if (other.item1 != null) { 378 return false; 379 } 380 } else if (!this.item1.equals(other.item1)) { 381 return false; 382 } 383 384 if (this.item2 == null) { 385 if (other.item2 != null) { 386 return false; 387 } 388 } else if (!this.item2.equals(other.item2)) { 389 return false; 390 } 391 392 return true; 393 } 394 } 395 396 public String getItem1() { 397 return this.item1; 398 } 399 400 public String getItem2() { 401 return this.item2; 402 } 403 } 404 }
源代码49行为Applications的主要数据结构,可见其数据结构为一个Queue,Queue中的是
Application。
Application.java:
1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package com.netflix.discovery.shared; 7 8 import com.fasterxml.jackson.annotation.JsonCreator; 9 import com.fasterxml.jackson.annotation.JsonIgnore; 10 import com.fasterxml.jackson.annotation.JsonProperty; 11 import com.fasterxml.jackson.annotation.JsonRootName; 12 import com.netflix.appinfo.InstanceInfo; 13 import com.netflix.appinfo.InstanceInfo.InstanceStatus; 14 import com.netflix.discovery.EurekaClientConfig; 15 import com.netflix.discovery.InstanceRegionChecker; 16 import com.netflix.discovery.provider.Serializer; 17 import com.netflix.discovery.util.StringCache; 18 import com.thoughtworks.xstream.annotations.XStreamAlias; 19 import com.thoughtworks.xstream.annotations.XStreamImplicit; 20 import com.thoughtworks.xstream.annotations.XStreamOmitField; 21 import java.util.ArrayList; 22 import java.util.Collections; 23 import java.util.Iterator; 24 import java.util.LinkedHashSet; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.Set; 28 import java.util.concurrent.ConcurrentHashMap; 29 import java.util.concurrent.atomic.AtomicReference; 30 import javax.annotation.Nullable; 31 32 @Serializer("com.netflix.discovery.converters.EntityBodyConverter") 33 @XStreamAlias("application") 34 @JsonRootName("application") 35 public class Application { 36 private String name; 37 @XStreamOmitField 38 private volatile boolean isDirty; 39 @XStreamImplicit 40 private final Set<InstanceInfo> instances; 41 private AtomicReference<List<InstanceInfo>> shuffledInstances; 42 private Map<String, InstanceInfo> instancesMap; 43 44 public String toString() { 45 return "Application [name=" + this.name + ", isDirty=" + this.isDirty + ", instances=" + this.instances + ", shuffledInstances=" + this.shuffledInstances + ", instancesMap=" + this.instancesMap + "]"; 46 } 47 48 public Application() { 49 this.isDirty = false; 50 this.shuffledInstances = new AtomicReference(); 51 this.instances = new LinkedHashSet(); 52 this.instancesMap = new ConcurrentHashMap(); 53 } 54 55 public Application(String name) { 56 this.isDirty = false; 57 this.shuffledInstances = new AtomicReference(); 58 this.name = StringCache.intern(name); 59 this.instancesMap = new ConcurrentHashMap(); 60 this.instances = new LinkedHashSet(); 61 } 62 63 @JsonCreator 64 public Application(@JsonProperty("name") String name, @JsonProperty("instance") List<InstanceInfo> instances) { 65 this(name); 66 Iterator var3 = instances.iterator(); 67 68 while(var3.hasNext()) { 69 InstanceInfo instanceInfo = (InstanceInfo)var3.next(); 70 this.addInstance(instanceInfo); 71 } 72 73 } 74 75 public void addInstance(InstanceInfo i) { 76 this.instancesMap.put(i.getId(), i); 77 Set var2 = this.instances; 78 synchronized(this.instances) { 79 this.instances.remove(i); 80 this.instances.add(i); 81 this.isDirty = true; 82 } 83 } 84 85 public void removeInstance(InstanceInfo i) { 86 this.removeInstance(i, true); 87 } 88 89 @JsonProperty("instance") 90 public List<InstanceInfo> getInstances() { 91 return this.shuffledInstances.get() == null ? this.getInstancesAsIsFromEureka() : (List)this.shuffledInstances.get(); 92 } 93 94 @JsonIgnore 95 public List<InstanceInfo> getInstancesAsIsFromEureka() { 96 Set var1 = this.instances; 97 synchronized(this.instances) { 98 return new ArrayList(this.instances); 99 } 100 } 101 102 public InstanceInfo getByInstanceId(String id) { 103 return (InstanceInfo)this.instancesMap.get(id); 104 } 105 106 public String getName() { 107 return this.name; 108 } 109 110 public void setName(String name) { 111 this.name = StringCache.intern(name); 112 } 113 114 public int size() { 115 return this.instances.size(); 116 } 117 118 public void shuffleAndStoreInstances(boolean filterUpInstances) { 119 this._shuffleAndStoreInstances(filterUpInstances, false, (Map)null, (EurekaClientConfig)null, (InstanceRegionChecker)null); 120 } 121 122 public void shuffleAndStoreInstances(Map<String, Applications> remoteRegionsRegistry, EurekaClientConfig clientConfig, InstanceRegionChecker instanceRegionChecker) { 123 this._shuffleAndStoreInstances(clientConfig.shouldFilterOnlyUpInstances(), true, remoteRegionsRegistry, clientConfig, instanceRegionChecker); 124 } 125 126 private void _shuffleAndStoreInstances(boolean filterUpInstances, boolean indexByRemoteRegions, @Nullable Map<String, Applications> remoteRegionsRegistry, @Nullable EurekaClientConfig clientConfig, @Nullable InstanceRegionChecker instanceRegionChecker) { 127 Set var7 = this.instances; 128 ArrayList instanceInfoList; 129 synchronized(this.instances) { 130 instanceInfoList = new ArrayList(this.instances); 131 } 132 133 if (indexByRemoteRegions || filterUpInstances) { 134 Iterator it = instanceInfoList.iterator(); 135 136 label50: 137 while(true) { 138 while(true) { 139 if (!it.hasNext()) { 140 break label50; 141 } 142 143 InstanceInfo instanceInfo = (InstanceInfo)it.next(); 144 if (filterUpInstances && !InstanceStatus.UP.equals(instanceInfo.getStatus())) { 145 it.remove(); 146 } else if (indexByRemoteRegions && null != instanceRegionChecker && null != clientConfig && null != remoteRegionsRegistry) { 147 String instanceRegion = instanceRegionChecker.getInstanceRegion(instanceInfo); 148 if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { 149 Applications appsForRemoteRegion = (Applications)remoteRegionsRegistry.get(instanceRegion); 150 if (null == appsForRemoteRegion) { 151 appsForRemoteRegion = new Applications(); 152 remoteRegionsRegistry.put(instanceRegion, appsForRemoteRegion); 153 } 154 155 Application remoteApp = appsForRemoteRegion.getRegisteredApplications(instanceInfo.getAppName()); 156 if (null == remoteApp) { 157 remoteApp = new Application(instanceInfo.getAppName()); 158 appsForRemoteRegion.addApplication(remoteApp); 159 } 160 161 remoteApp.addInstance(instanceInfo); 162 this.removeInstance(instanceInfo, false); 163 it.remove(); 164 } 165 } 166 } 167 } 168 } 169 170 Collections.shuffle(instanceInfoList); 171 this.shuffledInstances.set(instanceInfoList); 172 } 173 174 private void removeInstance(InstanceInfo i, boolean markAsDirty) { 175 this.instancesMap.remove(i.getId()); 176 Set var3 = this.instances; 177 synchronized(this.instances) { 178 this.instances.remove(i); 179 if (markAsDirty) { 180 this.isDirty = true; 181 } 182 183 } 184 } 185 }
源代码第40行,是Application的主要数据结构,其定义的是一个Set,Set中是是
InstanceInfo
2. 源码解析
(1)任务的初始化
任务的初始化是由DiscoveryClient.java的构造方法
DiscoveryClient(ApplicationInfoManager applicationInfoManager,
EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) 创建并执行一些线程容器
完成的(scheduler线程池)。该定时任务用于从注册中心获取服务列表、心跳、
向注册中心注册自己。创建容器代码片段
this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build()); this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());
执行定时任务代码片段(构造方法调用initScheduledTasks()方法完成的)
1 1、一个从注册中心获取服务列表的任务,频率:registryFetchIntervalSeconds(默认30s) 2 if (clientConfig.shouldFetchRegistry()) { 3 // registry cache refresh timer 4 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); 5 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); 6 scheduler.schedule( 7 new TimedSupervisorTask( 8 "cacheRefresh", 9 scheduler, 10 cacheRefreshExecutor, 11 registryFetchIntervalSeconds, 12 TimeUnit.SECONDS, 13 expBackOffBound, 14 new CacheRefreshThread() 15 ), 16 registryFetchIntervalSeconds, TimeUnit.SECONDS); 17 } 18 ``` 19 20 21 22 2、客户端启动一个心跳线程,向注册中心发送心跳,频率:renewalIntervalInSecs(默认30s) 23 // Heartbeat timer 24 scheduler.schedule( 25 new TimedSupervisorTask( 26 "heartbeat", 27 scheduler, 28 heartbeatExecutor, 29 renewalIntervalInSecs, 30 TimeUnit.SECONDS, 31 expBackOffBound, 32 new HeartbeatThread() 33 ), 34 renewalIntervalInSecs, TimeUnit.SECONDS); 35 ``` 36 37 38 39 40 3、客户端向注册中心注册自己,频率:InstanceInfoReplicationIntervalSeconds(默认30s)定时的上报自己的信息 。
注册是为了定时的向注册中心上报自己的信息,所以他需要不停的注册,而不是注册一次 41 instanceInfoReplicator = new InstanceInfoReplicator( 42 this, 43 instanceInfo, 44 clientConfig.getInstanceInfoReplicationIntervalSeconds(), 45 2); // burstSize 46 ```
(2)向服务端获取服务列表
思路:(a)客户端先缓存中获取服务列表
(b)根据缓存的情况进行增量或是全量更新服务列表
(c)发送http请求到服务端
(d)服务端先去读缓存中获取服务列表如果没有则去写缓存中获取服务列表并将写缓存中
服务列表缓存到读缓存中
(e)服务端返回结果给客户端
(f)客户端将结果缓存到自己的缓存中
引入一个问题:
客户端A(A1、A2、A3)调用客户端C(C1、C2、C3),当A1调用C1的时候,C1宕机,那么
这时候调用失败,这个时候需要加入降级来保证可用性
(2)向服务端注册
(1)客户端通过http请求将 “服务名称”+“ip”+"端口号"(instance)发送给服务端(post请求,参数
为InstanceInfo)
(2)服务端存储信息并将这些信息同步给其它Eureka服务器
注:注册传的是instanceInfo。
-- 服务端根据appName、instanceInfo获取Instance
-- 对比同一instanceInfo节点下的的的节点(服务端存储的和客户端新传过去的)的时间戳谁
的比较新,将较新节点的存储到缓存中(gMap)。
注意:客户端创建的节点是临时节点。
(3)对比时间戳,得到一个真正的instanceinfo
服务端处理Instance源码:
AbstractInstanceRegistry.java
1 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { 2 try { 3 this.read.lock(); 4 Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName()); 5 EurekaMonitors.REGISTER.increment(isReplication); 6 if (gMap == null) { 7 ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap(); 8 gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap); 9 if (gMap == null) { 10 gMap = gNewMap; 11 } 12 } 13 14 Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId()); 15 if (existingLease != null && existingLease.getHolder() != null) { 16 Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp(); 17 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); 18 logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 19 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 20 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 21 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); 22 registrant = (InstanceInfo)existingLease.getHolder(); 23 } 24 } else { 25 Object var6 = this.lock; 26 synchronized(this.lock) { 27 if (this.expectedNumberOfRenewsPerMin > 0) { 28 this.expectedNumberOfRenewsPerMin += 2; 29 this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold()); 30 } 31 } 32 33 logger.debug("No previous lease information found; it is new registration"); 34 } 35 36 Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration); 37 if (existingLease != null) { 38 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 39 } 40 41 ((Map)gMap).put(registrant.getId(), lease); 42 AbstractInstanceRegistry.CircularQueue var20 = this.recentRegisteredQueue; 43 synchronized(this.recentRegisteredQueue) { 44 this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); 45 } 46 47 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 48 logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId()); 49 if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) { 50 logger.info("Not found overridden id {} and hence adding it", registrant.getId()); 51 this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); 52 } 53 } 54 55 InstanceStatus overriddenStatusFromMap = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId()); 56 if (overriddenStatusFromMap != null) { 57 logger.info("Storing overridden status {} from map", overriddenStatusFromMap); 58 registrant.setOverriddenStatus(overriddenStatusFromMap); 59 } 60 61 InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease, isReplication); 62 registrant.setStatusWithoutDirty(overriddenInstanceStatus); 63 if (InstanceStatus.UP.equals(registrant.getStatus())) { 64 lease.serviceUp(); 65 } 66 67 registrant.setActionType(ActionType.ADDED); 68 this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease)); 69 registrant.setLastUpdatedTimestamp(); 70 this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 71 logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication}); 72 } finally { 73 this.read.unlock(); 74 } 75 76 }
(3)心跳
(a)客户端向服务端发送http请求(参数包括appName,instanceInfo)
源代码:
AbstractJerseyEurekaHttpClient.java
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; EurekaHttpResponse var10; try { WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath).queryParam("status", info.getStatus().toString()).queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); this.addExtraHeaders(requestBuilder); response = (ClientResponse)requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity()) { eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } var10 = eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", new Object[]{this.serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()}); } if (response != null) { response.close(); } } return var10; }
(b)服务端接收到请求,进行续约并同步给其它Eureka
-- 判断实例是否存在,如果存在就进行续约,如果不存在续约失败
-- 续约(更新时间戳lastUpdateTimestamp)
(c)更新时间戳完成续约
续约源代码:Lease.java
public void renew() { this.lastUpdateTimestamp = System.currentTimeMillis() + this.duration; }
(4)服务下线
(a)客户端停掉初始化时创建的定时任务
(b)向服务端发送http请求
(c)服务端执行下线并复制给其它的Eureka
下线其实就是把gMap中需要移除的实例设置一个被移除的时间戳。
下线源代码:Lease.java
public void cancel() { if (this.evictionTimestamp <= 0L) { this.evictionTimestamp = System.currentTimeMillis(); } }
(d)通过租约超时来判断超时,代码
public boolean isExpired(long additionalLeaseMs) { return this.evictionTimestamp > 0L || System.currentTimeMillis() > this.lastUpdateTimestamp + this.duration + additionalLeaseMs; }
标签:String,spring,InstanceInfo,源码,new,import,null,public,cloud 来源: https://www.cnblogs.com/jialanshun/p/10993695.html