[Learning Design from Source Code] Ant Financial SOFARegistry: Service Registration and Operation Logs
Contents
- [Learning Design from Source Code] Ant Financial SOFARegistry: Service Registration and Operation Logs
- 5.1 Bean
- 5.2 StoreServiceFactory
- 5.3 AbstractAcceptorStore
- 5.4 Adding Operators
- 5.5 Consuming the Cache
- 5.5.1 Scheduler
- 5.5.2 changeDataCheck
- 5.5.2.1 Sending NotifyDataSyncRequest
- 5.5.3 checkAcceptorsChangAndExpired
- 6.1 appendOperator
- 6.2 checkExpired
- 7.1 NotifyDataSyncHandler
- 7.1.1 doHandle
- 7.1.2 executorRequest
- 7.1.3 GetSyncDataHandler
- 7.1.4 SyncDataCallback
- 8.1 SyncDataRequest
- 8.1.1 Where SyncDataRequest Comes From
- 8.2 syncDataHandler
- 8.3 SyncDataServiceImpl
- 8.4 AbstractAcceptorStore
- 8.5 Acceptor
- 9.1 DataChangeHandler
0x00 Abstract
SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across multiple articles, it works backwards from the source code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba approaches such designs.
This is the fourteenth article in the series. It covers service registration (a service going online) and operation logs in SOFARegistry. The previous article looked at this from the Session Server's perspective; this one takes the Data Server's perspective.
0x01 Overall Business Flow
Let us first review the overall business flow; this part belongs to data sharding.
1.1 Service Registration Process
Recall how service data flows internally during "one service registration":
- The Client calls publisher.register to register a service with the SessionServer.
- After the SessionServer receives the service data (a PublisherRegister), it writes it into memory (the SessionServer keeps the Client's data in memory so that it can later run periodic checks against the DataServer), then uses consistent hashing on dataInfoId to locate the corresponding DataServer and forwards the PublisherRegister to it.
- When the DataServer receives the PublisherRegister, it also writes the data into memory first; the DataServer aggregates all PublisherRegisters per dataInfoId. At the same time, the DataServer notifies all SessionServers of the change event for that dataInfoId; the event carries the dataInfoId and the version number.
- Meanwhile, asynchronously, the DataServer incrementally syncs the data, per dataInfoId, to the other replicas. On top of consistent-hash sharding, the DataServer keeps multiple replicas of each shard (three by default).
- When a SessionServer receives the change event, it compares the version of that dataInfoId stored in its own memory; if it is smaller than the one sent by the DataServer, it actively fetches the full data for that dataInfoId from the DataServer, i.e. the complete list of PublisherRegisters for that dataInfoId.
- Finally, the SessionServer pushes the data to the relevant Clients, and each Client receives the latest service list after this registration.
Due to space limits, the previous article covered the first two points; this article covers the third and fourth.
1.2 Data Sharding
When a service comes online, the hash of the new service's dataInfoId is computed to decide its shard, and the nearest node on the hash ring is chosen to store it.
When the DataServer starts, it registers a publishDataProcessor to handle publish requests from service publishers; this publishDataProcessor is the PublishDataHandler. Whenever a new service publisher comes online, the DataServer's PublishDataHandler is triggered.
The handler first checks the current node's status: if the node is not in working state, it rejects the request. Otherwise it triggers the onChange method of the data change event center, DataChangeEventCenter.
DataChangeEventCenter maintains an array of DataChangeEventQueue instances, each element being an event queue. When onChange is triggered, the hash of the changed service's dataInfoId is computed to determine which queue the registration data belongs to, and the change is wrapped into a data change object and put into that queue.
DataChangeEventQueue#start is called on a new thread when DataChangeEventCenter is initialized; it keeps taking new events from the queue and dispatching them. New data is thereby added to the node, completing the sharding.
The DataChangeHandler then publishes the change event through a ChangeNotifier, notifying other nodes to synchronize the data.
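As a side note, here is a minimal, self-contained sketch (not the project's code; class and method names are made up for illustration) of how a dataInfoId could be hashed onto one of N event queues, mirroring the "hash(dataInfoId) → queue index" step described above:

```java
// Illustrative only: maps a dataInfoId onto one of N event queues by hashing.
public final class QueueIndexDemo {

    static int queueIndex(String dataInfoId, int queueCount) {
        int h = dataInfoId.hashCode();
        h = h ^ (h >>> 16);                   // spread high bits into low bits
        return (h & 0x7fffffff) % queueCount; // non-negative index in [0, queueCount)
    }

    public static void main(String[] args) {
        System.out.println(queueIndex("demo-data-info-id", 4));
    }
}
```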
0x02 Basic Data Structures
Several related data structures need to be introduced first.
2.1 Publisher
Publisher is the information about a data publisher.
```java
public class Publisher extends BaseInfo {
    private List<ServerDataBox> dataList;
    private PublishType         publishType = PublishType.NORMAL;
}
```
2.2 Datum
Datum is the publisher information aggregated from SOFARegistry's own perspective. Its core fields are:
- dataInfoId: the unique identifier of a service, composed of dataId, instanceId and group;
- dataCenter: a physical data center, which contains multiple logical units (zones). A zone is a concept from unitized architecture and represents a logical unit within a data center. In the service discovery scenario, a service is published into a specific zone, while subscribers can subscribe either at zone granularity or at dataCenter granularity, i.e. to the service data of all zones under that dataCenter;
- pubMap: the Publishers it contains;
- version: the corresponding version number.
The code is as follows:
```java
public class Datum implements Serializable {
    private String  dataInfoId;
    private String  dataCenter;
    private String  dataId;
    private String  instanceId;
    private String  group;

    private Map<String/*registerId*/, Publisher> pubMap = new ConcurrentHashMap<>();

    private long    version;
    private boolean containsUnPub = false;
}
```
2.3 DatumCache
DatumCache holds the latest Datum.
```java
public class DatumCache {
    @Autowired
    private DatumStorage localDatumStorage;
}
```
The actual storage is done in LocalDatumStorage.
```java
public class LocalDatumStorage implements DatumStorage {
    /**
     * row:    dataCenter
     * column: dataInfoId
     * value:  datum
     */
    protected final Map<String, Map<String, Datum>> DATUM_MAP = new ConcurrentHashMap<>();

    /**
     * all datum index
     *
     * row:    ip:port
     * column: registerId
     * value:  publisher
     */
    protected final Map<String, Map<String, Publisher>> ALL_CONNECT_ID_INDEX = new ConcurrentHashMap<>();

    @Autowired
    private DataServerConfig dataServerConfig;
}
```
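The "row/column/value" layout above is simply a two-level concurrent map. A minimal sketch of the same pattern (illustrative only, not the project's code; the value type is a plain String to keep it self-contained):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Two-level "row -> column -> value" store, the same shape as DATUM_MAP above.
public final class TwoLevelStoreDemo {
    private final Map<String, Map<String, String>> table = new ConcurrentHashMap<>();

    void put(String row, String column, String value) {
        // computeIfAbsent creates the inner map exactly once, even under concurrency
        table.computeIfAbsent(row, r -> new ConcurrentHashMap<>()).put(column, value);
    }

    String get(String row, String column) {
        Map<String, String> columns = table.get(row);
        return columns == null ? null : columns.get(column);
    }

    public static void main(String[] args) {
        TwoLevelStoreDemo store = new TwoLevelStoreDemo();
        store.put("DefaultDataCenter", "demo-data-info-id", "datum-v1");
        System.out.println(store.get("DefaultDataCenter", "demo-data-info-id"));
    }
}
```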
2.4 Operator
Operator is the operation corresponding to one Datum change.
```java
public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}
```
2.5 Acceptor
Acceptor records all the Datum operations, where:
- logOperatorsOrder records the order of the operations;
- logOperators holds all the operations.
```java
public class Acceptor {
    private final String        dataInfoId;
    private final String        dataCenter;
    private int                 maxBufferSize;

    static final int            DEFAULT_DURATION_SECS = 30;

    private final Deque<Long>   logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private Map<Long, Operator> logOperators      = new ConcurrentHashMap<>();

    private final DatumCache    datumCache;
}
```
2.6 Summary
To summarize how these data structures relate:
- Publisher is the information about a data publisher.
- Datum is the publisher information aggregated from SOFARegistry's own perspective.
- DatumCache holds the latest Datum.
- Operator is the operation corresponding to one Datum change.
- Acceptor records all the Datum operations.
0x03 Where a Datum Comes From and Goes
Let us first review the life cycle of a Datum.
3.1 Inside the Session Server
First, how does the Session Server obtain a Datum?
Inside the Session Server, Datums are stored in the SessionCacheService.
For example, inside DataChangeFetchCloudTask, a Datum can be fetched like this:
```java
private Map<String, Datum> getDatumsCache() {
    Map<String, Datum> map = new HashMap<>();
    NodeManager nodeManager = NodeManagerFactory.getNodeManager(NodeType.META);
    Collection<String> dataCenters = nodeManager.getDataCenters();
    if (dataCenters != null) {
        Collection<Key> keys = dataCenters.stream()
            .map(dataCenter -> new Key(KeyType.OBJ, DatumKey.class.getName(),
                new DatumKey(fetchDataInfoId, dataCenter)))
            .collect(Collectors.toList());

        Map<Key, Value> values = null;
        values = sessionCacheService.getValues(keys);

        if (values != null) {
            values.forEach((key, value) -> {
                if (value != null && value.getPayload() != null) {
                    map.put(((DatumKey) key.getEntityType()).getDataCenter(),
                        (Datum) value.getPayload());
                }
            });
        }
    }
    return map;
}
```
The Session Server then sends a PublishDataRequest to the Data Server.
3.2 PublishDataHandler
Inside the DataServer, PublishDataHandler handles the PublishDataRequest.
```java
public class PublishDataHandler extends AbstractServerHandler<PublishDataRequest> {
    @Autowired
    private ForwardService                 forwardService;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DataChangeEventCenter          dataChangeEventCenter;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private DatumLeaseManager              datumLeaseManager;

    @Autowired
    private ThreadPoolExecutor             publishProcessorExecutor;

    @Override
    public Object doHandle(Channel channel, PublishDataRequest request) {
        Publisher publisher = Publisher.internPublisher(request.getPublisher());
        if (forwardService.needForward()) {
            CommonResponse response = new CommonResponse();
            response.setSuccess(false);
            response.setMessage("Request refused, Server status is not working");
            return response;
        }

        dataChangeEventCenter.onChange(publisher, dataServerConfig.getLocalDataCenter());

        if (publisher.getPublishType() != PublishType.TEMPORARY) {
            String connectId = WordCache.getInstance().getWordCache(
                publisher.getSourceAddress().getAddressString());
            sessionServerConnectionFactory.registerConnectId(request.getSessionServerProcessId(),
                connectId);
            // record the renew timestamp
            datumLeaseManager.renew(connectId);
        }

        return CommonResponse.buildSuccessResponse();
    }
}
```
3.3 DataChangeEventCenter
The onChange method of DataChangeEventCenter then puts the change into a queue.
```java
public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}
```
3.4 DataChangeEventQueue
Inside DataChangeEventQueue, handleDatum is called to process and store the Datum.
3.5 DataChangeHandler
DataChangeHandler extracts the ChangeData and then notifies.
```java
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount,
        DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory.newFixedThreadPool(
        dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                final ChangeData changeData = dataChangeEventQueue.take();
                notifyExecutor.execute(new ChangeNotifier(changeData, name));
            }
        });
    }
}
```
The flow so far:
DataChangeFetchCloudTask (Session Server, getValues from sessionCacheService) --PublishDataRequest--> PublishDataHandler (Data Server) --onChange(Publisher)--> DataChangeEventCenter --Datum--> DataChangeEventQueue --ChangeData--> DataChangeHandler
0x04 Processing in DataChangeHandler
We now continue with DataChangeHandler. As mentioned in the overview, DataChangeHandler takes the change event and:
- turns it into an Operator and puts it into an AbstractAcceptorStore;
- publishes it through a ChangeNotifier, notifying other nodes to synchronize the data.
This article starts from the first part, turning the change event into an Operator and storing it in the AbstractAcceptorStore, i.e. the operation log.
As shown in the flow:
DataChangeFetchCloudTask (Session Server) --PublishDataRequest--> PublishDataHandler (Data Server) --> DataChangeEventCenter --> DataChangeEventQueue --> DataChangeHandler --> ChangeNotifier --> AbstractAcceptorStore
Who calls the Acceptor's appendOperator? The notifiers do, for example:
```java
public class BackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
```
And another one:
```java
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion,
            datum, DataSourceTypeEnum.BACKUP));
    }
}
```
0x05 Storage in AbstractAcceptorStore
AbstractAcceptorStore is the operation-log store; let us analyze it in detail.
5.1 Bean
A bean is provided to store the operation information.
```java
@Bean
public AcceptorStore localAcceptorStore() {
    return new LocalAcceptorStore();
}
```
5.2 StoreServiceFactory
Its job is to keep all AcceptorStore beans in storeServiceMap; currently LocalAcceptorStore is the only one.
```java
public class StoreServiceFactory implements ApplicationContextAware {

    private static Map<String, AcceptorStore> storeServiceMap = new HashMap<>();

    /**
     * get AcceptorStore by storeType
     * @param storeType
     * @return
     */
    public static AcceptorStore getStoreService(String storeType) {
        return storeServiceMap.get(storeType);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, AcceptorStore> map = applicationContext.getBeansOfType(AcceptorStore.class);
        map.forEach((key, value) -> storeServiceMap.put(value.getType(), value));
    }
}
```
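This is a simple type-keyed registry populated from the Spring context. A minimal sketch of the same pattern without Spring (illustrative names, not project code) might look like:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A type-keyed registry: stores register themselves under their type string,
// callers look them up by that string.
interface StoreDemo {
    String getType();
}

final class StoreRegistryDemo {
    private static final Map<String, StoreDemo> STORES = new ConcurrentHashMap<>();

    static void register(StoreDemo store) {
        STORES.put(store.getType(), store);
    }

    static StoreDemo get(String type) {
        return STORES.get(type);
    }

    public static void main(String[] args) {
        register(() -> "LOCAL");                     // StoreDemo acts as a functional interface here
        System.out.println(get("LOCAL").getType());  // prints LOCAL
    }
}
```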
5.3 AbstractAcceptorStore
AbstractAcceptorStore is the base implementation of the store. Its key members are:
acceptors: a matrix keyed by dataCenter and dataInfoId that stores the Acceptor for that combination; that is, for every (dataCenter, dataInfoId) pair there is one Acceptor holding the Operators under it.
notifyAcceptorsCache: a matrix keyed by dataCenter and dataInfoId that caches the Acceptors that still need further processing.
delayQueue: works together with notifyAcceptorsCache. For every Acceptor newly put into notifyAcceptorsCache, a delay item is added to this queue; when the delay expires, the item is taken out and the corresponding Acceptor is removed from notifyAcceptorsCache and processed.
Normally, whatever is in the cache gets taken out when its delay item is dequeued. If several changes for the same key arrive in the meantime, they merely replace the cached value, so they are all handled together once the delay expires.
notifyAcceptorsCache is also organized per data center, and its entries are only removed periodically via removeCache.
```java
public abstract class AbstractAcceptorStore implements AcceptorStore {
    private static final int            DEFAULT_MAX_BUFFER_SIZE = 30;

    @Autowired
    protected IMetaServerService        metaServerService;

    @Autowired
    private Exchange                    boltExchange;

    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;

    @Autowired
    private DatumCache                  datumCache;

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> acceptors            = new ConcurrentHashMap<>();

    private Map<String/*dataCenter*/, Map<String/*dataInfoId*/, Acceptor>> notifyAcceptorsCache = new ConcurrentHashMap<>();

    private DelayQueue<DelayItem<Acceptor>> delayQueue = new DelayQueue<>();
}
```
The structure is as follows: both acceptors and notifyAcceptorsCache are Map<dataCenter, Map<dataInfoId, Acceptor>>, and each Acceptor in turn holds a Map<version, Operator>.
One point worth explaining is why delayQueue needs to be a delay queue at all. It comes from SOFARegistry's "second-level service on/off-line notification" feature.
To support that feature, connections are very sensitive: in SOFARegistry every Client keeps a long-lived connection to a SessionServer with bolt-based heartbeats, and if the connection breaks, the Client reconnects immediately, so there is always a reliable link between Client and SessionServer.
Because of this sensitivity, a mere network glitch (where the process is still alive) makes the Client reconnect to the SessionServer and re-register all of its services. Large numbers of such short-lived "offline then online again" events would confuse and annoy users.
Therefore the DataServer implements delayed merging of changes internally, which is exactly what this DelayQueue is for.
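To make the delay-then-merge idea concrete, here is a minimal, self-contained sketch using java.util.concurrent.DelayQueue (illustrative only; the project's DelayItem differs in detail). An item only becomes visible to take() after its delay elapses, and anything re-put under the same cache key in the meantime is simply handled together when the delay fires.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// One delayed item: wraps a payload and a deadline; DelayQueue.take() returns
// it only once the deadline has passed.
final class DelayItemDemo<T> implements Delayed {
    private final T    item;
    private final long deadlineNanos;

    DelayItemDemo(T item, long delayMillis) {
        this.item = item;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    T getItem() { return item; }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayItemDemo<String>> queue = new DelayQueue<>();
        queue.put(new DelayItemDemo<>("demo-data-info-id", 1000));
        System.out.println(queue.take().getItem()); // blocks ~1 second, then prints
    }
}
```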
5.4 Adding Operators
The basic logic of addOperator is:
- extract dataCenter and dataInfoId from the Operator's Datum;
- get the Map for that dataCenter from acceptors;
- get the existing Acceptor for that dataInfoId from that acceptorMap;
- if the new operator is a SnapshotOperator, clear the previous operator queue;
- otherwise append the new operator;
- call putCache(existAcceptor) to put the current Acceptor into the cache, where the scheduled task will process it.
Throughout, putIfAbsent is used, so if several identical values are inserted within a short time the existing value is not replaced, which has a merging effect.
```java
@Override
public void addOperator(Operator operator) {
    Datum datum = operator.getDatum();
    String dataCenter = datum.getDataCenter();
    String dataInfoId = datum.getDataInfoId();
    try {
        Map<String, Acceptor> acceptorMap = acceptors.get(dataCenter);
        if (acceptorMap == null) {
            Map<String, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = acceptors.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }

        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        if (existAcceptor == null) {
            Acceptor newAcceptor = new Acceptor(DEFAULT_MAX_BUFFER_SIZE, dataInfoId, dataCenter,
                datumCache);
            existAcceptor = acceptorMap.putIfAbsent(dataInfoId, newAcceptor);
            if (existAcceptor == null) {
                existAcceptor = newAcceptor;
            }
        }

        if (operator instanceof SnapshotOperator) {
            // snapshot: clear the queue, make other data retrieve the latest memory data
            existAcceptor.clearBefore();
        } else {
            existAcceptor.appendOperator(operator);
        }

        // put cache
        putCache(existAcceptor);
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e); // error handling simplified for this excerpt
    }
}
```
The job of putCache is to:
- extract dataCenter and dataInfoId from the acceptor;
- get the Map for that dataCenter from notifyAcceptorsCache;
- put the acceptor into that acceptorMap under its dataInfoId;
- if there was no previous value for that dataInfoId, put the acceptor into delayQueue.
Here putIfAbsent is used as well, so several identical values inserted within a short time do not replace the existing one, again acting as a merge; a small demo of this putIfAbsent behaviour follows after the code below.
```java
private void putCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap == null) {
            Map<String, Acceptor> newMap = new ConcurrentHashMap<>();
            acceptorMap = notifyAcceptorsCache.putIfAbsent(dataCenter, newMap);
            if (acceptorMap == null) {
                acceptorMap = newMap;
            }
        }
        Acceptor existAcceptor = acceptorMap.putIfAbsent(dataInfoId, acceptor);
        if (existAcceptor == null) {
            addQueue(acceptor);
        }
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e); // error handling simplified for this excerpt
    }
}
```
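A tiny, self-contained demo of the putIfAbsent merging behaviour mentioned above (plain JDK code, names are illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// putIfAbsent installs a value only when the key is absent and otherwise
// returns the value already present, so repeated inserts for the same key
// collapse onto a single cached entry.
public final class PutIfAbsentDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        String prev1 = cache.putIfAbsent("demo-data-info-id", "acceptor-1"); // null: installed
        String prev2 = cache.putIfAbsent("demo-data-info-id", "acceptor-2"); // "acceptor-1": kept
        System.out.println(prev1 + " / " + prev2 + " / " + cache.get("demo-data-info-id"));
        // prints: null / acceptor-1 / acceptor-1
    }
}
```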
5.5 Consuming the Cache
The actual consumption happens in scheduled tasks. The purpose of consuming the log is to synchronize the logged operations to other DataServers.
5.5.1 Scheduler
The Scheduler class is the scheduled-task driver: it starts thread pools that periodically invoke the AcceptorStore functions.
```java
public class Scheduler {
    private final ScheduledExecutorService scheduler;
    public final ExecutorService           versionCheckExecutor;
    private final ThreadPoolExecutor       expireCheckExecutor;

    @Autowired
    private AcceptorStore                  localAcceptorStore;

    public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory("SyncDataScheduler-versionChangeCheck"));
    }

    public void startScheduler() {
        scheduler.schedule(
            new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
            30, TimeUnit.SECONDS);

        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
    }
}
```
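For comparison, here is a minimal sketch of the same lifecycle with a plain fixed-delay scheduled task (this is not the project's TimedSupervisorTask, which re-schedules itself and adapts its timeout; the idea of "wait, run the check, repeat" is the same):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Schedules a recurring check: first run after 30 seconds, then a new run
// 3 seconds after each previous run completes.
public final class ExpireCheckSchedulerDemo {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(
            () -> System.out.println("checkAcceptorsChangAndExpired tick"),
            30, 3, TimeUnit.SECONDS);
    }
}
```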
The relevant functions in AbstractAcceptorStore are as follows.
5.5.2 changeDataCheck
changeDataCheck is an endless while (true) loop internally, so it does not need to be scheduled repeatedly; it is simply submitted once.
changeDataCheck is bound to delayQueue: whenever a delay item becomes due, the Acceptor is taken from it, the matching Acceptor is removed from notifyAcceptorsCache, and notifyChange(acceptor) is called to process it.
```java
@Override
public void changeDataCheck() {
    while (true) {
        try {
            DelayItem<Acceptor> delayItem = delayQueue.take();
            Acceptor acceptor = delayItem.getItem();
            removeCache(acceptor); // compare and remove
        } catch (InterruptedException e) {
            break;
        } catch (Throwable e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
```
The cache is consumed via removeCache.
```java
private void removeCache(Acceptor acceptor) {
    String dataCenter = acceptor.getDataCenter();
    String dataInfoId = acceptor.getDataInfoId();
    try {
        Map<String, Acceptor> acceptorMap = notifyAcceptorsCache.get(dataCenter);
        if (acceptorMap != null) {
            boolean result = acceptorMap.remove(dataInfoId, acceptor);
            if (result) {
                // data change notify
                notifyChange(acceptor);
            }
        }
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e); // error handling simplified for this excerpt
    }
}
```
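Note the two-argument remove(key, value): it is a compare-and-remove that only removes the entry if it is still mapped to exactly that value. A tiny self-contained demo of that semantics (plain JDK, illustrative names):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// remove(key, value) succeeds only when the current mapping equals the given
// value; if someone replaced the entry in the meantime, it is left untouched.
public final class CompareAndRemoveDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("demo-data-info-id", "acceptor-old");
        cache.put("demo-data-info-id", "acceptor-new");                     // replaced meanwhile
        boolean removed = cache.remove("demo-data-info-id", "acceptor-old");
        System.out.println(removed + " / " + cache.get("demo-data-info-id"));
        // prints: false / acceptor-new
    }
}
```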
5.5.2.1 Sending NotifyDataSyncRequest
removeCache also triggers the notification via notifyChange, whose logic is:
- extract the dataInfoId from the acceptor;
- obtain the IPs of the dataServerNodes responsible for that dataInfoId from the meta service;
- iterate over the IPs and, through the bolt server, call syncServer.sendSync, i.e. send a NotifyDataSyncRequest to the DataServer at each IP.
```java
private void notifyChange(Acceptor acceptor) {
    Long lastVersion = acceptor.getLastVersion();

    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());

    List<String> targetDataIps = getTargetDataIp(acceptor.getDataInfoId());
    for (String targetDataIp : targetDataIps) {

        if (DataServerConfig.IP.equals(targetDataIp)) {
            continue;
        }

        Server syncServer = boltExchange.getServer(dataServerConfig.getSyncDataPort());

        for (int tryCount = 0; tryCount < dataServerConfig.getDataSyncNotifyRetry(); tryCount++) {
            try {
                Connection connection = dataServerConnectionFactory.getConnection(targetDataIp);
                if (connection == null) {
                    TimeUtil.randomDelay(1000);
                    continue;
                }
                syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), request,
                    1000);
                break;
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e); // error handling simplified for this excerpt
            }
        }
    }
}
```
The call chain for this part is: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest.
In other words, versionCheckExecutor drives changeDataCheck inside AbstractAcceptorStore (over its acceptors and notifyAcceptorsCache maps); removeCache and notifyChange then fire, and a NotifyDataSyncRequest is sent to the other DataServer.
5.5.3 checkAcceptorsChangAndExpired
checkAcceptorsChangAndExpired iterates over every Acceptor in acceptors and checks whether it has expired, handling it if so.
```java
@Override
public void checkAcceptorsChangAndExpired() {
    acceptors.forEach((dataCenter, acceptorMap) -> {
        if (acceptorMap != null && !acceptorMap.isEmpty()) {
            acceptorMap.forEach((dataInfoId, acceptor) -> acceptor.checkExpired(0));
        }
    });
}
```
At this point the picture is: versionCheckExecutor drives changeDataCheck and expireCheckExecutor drives checkAcceptorsChangAndExpired; both operate on AbstractAcceptorStore's acceptors and notifyAcceptorsCache maps, and removeCache / notifyChange end with a NotifyDataSyncRequest being sent to the other DataServer.
0x06 The Operation Log in Acceptor
The Acceptor is where the log lives: it records all Datum operations.
The operation log is stored as a queue. When fetching the log, the caller's current version number is located in the queue, and all operation log entries after that version are returned so they can be replayed.
```java
public class Acceptor {
    private final String        dataInfoId;
    private final String        dataCenter;
    private int                 maxBufferSize;

    static final int            DEFAULT_DURATION_SECS = 30;

    private final Deque<Long>   logOperatorsOrder = new ConcurrentLinkedDeque<>();
    private Map<Long, Operator> logOperators      = new ConcurrentHashMap<>();

    private final DatumCache    datumCache;
}
```
The key fields are:
- logOperators: a map keyed by version number, storing all the Operators;
- logOperatorsOrder: since a map has no ordering, this deque stores the version numbers in insertion order.
An Operator is the Datum corresponding to one operation.
```java
public class Operator {
    private Long               version;
    private Long               sourceVersion;
    private Datum              datum;
    private DataSourceTypeEnum sourceType;
}
```
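The deque-plus-map combination is essentially a version-ordered log that can answer "give me everything after version V". A minimal, self-contained sketch of that idea (illustrative only; the real Acceptor additionally bounds the buffer, clears the log on version gaps and expires old entries):

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Version-ordered operation log: the deque remembers the order in which
// versions were appended, the map holds the payload per version, and since()
// replays everything recorded after a given version.
final class VersionLogDemo<T> {
    private final Deque<Long>  order = new ConcurrentLinkedDeque<>();
    private final Map<Long, T> ops   = new ConcurrentHashMap<>();

    void append(long version, T op) {
        if (ops.put(version, op) == null) {
            order.add(version); // only record the order for versions seen the first time
        }
    }

    List<T> since(long sinceVersion) {
        List<T> result = new ArrayList<>();
        boolean found = false;
        for (Long v : order) {
            if (found) {
                result.add(ops.get(v));
            } else if (v == sinceVersion) {
                found = true;
            }
        }
        // empty if sinceVersion is the newest entry or was never recorded
        // (the real Acceptor falls back to returning the full data in that case)
        return result;
    }
}
```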
6.1 appendOperator
This method appends one operation log entry:
- if the queue is already full, poll the oldest entry to make room for the new one;
- if the Operator's sourceVersion is null, set it to 0L;
- if the Operator's previous version (sourceVersion) does not match the version at the tail of the queue, the queue is no longer consistent, so clear both the map and the queue;
- put the Operator into the map;
- if this version is new, add it to the queue.
The code is as follows:
```java
/**
 * append operator to queue, if queue is full poll the first element and append.
 * Process will check version sequence, it must append with a consequent increase in version,
 * otherwise queue will be cleaned
 *
 * @param operator
 */
public void appendOperator(Operator operator) {
    write.lock();
    try {
        if (isFull()) {
            logOperators.remove(logOperatorsOrder.poll());
        }
        if (operator.getSourceVersion() == null) {
            operator.setSourceVersion(0L);
        }
        Long tailVersion = logOperatorsOrder.peekLast();
        if (tailVersion != null) {
            // operation was not added in a solid sequence
            if (tailVersion.longValue() != operator.getSourceVersion().longValue()) {
                clearBefore();
            }
        }
        Operator previousOperator = logOperators.put(operator.getVersion(), operator);
        if (previousOperator == null) {
            logOperatorsOrder.add(operator.getVersion());
        }
    } finally {
        write.unlock();
    }
}
```
Who calls appendOperator? The notifiers do, for example:
```java
public class BackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new Operator(datum.getVersion(), lastVersion, datum,
            DataSourceTypeEnum.BACKUP));
    }
}
```
and:
```java
public class SnapshotBackUpNotifier implements IDataChangeNotifier {
    @Autowired
    private SyncDataService syncDataService;

    @Override
    public void notify(Datum datum, Long lastVersion) {
        syncDataService.appendOperator(new SnapshotOperator(datum.getVersion(), lastVersion,
            datum, DataSourceTypeEnum.BACKUP));
    }
}
```
6.2 checkExpired
This method removes expired log entries. The version is a timestamp, so a periodic check can remove entries whose timestamp has expired.
```java
public void checkExpired(int durationSEC) {
    write.lock();
    try {
        // check all expired
        Long peekVersion = logOperatorsOrder.peek();
        if (peekVersion != null && isExpired(durationSEC, peekVersion)) {
            logOperators.remove(logOperatorsOrder.poll());
            checkExpired(durationSEC);
        }
    } finally {
        write.unlock();
    }
}
```
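The recursion above just keeps dropping the queue head while it is expired. Here is a simplified, self-contained sketch of the same idea written as a loop, assuming for illustration that the version is a plain millisecond timestamp (the real version is only derived from a timestamp):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Drop all log entries older than durationSecs from the head of the queue.
public final class ExpireCheckDemo {

    static void dropExpired(Deque<Long> order, long durationSecs) {
        long cutoff = System.currentTimeMillis() - durationSecs * 1000;
        while (!order.isEmpty() && order.peek() < cutoff) {
            order.poll(); // the real code also removes the entry from the version -> Operator map
        }
    }

    public static void main(String[] args) {
        Deque<Long> order = new ArrayDeque<>();
        order.add(System.currentTimeMillis() - 60_000); // one minute old: expired
        order.add(System.currentTimeMillis());          // fresh
        dropExpired(order, 30);
        System.out.println(order.size());               // prints 1
    }
}
```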
0x07 NotifyDataSyncRequest: Triggering Data Sync
This request tells the receiving end to synchronize data.
Recall the call chain of this part: versionCheckExecutor.execute -> localAcceptorStore.changeDataCheck -> removeCache -> notifyChange -> NotifyDataSyncRequest.
7.1 NotifyDataSyncHandler
The receiving DataServer handles it with NotifyDataSyncHandler.
```java
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest>
    implements AfterWorkingProcess {

    @Autowired
    private DataServerConfig      dataServerConfig;

    @Autowired
    private GetSyncDataHandler    getSyncDataHandler;

    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;

    private Executor              executor = ExecutorFactory.newFixedThreadPool(10,
                                               NotifyDataSyncHandler.class.getSimpleName());

    private ThreadPoolExecutor    notifyExecutor;

    @Autowired
    private DataNodeStatus        dataNodeStatus;

    @Autowired
    private DatumCache            datumCache;
}
```
7.1.1 doHandle
The doHandle method carries on the processing.
```java
@Override
public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
    final Connection connection = ((BoltChannel) channel).getConnection();
    if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
        noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
        return CommonResponse.buildSuccessResponse();
    }
    executorRequest(connection, request);
    return CommonResponse.buildSuccessResponse();
}
```
7.1.2 executorRequest
Because it has received the sync notification (NotifyDataSyncRequest) from the originating DataServer, the receiving DataServer actively pulls the data to synchronize, i.e. it uses GetSyncDataHandler to send a SyncDataRequest.
```java
private void executorRequest(Connection connection, NotifyDataSyncRequest request) {
    executor.execute(() -> {
        fetchSyncData(connection, request);
    });
}

protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
    String dataInfoId = request.getDataInfoId();
    String dataCenter = request.getDataCenter();
    Datum datum = datumCache.get(dataCenter, dataInfoId);
    Long version = (datum == null) ? null : datum.getVersion();
    Long requestVersion = request.getVersion();

    if (version == null || requestVersion == 0L || version < requestVersion) {
        getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
            new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
            dataChangeEventCenter));
    }
}
```
7.1.3 GetSyncDataHandler
GetSyncDataHandler works together with SyncDataCallback:
GetSyncDataHandler sends the SyncDataRequest, and SyncDataCallback receives the sync result.
```
├── remoting
│   ├── dataserver
│   │   ├── DataServerConnectionFactory.java
│   │   ├── DataServerNodeFactory.java
│   │   ├── GetSyncDataHandler.java
│   │   ├── SyncDataCallback.java
│   │   ├── handler
│   │   └── task
```
The placement of GetSyncDataHandler and SyncDataCallback is a bit odd; presumably they sit directly under the dataserver package because they are utility classes, but personally I think a dedicated package would be cleaner.
```java
public class GetSyncDataHandler {
    @Autowired
    private DataNodeExchanger dataNodeExchanger;

    public void syncData(SyncDataCallback callback) {
        int tryCount = callback.getRetryCount();
        if (tryCount > 0) {
            try {
                callback.setRetryCount(--tryCount);
                dataNodeExchanger.request(new Request() {
                    @Override
                    public Object getRequestBody() {
                        return callback.getRequest();
                    }

                    @Override
                    public URL getRequestUrl() {
                        return new URL(callback.getConnection().getRemoteIP(), callback
                            .getConnection().getRemotePort());
                    }

                    @Override
                    public CallbackHandler getCallBackHandler() {
                        return new CallbackHandler() {
                            @Override
                            public void onCallback(Channel channel, Object message) {
                                callback.onResponse(message);
                            }

                            @Override
                            public void onException(Channel channel, Throwable exception) {
                                callback.onException(exception);
                            }

                            @Override
                            public Executor getExecutor() {
                                return callback.getExecutor();
                            }
                        };
                    }
                });
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e); // error handling simplified for this excerpt
            }
        }
    }
}
```
7.1.4 SyncDataCallback
This is where the sync result is received.
```java
public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;
    private SyncDataRequest       request;
    private GetSyncDataHandler    getSyncDataHandler;
    private int                   retryCount;
    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse<SyncData>) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                // handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                // handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                }
            }
        }
    }
}
```
The picture now is: on the sender DataServer, versionCheckExecutor and expireCheckExecutor drive AbstractAcceptorStore, whose removeCache / notifyChange send a NotifyDataSyncRequest (step 1) to the other DataServer; that DataServer's GetSyncDataHandler and SyncDataCallback then send a SyncDataRequest (step 2) back to the sender.
0x08 SyncDataRequest: The Pull Back to the Sender
The SyncDataRequest is sent back to the notifier, i.e. from the other DataServer to the sender DataServer.
8.1 SyncDataRequest
```java
public class SyncDataRequest implements Serializable {

    private String dataInfoId;

    private String dataCenter;

    private String dataSourceType;

    /**
     * be null when dataInfoId not exist in local datumCache
     */
    private Long   version;
}
```
8.1.1 Where SyncDataRequest Comes From
Recall where the SyncDataRequest comes from: it is produced in NotifyDataSyncHandler's handling of the notification, which looks up the version of the requested dataInfoId in the local cache and then sends the request.
```java
public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest>
    implements AfterWorkingProcess {

    protected void fetchSyncData(Connection connection, NotifyDataSyncRequest request) {
        String dataInfoId = request.getDataInfoId();
        String dataCenter = request.getDataCenter();
        Datum datum = datumCache.get(dataCenter, dataInfoId);
        Long version = (datum == null) ? null : datum.getVersion();
        Long requestVersion = request.getVersion();

        if (version == null || requestVersion == 0L || version < requestVersion) {
            getSyncDataHandler.syncData(new SyncDataCallback(getSyncDataHandler, connection,
                new SyncDataRequest(dataInfoId, dataCenter, version, request.getDataSourceType()),
                dataChangeEventCenter));
        }
    }
}
```
which in turn was triggered from AbstractAcceptorStore:
```java
private void notifyChange(Acceptor acceptor) {
    Long lastVersion = acceptor.getLastVersion();
    // may have been deleted by expiry
    if (lastVersion == null) {
        lastVersion = 0L;
    }
    NotifyDataSyncRequest request = new NotifyDataSyncRequest(acceptor.getDataInfoId(),
        acceptor.getDataCenter(), lastVersion, getType());
    // ... (connection lookup omitted here; see the full method in 5.5.2.1)
    syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), request, 1000);
}
```
8.2 syncDataHandler
The originating side handles the request with SyncDataHandler.
- syncDataHandler
This is the handler for data synchronization between nodes. When triggered, it compares version numbers: if the data stored on this DataServer contains the requested version, it returns all data with versions greater than the requested one, so that the nodes can synchronize.
```java
public class SyncDataHandler extends AbstractServerHandler<SyncDataRequest> {

    @Autowired
    private SyncDataService syncDataService;

    @Override
    public Object doHandle(Channel channel, SyncDataRequest request) {
        SyncData syncData = syncDataService.getSyncDataChange(request);
        return new GenericResponse().fillSucceed(syncData);
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return SyncDataRequest.class;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}
```
8.3 SyncDataServiceImpl
The concrete service is SyncDataServiceImpl; its getSyncDataChange method fetches the data from the acceptorStore.
```java
public class SyncDataServiceImpl implements SyncDataService {

    @Override
    public void appendOperator(Operator operator) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(operator.getSourceType()
            .toString());
        if (acceptorStore != null) {
            acceptorStore.addOperator(operator);
        }
    }

    @Override
    public SyncData getSyncDataChange(SyncDataRequest syncDataRequest) {
        AcceptorStore acceptorStore = StoreServiceFactory.getStoreService(syncDataRequest
            .getDataSourceType());
        if (acceptorStore != null) {
            return acceptorStore.getSyncData(syncDataRequest);
        }
        // the missing-store case was omitted in the original excerpt; simplified here
        throw new RuntimeException("AcceptorStore not found: " + syncDataRequest.getDataSourceType());
    }
}
```
How appendOperator gets called was described earlier.
SyncDataServiceImpl then calls into AbstractAcceptorStore.
8.4 AbstractAcceptorStore
It retrieves the Acceptor by dataCenter and dataInfoId and returns the result of its process method.
```java
@Override
public SyncData getSyncData(SyncDataRequest syncDataRequest) {
    String dataCenter = syncDataRequest.getDataCenter();
    String dataInfoId = syncDataRequest.getDataInfoId();
    Long currentVersion = syncDataRequest.getVersion();
    try {
        Map<String, Acceptor> acceptorMap = acceptors.get(dataCenter);
        Acceptor existAcceptor = acceptorMap.get(dataInfoId);
        return existAcceptor.process(currentVersion);
    } catch (Exception e) {
        // error handling was omitted in the original excerpt; simplified here
        throw new RuntimeException(e);
    }
}
```
8.5 Acceptor
Finally, the Acceptor does the processing.
Given the requester's current version: if that version exists in the queue, it returns all Operators with versions greater than it; otherwise it falls back to returning the full Datum from the cache (wholeDataTag = true).
```java
public SyncData process(Long currentVersion) {
    read.lock();
    try {
        Collection<Operator> operators = acceptOperator(currentVersion);

        List<Datum> retList = new LinkedList<>();
        SyncData syncData;
        boolean wholeDataTag = false;
        if (operators != null) {
            // first get all data
            if (operators.isEmpty()) {
                wholeDataTag = true;
                retList.add(datumCache.get(dataCenter, dataInfoId));
            } else {
                for (Operator operator : operators) {
                    retList.add(operator.getDatum());
                }
            }
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        } else {
            // no match, get all data
            wholeDataTag = true;
            retList.add(datumCache.get(dataCenter, dataInfoId));
            syncData = new SyncData(dataInfoId, dataCenter, wholeDataTag, retList);
        }
        return syncData;
    } finally {
        read.unlock();
    }
}
```
The sync payload structure is:
```java
public class SyncData implements Serializable {
    private String            dataInfoId;
    private String            dataCenter;
    private Collection<Datum> datums;
    private boolean           wholeDataTag;
}
```
The picture at this stage: on the sender DataServer, removeCache / notifyChange send a NotifyDataSyncRequest (1) to the other DataServer; its GetSyncDataHandler sends a SyncDataRequest (2) back; on the sender, syncDataHandler (3) calls SyncDataServiceImpl (4), which reads the operation log from AbstractAcceptorStore and returns the SyncData (5) to the other DataServer's SyncDataCallback.
0x09 SyncDataCallback: The Receiver's Callback
Back on the receiving side, the callback iterates over all received Datums and processes each one:
if it is the whole data (wholeDataTag), it calls
dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
otherwise it calls
dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE, dataSourceTypeEnum, datum);
The code is as follows:
```java
public class SyncDataCallback implements InvokeCallback {

    private static final Executor EXECUTOR    = ExecutorFactory.newFixedThreadPool(5,
                                                  SyncDataCallback.class.getSimpleName());

    private static final int      RETRY_COUNT = 3;

    private Connection            connection;
    private SyncDataRequest       request;
    private GetSyncDataHandler    getSyncDataHandler;
    private int                   retryCount;
    private DataChangeEventCenter dataChangeEventCenter;

    @Override
    public void onResponse(Object obj) {
        GenericResponse<SyncData> response = (GenericResponse<SyncData>) obj;
        if (!response.isSuccess()) {
            getSyncDataHandler.syncData(this);
        } else {
            SyncData syncData = response.getData();
            Collection<Datum> datums = syncData.getDatums();
            DataSourceTypeEnum dataSourceTypeEnum = DataSourceTypeEnum.valueOf(request
                .getDataSourceType());
            if (syncData.getWholeDataTag()) {
                // handle all data, replace cache with these datum directly
                for (Datum datum : datums) {
                    if (datum == null) {
                        datum = new Datum();
                        datum.setDataInfoId(syncData.getDataInfoId());
                        datum.setDataCenter(syncData.getDataCenter());
                    }
                    Datum.internDatum(datum);
                    dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, dataSourceTypeEnum, datum);
                    break;
                }
            } else {
                // handle incremental data one by one
                if (!CollectionUtils.isEmpty(datums)) {
                    for (Datum datum : datums) {
                        if (datum != null) {
                            Datum.internDatum(datum);
                            dataChangeEventCenter.sync(DataChangeTypeEnum.MERGE,
                                dataSourceTypeEnum, datum);
                        }
                    }
                }
            }
        }
    }
}
```
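To make the COVER vs MERGE distinction concrete, here is a minimal sketch on a plain map (illustrative only; the real merge in DatumCache also deals with versions and unpublish markers): COVER replaces the whole entry set for a key, MERGE overlays the new entries onto what is already there.

```java
import java.util.HashMap;
import java.util.Map;

// cache: dataInfoId -> (registerId -> publisher info); values are plain Strings here.
public final class CoverMergeDemo {

    static void cover(Map<String, Map<String, String>> cache, String dataInfoId,
                      Map<String, String> pubs) {
        cache.put(dataInfoId, new HashMap<>(pubs));                           // replace everything
    }

    static void merge(Map<String, Map<String, String>> cache, String dataInfoId,
                      Map<String, String> pubs) {
        cache.computeIfAbsent(dataInfoId, k -> new HashMap<>()).putAll(pubs); // overlay new entries
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> cache = new HashMap<>();
        merge(cache, "demo-data-info-id", Map.of("register-1", "pub-a"));
        merge(cache, "demo-data-info-id", Map.of("register-2", "pub-b"));
        System.out.println(cache.get("demo-data-info-id").size());           // 2: merged
        cover(cache, "demo-data-info-id", Map.of("register-3", "pub-c"));
        System.out.println(cache.get("demo-data-info-id").size());           // 1: replaced
    }
}
```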
DataChangeEventCenter.sync is:
```java
public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}
```
DataChangeEventQueue then calls handleDatum to process the event. This part has already been covered in other articles of this series, so the code is only listed here for reference.
```java
@Override
public void run() {
    if (changeData instanceof SnapshotData) {
        SnapshotData snapshotData = (SnapshotData) changeData;
        String dataInfoId = snapshotData.getDataInfoId();
        Map<String, Publisher> toBeDeletedPubMap = snapshotData.getToBeDeletedPubMap();
        Map<String, Publisher> snapshotPubMap = snapshotData.getSnapshotPubMap();
        Datum oldDatum = datumCache.get(dataServerConfig.getLocalDataCenter(), dataInfoId);
        long lastVersion = oldDatum != null ? oldDatum.getVersion() : 0l;
        Datum datum = datumCache.putSnapshot(dataInfoId, toBeDeletedPubMap, snapshotPubMap);
        long version = datum != null ? datum.getVersion() : 0l;
        notify(datum, changeData.getSourceType(), null);
    } else {
        Datum datum = changeData.getDatum();

        String dataCenter = datum.getDataCenter();
        String dataInfoId = datum.getDataInfoId();
        DataSourceTypeEnum sourceType = changeData.getSourceType();
        DataChangeTypeEnum changeType = changeData.getChangeType();

        if (changeType == DataChangeTypeEnum.MERGE && sourceType != DataSourceTypeEnum.BACKUP
            && sourceType != DataSourceTypeEnum.SYNC) {
            // update version for pub or unPub merge to cache
            // if the version is produced before merging into the cache, a small version may override a big one
            datum.updateVersion();
        }
        long version = datum.getVersion();

        try {
            if (sourceType == DataSourceTypeEnum.CLEAN) {
                if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                }
            } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                notifyTempPub(datum, sourceType, changeType);
            } else {
                MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                Long lastVersion = mergeResult.getLastVersion();

                if (lastVersion != null
                    && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                    return;
                }

                // lastVersion null means first add datum
                if (lastVersion == null || version != lastVersion) {
                    if (mergeResult.isChangeFlag()) {
                        notify(datum, sourceType, lastVersion);
                    }
                }
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e); // error handling simplified for this excerpt
        }
    }
}
```
9.1 DataChangeHandler
DataChangeHandler periodically takes the events from DataChangeEventCenter and processes them.
The ChangeNotifier has stored the Datum; since the version number is already up to date at this point, no further notification is produced, and the flow ends here.
```java
MergeResult mergeResult = datumCache.putDatum(changeType, datum);

// lastVersion null means first add datum
if (lastVersion == null || version != lastVersion) {
    if (mergeResult.isChangeFlag()) {
        notify(datum, sourceType, lastVersion);
    }
}
```
The final picture: sender DataServer (versionCheckExecutor / expireCheckExecutor -> AbstractAcceptorStore -> NotifyDataSyncRequest (1)) -> other DataServer (GetSyncDataHandler -> SyncDataRequest (2)) -> sender (syncDataHandler (3) -> SyncDataServiceImpl (4) -> SyncData (5)) -> other DataServer (SyncDataCallback (6) -> DataChangeEventCenter (7) -> DataChangeEventQueue (8) -> DataChangeHandler).
0x10 Summary
Let us recap how service data flows internally during "one service registration":
- The Client calls publisher.register to register a service with the SessionServer.
- After the SessionServer receives the service data (a PublisherRegister), it writes it into memory (the SessionServer keeps the Client's data in memory so that it can later run periodic checks against the DataServer), then uses consistent hashing on dataInfoId to locate the corresponding DataServer and forwards the PublisherRegister to it.
- When the DataServer receives the PublisherRegister, it also writes the data into memory first; the DataServer aggregates all PublisherRegisters per dataInfoId. At the same time, the DataServer notifies all SessionServers of the change event for that dataInfoId; the event carries the dataInfoId and the version number.
- Meanwhile, asynchronously, the DataServer incrementally syncs the data, per dataInfoId, to the other replicas. On top of consistent-hash sharding, the DataServer keeps multiple replicas of each shard (three by default).
- When a SessionServer receives the change event, it compares the version of that dataInfoId stored in its own memory; if it is smaller than the one sent by the DataServer, it actively fetches the full data for that dataInfoId from the DataServer, i.e. the complete list of PublisherRegisters for that dataInfoId.
- Finally, the SessionServer pushes the data to the relevant Clients, and each Client receives the latest service list after this registration.
Due to space limits, the previous article covered the first two points and this article covered the third and fourth; the last two points may be covered later if time allows.