[Learning Design from Source Code] Ant Financial SOFARegistry: Lease Renewal and Eviction
Author: Internet
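0x00 Abstract
SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.
This series focuses on design and architecture: across multiple articles and from multiple angles, it works backwards from the source code to summarize the implementation mechanisms and architectural ideas of DataServer and SOFARegistry, so that readers can learn how Alibaba designs such systems.
This is the fifteenth article in the series. It covers lease renewal and eviction.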
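0x01 Business Scope
Renewal and eviction are core functions of service registration and discovery:
1.1 Eviction of failed instances
Sometimes a service instance does not go offline gracefully: an out-of-memory error, a network failure or a similar problem may stop it from working, and the registry never receives a "service offline" request.
To remove such unusable instances from the service list, the Server creates a scheduled task at startup. The task runs at a fixed interval (60s by default) and evicts every service in the current registry whose lease has not been renewed within the timeout (90s by default).
1.2 Service renewal
After registering, a service provider keeps sending heartbeats to tell the Server "I am still alive", which prevents the Server's eviction task from removing the instance from the service list. This operation is called service renewal (Renew).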
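The two operations above can be illustrated with a minimal sketch. It is not SOFARegistry code: `LeaseTable`, its method names and the explicit `nowMillis` parameter are hypothetical, chosen so that the example is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical lease table: instanceId -> time of the last renewal, in milliseconds. */
class LeaseTable {
    private final Map<String, Long> lastRenewMillis = new ConcurrentHashMap<>();
    private final long ttlMillis;

    LeaseTable(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Renewal (heartbeat): record that the instance was alive at nowMillis. */
    void renew(String instanceId, long nowMillis) {
        lastRenewMillis.put(instanceId, nowMillis);
    }

    /** Eviction pass: remove and return every instance whose lease is older than the TTL. */
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastRenewMillis.entrySet()) {
            boolean expired = nowMillis - e.getValue() > ttlMillis;
            // Conditional remove: the entry survives if a renew raced with this pass.
            if (expired && lastRenewMillis.remove(e.getKey(), e.getValue())) {
                evicted.add(e.getKey());
            }
        }
        return evicted;
    }

    boolean isRegistered(String instanceId) {
        return lastRenewMillis.containsKey(instanceId);
    }
}
```

With a 90s TTL, an instance renewed at t=0 is evicted by a pass at t=100s, while one renewed at t=60s survives the same pass.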
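0x02 DatumLeaseManager
On the Data Server side, DatumLeaseManager implements both eviction of failed instances and service renewal.
2.1 Definition
The main fields of DatumLeaseManager are:
- connectIdRenewTimestampMap records the time of the most recent heartbeat for each connectId; Eureka has a similar data structure;
- locksForConnectId is a per-connectId lock, so that every connectId allows only one eviction task to be created at a time.
The definition is as follows: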
    public class DatumLeaseManager implements AfterWorkingProcess {

        /** record the latest heartbeat time for each connectId, format: connectId -> lastRenewTimestamp */
        private final Map<String, Long> connectIdRenewTimestampMap = new ConcurrentHashMap<>();

        /** lock for connectId, format: connectId -> true */
        private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();

        private volatile boolean serverWorking = false;

        private volatile boolean renewEnable = true;

        private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;

        @Autowired
        private DataServerConfig dataServerConfig;

        @Autowired
        private DisconnectEventHandler disconnectEventHandler;

        @Autowired
        private DatumCache datumCache;

        @Autowired
        private DataNodeStatus dataNodeStatus;

        private ScheduledThreadPoolExecutor executorForHeartbeatLess;

        private ScheduledFuture<?> futureForHeartbeatLess;
    }
2.2 Renewal
2.2.1 Data structures
Within DatumLeaseManager, the following data structures take part in renewal.
    private ConcurrentHashMap<String, Boolean> locksForConnectId = new ConcurrentHashMap<>();

    private AsyncHashedWheelTimer datumAsyncHashedWheelTimer;
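As a rough illustration of how a map of the form connectId -> true can act as a "one task per connectId" lock, here is a minimal sketch. `PerKeyTaskGuard` and its method names are hypothetical, and the use of `putIfAbsent` is an assumption about how such a map is typically used, not a quote from the project.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical guard that admits at most one pending task per connectId. */
class PerKeyTaskGuard {
    private final ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<>();

    /** Returns true only for the caller that wins the slot for this connectId. */
    boolean tryAcquire(String connectId) {
        // putIfAbsent returns null when there was no previous mapping for the key.
        return locks.putIfAbsent(connectId, Boolean.TRUE) == null;
    }

    /** Called when the pending task starts running, so that a new task may be scheduled. */
    void release(String connectId) {
        locks.remove(connectId);
    }
}
```

Because `putIfAbsent` is atomic, two threads renewing the same connectId at the same moment cannot both schedule a task.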
2.2.2 Callers
renew is called from the following handlers, all of which extend AbstractServerHandler.
    public class PublishDataHandler extends AbstractServerHandler
    public class DatumSnapshotHandler extends AbstractServerHandler
    public class RenewDatumHandler extends AbstractServerHandler implements AfterWorkingProcess
    public class UnPublishDataHandler extends AbstractServerHandler
2.2.3 Renewal
DatumLeaseManager.renew records the latest timestamp and then calls scheduleEvictTask.
    public void renew(String connectId) {
        // record the renew timestamp
        connectIdRenewTimestampMap.put(connectId, System.currentTimeMillis());
        // try to trigger evict task
        scheduleEvictTask(connectId, 0);
    }
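scheduleEvictTask works as follows:
- If the current connectId is already locked, return;
- Otherwise, add a timed task to the hashed wheel timer. When the timer fires, the task:
  - releases the lock held for the connectId;
  - reads the last renewal time of the connectId; if there is none, the connectId has already been removed, so the task returns;
  - if renewal is currently disabled, only sets the time of the next check, because in a non-working state renew requests cannot be received and cleaning up would be wrong;
  - if the last renewal has expired, calls evict;
  - if it has not expired, calls scheduleEvictTask(connectId, nextDelaySec) to schedule the next check.
The code is as follows: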
    /**
     * trigger evict task: if connectId expired, create ClientDisconnectEvent to cleanup datums bind to the connectId
     * PS: every connectId allows only one task to be created
     */
    private void scheduleEvictTask(String connectId, long delaySec) {
        // NOTE: the statements from here up to the timer callback were garbled in the
        // original listing. They are reconstructed from the step list above and may
        // differ in detail from the upstream source.
        delaySec = (delaySec <= 0) ? dataServerConfig.getDatumTimeToLiveSec() : delaySec;

        // lock for connectId: every connectId allows only one task to be created
        Boolean ifAbsent = locksForConnectId.putIfAbsent(connectId, true);
        if (ifAbsent != null) {
            return;
        }

        datumAsyncHashedWheelTimer.newTimeout(_timeout -> {
            boolean continued = true;
            long nextDelaySec = 0;
            try {
                // release lock
                locksForConnectId.remove(connectId);

                // get lastRenewTime of this connectId
                Long lastRenewTime = connectIdRenewTimestampMap.get(connectId);
                if (lastRenewTime == null) {
                    // connectId is already clientOff
                    return;
                }

                /*
                 * 1. lastRenewTime expires, then:
                 *   - build ClientOffEvent and hand it to DataChangeEventCenter.
                 *   - It will not be scheduled next time, so terminated.
                 * 2. lastRenewTime not expires, then:
                 *   - trigger the next schedule
                 */
                boolean isExpired = System.currentTimeMillis() - lastRenewTime
                                    > dataServerConfig.getDatumTimeToLiveSec() * 1000L;
                if (!isRenewEnable()) {
                    nextDelaySec = dataServerConfig.getDatumTimeToLiveSec();
                } else if (isExpired) {
                    int ownPubSize = getOwnPubSize(connectId);
                    if (ownPubSize > 0) {
                        evict(connectId);
                    }
                    connectIdRenewTimestampMap.remove(connectId, lastRenewTime);
                    continued = false;
                } else {
                    nextDelaySec = dataServerConfig.getDatumTimeToLiveSec()
                                   - (System.currentTimeMillis() - lastRenewTime) / 1000L;
                    nextDelaySec = nextDelaySec <= 0 ? 1 : nextDelaySec;
                }
            } catch (Throwable e) {
                // the original listing omits the error handling (logging) here
            }
            if (continued) {
                scheduleEvictTask(connectId, nextDelaySec);
            }
        }, delaySec, TimeUnit.SECONDS);
    }
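The branch taken when the timer fires is easier to check in isolation. The sketch below restates that decision as a side-effect-free function. `EvictDecision`, its `Action` enum and the explicit `nowMillis` parameter are hypothetical names introduced for illustration; only the arithmetic follows the listing above.

```java
/** Hypothetical, side-effect-free version of the decision taken when the timer fires. */
final class EvictDecision {
    enum Action { RETRY_LATER, EVICT, RESCHEDULE }

    final Action action;
    final long nextDelaySec;

    private EvictDecision(Action action, long nextDelaySec) {
        this.action = action;
        this.nextDelaySec = nextDelaySec;
    }

    static EvictDecision decide(long nowMillis, long lastRenewMillis, long ttlSec, boolean renewEnabled) {
        if (!renewEnabled) {
            // Renew requests cannot arrive right now, so never evict; look again after a full TTL.
            return new EvictDecision(Action.RETRY_LATER, ttlSec);
        }
        long elapsedMillis = nowMillis - lastRenewMillis;
        if (elapsedMillis > ttlSec * 1000L) {
            return new EvictDecision(Action.EVICT, 0);
        }
        // Not expired yet: wake up again when the remaining lease time has passed (at least 1s).
        long remainingSec = ttlSec - elapsedMillis / 1000L;
        return new EvictDecision(Action.RESCHEDULE, remainingSec <= 0 ? 1 : remainingSec);
    }
}
```

For example, with a 90s TTL and a last renewal 60s ago, the result is RESCHEDULE with a 30s delay, so a connectId that keeps renewing is re-checked about once per TTL and never evicted.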
2.2.4 Diagram
The flow is shown below:
+------------------+      +-------------------------------------------+
|PublishDataHandler|      |             DatumLeaseManager             |
+--------+---------+      |                                           |
         |                |                     newTimeout            |
         |                |              +---------------------->     |
     doHandle             |              ^                       +    |
         |                |              |                       |    |
         |     renew      |  +-----------+--------------+        |    |
         | +------------> |  |  AsyncHashedWheelTimer   |        |    |
         |                |  +-----+-----+--------------+        |    |
         |                |        |     ^                       |    |
         |                |        |     | scheduleEvictTask     |    |
         |                | evict  |     +                       v    |
         |                |        |     <-----------------------+    |
         |                +-------------------------------------------+
         |                         |
         v                         v
Or, drawn as a sequence diagram:
+------------------+  +-------------------+     +------------------------+
|PublishDataHandler|  | DatumLeaseManager |     | AsyncHashedWheelTimer  |
+--------+---------+  +--------+----------+     +-----------+------------+
         |                     |            new             |
     doHandle                  +--------------------------->|
         |        renew        |                            |
         +-------------------->|                            |
         |                     |                            |
         |             scheduleEvictTask                    |
         |                     |         newTimeout         |
         |        +----------> +--------------------------->|
         |        |            |                            |
         |        |            |    on timeout: expired?    |
         |        |            |             No             |
         |        |            |<---------------------------+
         |        +------------+ scheduleEvictTask          | Yes
         |                     |                            v
         |                     |                          evict
         v                     v
2.3 Eviction
2.3.1 Data structures
Within DatumLeaseManager, the following data structures take part in eviction.
    private ScheduledThreadPoolExecutor executorForHeartbeatLess;

    private ScheduledFuture<?> futureForHeartbeatLess;
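The eviction task is started along two paths, so that whenever the data changes the manager checks whether anything can be evicted:
- at startup;
- by an explicit call.
2.3.2 Explicit call
LocalDataServerChangeEventHandler calls datumLeaseManager.reset(), which in turn leads to evict.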
    @Override
    public void doHandle(LocalDataServerChangeEvent localDataServerChangeEvent) {
        isChanged.set(true);

        // Better change to Listener pattern
        localDataServerCleanHandler.reset();
        datumLeaseManager.reset();

        events.offer(localDataServerChangeEvent);
    }
DatumLeaseManager.reset cancels the currently scheduled eviction task, if any, and calls scheduleEvictTaskForHeartbeatLess to start a new one.
    public synchronized void reset() {
        if (futureForHeartbeatLess != null) {
            futureForHeartbeatLess.cancel(false);
        }
        scheduleEvictTaskForHeartbeatLess();
    }
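The cancel-then-reschedule pattern used by reset can be tried out with plain JDK classes. The following is a minimal sketch, not the project's code: `ResettableTask`, `current()` and `shutdown()` are hypothetical, and the call to `setRemoveOnCancelPolicy(true)` is an addition that does not appear in the listing above.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper around a periodic task that can be restarted from scratch. */
class ResettableTask {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Runnable task;
    private final long periodMillis;
    private ScheduledFuture<?> future;

    ResettableTask(Runnable task, long periodMillis) {
        this.task = task;
        this.periodMillis = periodMillis;
        // Not in the article's code: drop cancelled tasks from the work queue immediately.
        executor.setRemoveOnCancelPolicy(true);
    }

    /** Cancel the pending schedule (without interrupting a run in progress) and start over. */
    synchronized void reset() {
        if (future != null) {
            future.cancel(false);
        }
        future = executor.scheduleWithFixedDelay(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    synchronized ScheduledFuture<?> current() {
        return future;
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Resetting pushes the next run a full period into the future, which gives clients time to renew after a change in the data server list before the heartbeat-less scan runs again.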
2.3.3 Call at startup
The eviction task is also started when the manager is initialized.
    @PostConstruct
    public void init() {
        ......
        executorForHeartbeatLess = new ScheduledThreadPoolExecutor(1,
            threadFactoryBuilder
                .setNameFormat("Registry-DatumLeaseManager-ExecutorForHeartbeatLess").build());
        scheduleEvictTaskForHeartbeatLess();
    }
2.3.4 Eviction
The eviction itself is performed by a periodically scheduled task, EvictTaskForHeartbeatLess.
    private void scheduleEvictTaskForHeartbeatLess() {
        futureForHeartbeatLess = executorForHeartbeatLess.scheduleWithFixedDelay(
            new EvictTaskForHeartbeatLess(),
            dataServerConfig.getDatumTimeToLiveSec(),
            dataServerConfig.getDatumTimeToLiveSec(),
            TimeUnit.SECONDS);
    }
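Each time the interval elapses, the task fetches all current connectIds from datumCache and iterates over them. For each connectId it looks up the renewal timestamp in connectIdRenewTimestampMap; if there is none (no heartbeat has been recorded) and the connectId still owns published data, the connectId is evicted. The task does nothing while renewal is disabled.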
    /**
     * evict own connectIds with heartbeat less
     */
    private class EvictTaskForHeartbeatLess implements Runnable {

        @Override
        public void run() {
            // If in a non-working state, cannot clean up because the renew request cannot be received at this time.
            if (!isRenewEnable()) {
                return;
            }

            Set<String> allConnectIds = datumCache.getAllConnectIds();
            for (String connectId : allConnectIds) {
                Long timestamp = connectIdRenewTimestampMap.get(connectId);
                // no heartbeat
                if (timestamp == null) {
                    int ownPubSize = getOwnPubSize(connectId);
                    if (ownPubSize > 0) {
                        evict(connectId);
                    }
                }
            }
        }
    }
The evict method it calls is:
    private void evict(String connectId) {
        disconnectEventHandler.receive(
            new ClientDisconnectEvent(connectId, System.currentTimeMillis(), 0));
    }
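The selection rule of the heartbeat-less scan can be written as a small pure function over plain collections. This is a sketch under assumptions: `HeartbeatLessScan`, `findEvictable` and the `ownPubSizes` map are hypothetical stand-ins for datumCache and getOwnPubSize.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-alone version of the heartbeat-less scan. */
class HeartbeatLessScan {

    /** Returns the connectIds that are cached, have no renewal timestamp, and still own publishers. */
    static List<String> findEvictable(Set<String> cachedConnectIds,
                                      Map<String, Long> renewTimestamps,
                                      Map<String, Integer> ownPubSizes) {
        List<String> evictable = new ArrayList<>();
        for (String connectId : cachedConnectIds) {
            boolean noHeartbeat = !renewTimestamps.containsKey(connectId);
            int ownPubSize = ownPubSizes.getOrDefault(connectId, 0);
            if (noHeartbeat && ownPubSize > 0) {
                evictable.add(connectId);
            }
        }
        return evictable;
    }
}
```

Note that this scan does not compare timestamps against the TTL. It only catches connectIds that never sent a heartbeat, while expired timestamps are handled by the timer task in section 2.2.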
This is shown in the following diagram:
+--------------------------------------------------+
|                DatumLeaseManager                 |
|                                                  |
|          EvictTaskForHeartbeatLess.run           |
| +----------------------------------------------+ |
| |       |                                      | |
| |       v                                      | |
| | allConnectIds = datumCache.getAllConnectIds()| |
| |       |                                      | |
| |       |  for (allConnectIds)                 | |
| |       v                                      | |
| | connectIdRenewTimestampMap                   | |
| |       |                                      | |
| |       |  no heartbeat                        | |
| |       v                                      | |
| |     evict                                    | |
| +----------------------------------------------+ |
+--------------------------------------------------+
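2.3.5 Handling an eviction
2.3.5.1 Forwarding the eviction message
The eviction message has to be forwarded. This happens in DisconnectEventHandler.receive, which calls EVENT_QUEUE.add(event) when the node is in the WORKING state and otherwise parks the event in noWorkQueue.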
    public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess {

        /**
         * a DelayQueue that contains client disconnect events
         */
        private final DelayQueue<DisconnectEvent> EVENT_QUEUE = new DelayQueue<>();

        @Autowired
        private SessionServerConnectionFactory sessionServerConnectionFactory;

        @Autowired
        private DataChangeEventCenter dataChangeEventCenter;

        @Autowired
        private DataNodeStatus dataNodeStatus;

        private static final BlockingQueue<DisconnectEvent> noWorkQueue = new LinkedBlockingQueue<>();

        public void receive(DisconnectEvent event) {
            if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
                noWorkQueue.add(event);
                return;
            }
            EVENT_QUEUE.add(event);
        }
    }
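To show how a DelayQueue combined with a "not working yet" buffer behaves, here is a minimal sketch using only JDK classes. `DelayedDisconnect`, `DisconnectBuffer`, `startWorking` and `pollReady` are hypothetical. In particular, moving the parked events into the delay queue once the node starts working is an assumption, since the listing above does not show what happens to noWorkQueue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical disconnect event that becomes visible only after its delay has elapsed. */
class DelayedDisconnect implements Delayed {
    final String connectId;
    private final long readyAtNanos;

    DelayedDisconnect(String connectId, long delayMillis) {
        this.connectId = connectId;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

/** Hypothetical buffer: events are parked until the node is working, then queued by delay. */
class DisconnectBuffer {
    private final DelayQueue<DelayedDisconnect> ready = new DelayQueue<>();
    private final Queue<DelayedDisconnect> parked = new ConcurrentLinkedQueue<>();
    private volatile boolean working = false;

    void receive(DelayedDisconnect event) {
        if (!working) {
            parked.add(event);
            return;
        }
        ready.add(event);
    }

    /** Assumption: once the node is working, parked events are handed to the delay queue. */
    void startWorking() {
        working = true;
        DelayedDisconnect event;
        while ((event = parked.poll()) != null) {
            ready.add(event);
        }
    }

    /** Non-blocking: returns null if no event's delay has elapsed yet. */
    DelayedDisconnect pollReady() {
        return ready.poll();
    }
}
```

An evicted client is enqueued with a delay of 0 (the third argument of ClientDisconnectEvent in evict), so it can be taken from the queue immediately.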
afterPropertiesSet starts a thread that loops, taking events from EVENT_QUEUE and handling each of them by:
- removing the corresponding connection from sessionServerConnectionFactory;
- sending a ClientChangeEvent notification to dataChangeEventCenter.
The code is as follows:
    @Override
    public void afterPropertiesSet() {
        Executor executor = ExecutorFactory
            .newSingleThreadExecutor(DisconnectEventHandler.class.getSimpleName());
        executor.execute(() -> {
            while (true) {
                try {
                    DisconnectEvent disconnectEvent = EVENT_QUEUE.take();
                    if (disconnectEvent.getType() == DisconnectTypeEnum.SESSION_SERVER) {
                        SessionServerDisconnectEvent event = (SessionServerDisconnectEvent) disconnectEvent;
                        String processId = event.getProcessId();
                        // check processId confirm remove, and not be registered again when delay time
                        String sessionServerHost = event.getSessionServerHost();
                        if (sessionServerConnectionFactory
                            .removeProcessIfMatch(processId, sessionServerHost)) {
                            Set<String> connectIds = sessionServerConnectionFactory
                                .removeConnectIds(processId);
                            if (connectIds != null && !connectIds.isEmpty()) {
                                for (String connectId : connectIds) {
                                    unPub(connectId, event.getRegisterTimestamp());
                                }
                            }
                        }
                    } else {
                        ClientDisconnectEvent event = (ClientDisconnectEvent) disconnectEvent;
                        unPub(event.getConnectId(), event.getRegisterTimestamp());
                    }
                } catch (Throwable e) {
                    // the original listing omits the error handling (logging) here
                }
            }
        });
    }

    /**
     * @param connectId
     * @param registerTimestamp
     */
    private void unPub(String connectId, long registerTimestamp) {
        dataChangeEventCenter.onChange(new ClientChangeEvent(connectId,
            dataServerConfig.getLocalDataCenter(), registerTimestamp));
    }
As shown below:
+--------------------------------------------------+
|                DatumLeaseManager                 |
|                                                  |
|          EvictTaskForHeartbeatLess.run           |
| +----------------------------------------------+ |
| |       |                                      | |
| |       v                                      | |
| | allConnectIds = datumCache.getAllConnectIds()| |          +------------------------+
| |       |                                      | |          | DisconnectEventHandler |
| |       |  for (allConnectIds)                 | |          |                        |
| |       v                                      | |          |  +-------------+       |
| | connectIdRenewTimestampMap                   | |          |  | noWorkQueue |       |
| |       |                                      | |          |  +-------------+       |
| |       |  no heartbeat                        | |          |                        |
| |       v                                      | | receive  |  +--------------+      |
| |     evict +------------------------------------------------> | EVENT_QUEUE  |      |
| +----------------------------------------------+ |          |  +--------------+      |
+--------------------------------------------------+          +------------------------+
2.3.5.2 Forwarding by DataChangeEventCenter
The flow then reaches DataChangeEventCenter, which again only forwards. Note that the listing below shows the onChange overload that takes a Publisher, whereas unPub passes a ClientChangeEvent to another onChange overload that is not shown here.
    public class DataChangeEventCenter {

        /**
         * queues of DataChangeEvent
         */
        private DataChangeEventQueue[] dataChangeEventQueues;

        /**
         * receive changed publisher, then wrap it into the DataChangeEvent and put it into dataChangeEventQueue
         *
         * @param publisher
         * @param dataCenter
         */
        public void onChange(Publisher publisher, String dataCenter) {
            int idx = hash(publisher.getDataInfoId());
            Datum datum = new Datum(publisher, dataCenter);
            if (publisher instanceof UnPublisher) {
                datum.setContainsUnPub(true);
            }
            if (publisher.getPublishType() != PublishType.TEMPORARY) {
                dataChangeEventQueues[idx].onChange(new DataChangeEvent(
                    DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB, datum));
            } else {
                dataChangeEventQueues[idx].onChange(new DataChangeEvent(
                    DataChangeTypeEnum.MERGE, DataSourceTypeEnum.PUB_TEMP, datum));
            }
        }
    }
2.3.5.3 Processing in DataChangeEventQueue
The actual work is done by DataChangeEventQueue, which calls methods such as addTempChangeData and handleDatum to process the data concerned, including the data that has to be evicted.
After an event is taken from the queue, it is handled according to its scope (DataChangeScopeEnum):
- If the scope is DataChangeScopeEnum.DATUM, check dataChangeEvent.getSourceType():
  - if it is DataSourceTypeEnum.PUB_TEMP, call addTempChangeData, which adds a ChangeData to CHANGE_QUEUE;
  - otherwise, call handleDatum;
- If the scope is DataChangeScopeEnum.CLIENT, call handleClientOff((ClientChangeEvent) event);
- If the scope is DataChangeScopeEnum.SNAPSHOT, call handleSnapshot((DatumSnapshotEvent) event).
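The branching above can be sketched as a small routing function. This is only an illustration: `ScopeDispatch` is hypothetical, the enums are local stand-ins for DataChangeScopeEnum and DataSourceTypeEnum, and the function returns the name of the handler instead of calling it.

```java
/** Hypothetical routing table mirroring the scope and source-type branches. */
class ScopeDispatch {
    enum Scope { DATUM, CLIENT, SNAPSHOT }

    enum SourceType { PUB, PUB_TEMP }

    /** Returns the name of the handler method that would process the event. */
    static String route(Scope scope, SourceType sourceType) {
        switch (scope) {
            case DATUM:
                // Temporary publishers skip the cache merge and are pushed as change data.
                return sourceType == SourceType.PUB_TEMP ? "addTempChangeData" : "handleDatum";
            case CLIENT:
                return "handleClientOff";
            case SNAPSHOT:
                return "handleSnapshot";
            default:
                throw new IllegalArgumentException("unknown scope: " + scope);
        }
    }
}
```

An evicted connectId arrives as a ClientChangeEvent, so it takes the CLIENT branch and ends up in handleClientOff.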
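For details, see the earlier article in this series: [Learning Design from Source Code] Ant Financial SOFARegistry: Asynchronous Processing on the Message Bus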
+--------------------------------------------------+
|                DatumLeaseManager                 |
|                                                  |
|          EvictTaskForHeartbeatLess.run           |
| +----------------------------------------------+ |
| |       |                                      | |
| |       v                                      | |
| | allConnectIds = datumCache.getAllConnectIds()| |          +------------------------+
| |       |                                      | |          | DisconnectEventHandler |
| |       |  for (allConnectIds)                 | |          |                        |
| |       v                                      | |          |  +-------------+       |
| | connectIdRenewTimestampMap                   | |          |  | noWorkQueue |       |
| |       |                                      | |          |  +-------------+       |
| |       |  no heartbeat                        | |          |                        |
| |       v                                      | | receive  |  +--------------+      |
| |     evict +------------------------------------------------> | EVENT_QUEUE  |      |
| +----------------------------------------------+ |          |  +--------------+      |
+--------------------------------------------------+          +------------------------+
                                                                    |
+----------------------+                                            | onChange
| DataChangeEventQueue |                                            v
|  +------------+      |                                   +--------+------------------+
|  | eventQueue |      |                                   |   DataChangeEventCenter   |
|  +------------+      |        add DataChangeEvent        | +-----------------------+ |
|                      | <--------------------------------+| | dataChangeEventQueues | |
|  addTempChangeData   |                                   | +-----------------------+ |
|  handleDatum         |                                   +---------------------------+
+----------------------+
Source: https://blog.51cto.com/u_15179348/2734006