[从源码学设计]蚂蚁金服SOFARegistry之程序基本架构
作者:互联网
之前我们通过三篇文章初步分析了 MetaServer 的基本架构,MetaServer 这三篇文章为我们接下来的工作做了坚实的铺垫。本系列我们接着分析 Data Server,顺带会涉及一些 Session Server。因为 DataServer 和 MetaServer 代码实现和架构的基本套路类似,所以我们主要关心差异点和DataServer的特点。本文会分析DataServer程序的基本架构。
[从源码学设计]蚂蚁金服SOFARegistry之程序基本架构
0x00 摘要
之前我们通过三篇文章初步分析了 MetaServer 的基本架构,MetaServer 这三篇文章为我们接下来的工作做了坚实的铺垫。
本系列我们接着分析 Data Server,顺带会涉及一些 Session Server。因为 DataServer 和 MetaServer 代码实现和架构的基本套路类似,所以我们主要关心差异点和DataServer的特点。
本文会分析DataServer程序的基本架构。
0x01 思路
前面文章专注于系统业务本身,本系列文章会换一种思路,重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。
具体学习方法是:
- 先自行设想业务场景,存在问题以及解决方案。一定要密切联系业务,一切脱离业务谈设计,都是耍流氓;
- 然后看蚂蚁金服源码,看看他们是怎么解决问题的,和自己的方案有何区别。因为蚂蚁金服的当前代码,不一定是在目前应用场景下的最理想方案(比如hash的解决方案),但肯定是在各种力量博弈后的产物,是经历了金融级别锤炼下的最佳实践。
- 因为自己的设想肯定和现实差距很大,所以在研究代码之后,需要调整自己的思路,再次思考。
- 然后看看阿里的解决方案在未来有没有可以改进的地方;
- 最后看看是否可以从源码中提炼出共同点或者是可以复用的办法或者模式。
学习时注意点是:
- 架构设计的本质之一是平衡和妥协。一个系统在不同的时期、不同的数据环境、不同的应用场景下会选择不同的架构,在选择时本质上是在平衡一些重要的点;
- 重点关注算法以及其他相关逻辑在时间和空间上的关系——这样一种逻辑上的架构关系。在一个系统中,这些维度(空间和时间)纵横交错,使得复杂度非常高。我们学习的目的就是分离这些维度,简化之间的交互。
- 不光要看表面的,还要看底层的思路和逻辑。不光要看代码注释中提到的,更要挖掘代码注释中没有提到的;
- 我们既要深入研究一个个孤立的功能/组件/模块,也要在架构的角度和业务场景下重新审视这些模块,这样可以对组件之间的关系有更加深入的理解,可以从全局角度来看这个系统;
- 思维方式的转变才是你最应该在意的部分;
因为会从多个维度来分析设计,比如业务维度和架构维度,因此在本系列中,可能有的文章会集中在模式的总结提取,有的文章会集中在业务实现,有的文章会集中在具体知识点的运用,也会出现 某一个业务模块或者代码段因为业务和实现 在不同文章中被提及的现象,希望大家事先有所了解。
0x02 基本架构&准则
2.1 SOFARegistry 总体架构
首先,我们要回忆下SOFARegistry 总体架构
- Client 层
应用服务器集群。Client 层是应用层,每个应用系统通过依赖注册中心相关的客户端 jar 包,通过编程方式来使用服务注册中心的服务发布和服务订阅能力。
- Session 层
Session 服务器集群。顾名思义,Session 层是会话层,通过长连接和 Client 层的应用服务器保持通讯,负责接收 Client 的服务发布和服务订阅请求。该层只在内存中保存各个服务的发布订阅关系,对于具体的服务信息,只在 Client 层和 Data 层之间透传转发。Session 层是无状态的,可以随着 Client 层应用规模的增长而扩容。
- Data 层
数据服务器集群。Data 层通过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的唯一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。下文的重点也在于随着数据规模的增长,Data 层如何在不影响业务的前提下实现平滑的扩缩容。
- Meta 层
元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就相当于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 作为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层能够感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。
2.2 准则
对于一个程序来说,什么样才算是优秀的架构,其实没有一个放之四海而皆准的标准,关于这方面的书或者文章也有很多,所以我们就从最简单直接的角度,即从结果来想:即静态和动态两方面。
- 静态 :这个角度就是当你拿到一个新代码,你首先会看其目录结构。如果这个程序的目录结构清晰,只看目录结构就能让你能把这个代码逻辑整理出来,只从文件名字就能知道它应该属于什么目录,什么模块,不会出现某一个文件让你觉得其实应该放到另外目录的冲动,那么这个程序从静态角度讲,其架构就是优秀的。
- 动态 :这个角度就是当你只是大概浏览了代码,你闭眼之后,自己能够在脑子中把程序运行模块构建出来,能够知道程序分成几个功能模块,清晰的知道程序的入口,能构架出来其基本功能的流程和内部模块交互逻辑,那么这个程序从动态角度讲,其架构就是优秀的。
比如,假设你程序是基于SpringBoot,那么Bean的构建和分类就非常重要,如果Bean处理得很好,对你整理动态架构是非常有帮助。
下面就开始分析DataServer程序的基本架构。
0x03 目录结构
目录结构如下,我们可以看出来SOFAReistry大致思路,当然因为业务和架构耦合,所以我的分类不一定完全恰当,也有其他分类的方式,具体取决于你自己的思考方式。
- 程序主体:DataApplication;
- 程序入口以及Bean:bootstrap;
程序基础业务功能:
- 网络:remoting;
- 辅助:utils;
- http:resource;
- 缓存:cache;
- 线程:executor;
业务功能:
- renew;
- datasync;
- change;
- event;
- node;
具体目录如下:
. ├── DataApplication.java ├── bootstrap ├── cache ├── change ├── datasync │ └── sync ├── event │ └── handler ├── executor ├── node ├── remoting │ ├── dataserver │ │ ├── handler │ │ └── task │ ├── handler │ ├── metaserver │ │ ├── handler │ │ ├── provideData │ │ │ └── processor │ │ └── task │ └── sessionserver │ ├── disconnect │ ├── forward │ └── handler ├── renew ├── resource └── util
0x04 基本架构
依然是类似MetaServer的路数,使用SpringBoot框架来进行总体搭建。
@EnableDataServer @SpringBootApplication public class DataApplication { public static void main(String[] args) { SpringApplication.run(DataApplication.class, args); } }
EnableDataServer这个注解将引入基本配置 DataServerBeanConfiguration。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(DataServerBeanConfiguration.class) public @interface EnableDataServer { }
0x05 配置Bean
DataServer是SpringBoot程序。所以大量使用Bean。
DataServerBeanConfiguration 的作用是构建各种相关配置,从其中可以看出来DataServer相关模块和功能。
系统初始化时的 bean 都在 DataServerBeanConfiguration 里面通过 JavaConfig 来注册,主要以如下几个配置类体现(配置类会有变更,具体内容可以参照源码实现):
- DataServerBootstrap
- DataServerBootstrapConfigConfiguration,
- CommonConfig
- DataServerConfig
- DataNodeStatus
- PropertySplitter
- DataServerStorageConfiguration
- DatumCache
- LocalDatumStorage
- LogTaskConfigConfiguration
- CacheDigestTask
- SessionRemotingConfiguration
- jerseyExchange
- boltExchange
- MetaNodeExchanger
- DataNodeExchanger
- DataServerCache
- ForwardService
- SessionServerConnectionFactory
- DataServerConnectionFactory
- MetaServerConnectionFactory
- serverHandlers
- serverSyncHandlers
- dataClientHandlers
- metaClientHandlers
- AfterWorkingProcessHandler
- DatumLeaseManager
- DisconnectEventHandler
- DataServerNotifyBeanConfiguration
- DataChangeHandler
- SessionServerNotifier
- TempPublisherNotifier
- BackUpNotifier
- SnapshotBackUpNotifier
- DataServerSyncBeanConfiguration
- SyncDataService
- LocalAcceptorStore
- syncDataScheduler
- StoreServiceFactory
- DataServerEventBeanConfiguration
- DataServerChangeEventHandler
- LocalDataServerChangeEventHandler
- MetaServerChangeEventHandler
- StartTaskEventHandler
- LocalDataServerCleanHandler
- GetSyncDataHandler
- EventCenter
- DataChangeEventCenter
- DataServerRemotingBeanConfiguration
- ConnectionRefreshTask
- ConnectionRefreshMetaTask
- RenewNodeTask
- List
- DefaultMetaServiceImpl
- ResourceConfiguration
- jerseyResourceConfig
- HealthResource
- DataDigestResource
- ExecutorConfiguration
- publishProcessorExecutor
- renewDatumProcessorExecutor
- getDataProcessorExecutor
- DataProvideDataConfiguration
- ProvideDataProcessorManager
- datumExpireProvideDataProcessor
部分Bean的功能如下:
- DataServerBootstrapConfigConfiguration:该配置类主要作用是提供一些 DataServer 服务启动时基本的 Bean,比如 DataServerConfig 基础配置 Bean、DataNodeStatus 节点状态 Bean、DatumCache 缓存 Bean 等;
- LogTaskConfigConfiguration:该配置类主要用于提供一些日志处理相关的 Bean;
- SessionRemotingConfiguration:该配置类主要作用是提供一些与 SessionServer 相互通信的 Bean,以及连接过程中的一些请求处理 Bean。比如 BoltExchange、JerseyExchange 等用于启动服务的 Bean,还有节点上下线、数据发布等的 Bean,为关键配置类;
- DataServerNotifyBeanConfiguration:该配置类中配置的 Bean 主要用于进行事件通知,如用于处理数据变更的 DataChangeHandler 等;
- DataServerSyncBeanConfiguration:该配置类中配置的 Bean 主要用于数据同步操作;
- DataServerEventBeanConfiguration:该配置类中配置的 Bean 主要用于处理与数据节点相关的事件,如事件中心 EventCenter、数据变化事件中心 DataChangeEventCenter 等;
- DataServerRemotingBeanConfiguration:该配置类中配置的 Bean 主要用于 DataServer 的连接管理;
- ResourceConfiguration:该配置类中配置的 Bean 主要用于提供一些 Rest 接口资源;
- AfterWorkingProcessConfiguration:该配置类中配置一些后处理 Handler Bean,用于处理一些业务逻辑结束后的后处理动作;
- ExecutorConfiguration:该配置类主要配置一些线程池 Bean,用于执行不同的任务;
缩减版代码如下 :
@Configuration @Import(DataServerInitializer.class) @EnableConfigurationProperties public class DataServerBeanConfiguration { @Bean @ConditionalOnMissingBean public DataServerBootstrap dataServerBootstrap() {} @Configuration protected static class DataServerBootstrapConfigConfiguration {} @Configuration public static class DataServerStorageConfiguration {} @Configuration public static class LogTaskConfigConfiguration {} @Configuration public static class SessionRemotingConfiguration {} @Configuration public static class DataServerNotifyBeanConfiguration {} @Configuration public static class DataServerSyncBeanConfiguration {} @Configuration public static class DataServerEventBeanConfiguration {} @Configuration public static class DataServerRemotingBeanConfiguration {} @Configuration public static class ResourceConfiguration {} @Configuration public static class ExecutorConfiguration {} @Configuration public static class DataProvideDataConfiguration {} }
0x06 启动
6.1 入口
DataServer 模块启动入口类为 DataServerInitializer,该类不由 JavaConfig 管理配置,而是继承了 SmartLifecycle 接口,在启动时由 Spring 框架调用其 start 方法。其简略版代码如下:
public class DataServerInitializer implements SmartLifecycle { @Autowired private DataServerBootstrap dataServerBootstrap; @Override public void start() { dataServerBootstrap.start(); this.isRunning = true; } }
该方法中调用了 DataServerBootstrap#start 方法,用于启动一系列的初始化服务。
public void start() { try { openDataServer(); openDataSyncServer(); openHttpServer(); startRaftClient(); fetchProviderData(); startScheduler(); Runtime.getRuntime().addShutdownHook(new Thread(this::doStop)); } }
6.2 启动业务
DataServerBootstrap负责程序的启动,具体如下:
@EnableConfigurationProperties public class DataServerBootstrap { // 节点间的 bolt 通信组件以及其配置 @Autowired private DataServerConfig dataServerConfig; @Resource(name = "serverHandlers") private CollectionserverHandlers; @Resource(name = "serverSyncHandlers") private CollectionserverSyncHandlers; @Autowired private Exchange boltExchange; private Server server; private Server dataSyncServer; // 用于控制的Http 通信组件以及其配置 @Autowired private ApplicationContext applicationContext; @Autowired private ResourceConfig jerseyResourceConfig; @Autowired private Exchange jerseyExchange; private Server httpServer; // JVM 内部的事件通信组件以及其配置 @Autowired private EventCenter eventCenter; // MetaServer Raft相关组件 @Autowired private IMetaServerService metaServerService; @Autowired private DatumLeaseManager datumLeaseManager; // 定时器组件以及其配置 @Autowired private Scheduler syncDataScheduler; @Autowired private CacheDigestTask cacheDigestTask; /** * start dataserver */ public void start() { openDataServer(); // 节点间的 bolt 通信组件以及其配置 openDataSyncServer(); openHttpServer(); // 用于控制的Http 通信组件以及其配置 startRaftClient(); // MetaServer Raft相关组件 fetchProviderData(); startScheduler(); // 定时器组件以及其配置 Runtime.getRuntime().addShutdownHook(new Thread(this::doStop)); } // 节点间的 bolt 通信组件以及其配置 private void openDataServer() { if (serverForSessionStarted.compareAndSet(false, true)) { server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers .toArray(new ChannelHandler[serverHandlers.size()])); } } private void openDataSyncServer() { if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } } // 用于控制的Http 通信组件以及其配置 private void openHttpServer() { if (httpServerStarted.compareAndSet(false, true)) { bindResourceConfig(); httpServer = jerseyExchange.open( new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig }); } } // MetaServer Raft相关组件 private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); } private void fetchProviderData() { ProvideData provideData = metaServerService .fetchData(ValueConstants.ENABLE_DATA_DATUM_EXPIRE); boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData() .getObject()); datumLeaseManager.setRenewEnable(enableDataDatumExpire); } // 定时器组件以及其配置 private void startScheduler() { if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler(); // start all startTask except correction task eventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet()))); //start dump log cacheDigestTask.start(); } } }
6.2 核心组件
DataServer 的核心启动类是 DataServerBootstrap,对于其内部模块分类,官方博客主要提及其主要组件 :
该类主要包含了三类组件:节点间的 bolt 通信组件、JVM 内部的事件通信组件、定时器组件。
我这里划分的更加细致,把组件划分为如下:
- 节点间的 bolt 通信组件以及其配置
- DataServerConfig。配置
- boltExchange。bolt组件通讯组件,用来给server和dataSyncServer提供通讯服务;
- server。dataServer 则负责数据相关服务,比如数据服务,获取数据的推送,服务上下线通知等;
- dataSyncServer。dataSyncServer 主要是处理一些数据同步相关的服务;
- serverHandlers。服务响应函数
- serverSyncHandlers。服务响应函数,从其注册的 handler 来看,dataSyncServer 和 dataSever 的职责有部分重叠;
- 用于控制的Http 通信组件以及其配置,提供一系列 REST 接口,用于 dashboard 管理、数据查询等;
- jerseyResourceConfig。配置
- jerseyExchange。jersey组件通讯组件,提供服务;
- applicationContext。注册服务所需;
- httpServer 主要提供一系列 http 接口,用于 dashboard 管理、数据查询等;
- MetaServer相关组件
- metaServerService,用来与MetaServer进行交互,基于raft和Bolt;
- datumLeaseManager,用来维护具体数据;
- JVM 内部的事件通信组件以及其配置
- EventCenter。DataServer 内部逻辑主要是通过事件驱动机制来实现的,一个事件往往会有多个投递源,非常适合用 EventCenter 来解耦事件投递和事件处理之间的逻辑;
- 定时器组件以及其配置
- syncDataScheduler,主要启动了expireCheckExecutor,versionCheckExecutor,即例如定时检测节点信息、定时检测数据版本信息;
- CacheDigestTask,用来定时分析;
6.3 Server组件
6.3.1 DataServer
dataServer 负责数据相关服务,比如数据服务,获取数据的推送,服务上下线通知等;
DataServer是基于Bolt进行通讯。
private void openDataServer() { try { if (serverForSessionStarted.compareAndSet(false, true)) { server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig.getPort()), serverHandlers .toArray(new ChannelHandler[serverHandlers.size()])); } } }
其响应函数为serverHandlers
@Bean(name = "serverHandlers") public CollectionserverHandlers() { Collectionlist = new ArrayList<>(); list.add(getDataHandler()); list.add(clientOffHandler()); list.add(getDataVersionsHandler()); list.add(publishDataProcessor()); list.add(sessionServerRegisterHandler()); list.add(unPublishDataHandler()); list.add(dataServerConnectionHandler()); list.add(renewDatumHandler()); list.add(datumSnapshotHandler()); return list; }
其具体功能如下 :
- getDataHandler:从当前Data节点中获取注册信息数据,若当前节点不处于工作状态,则改为下个节点;
- clientOffHandler:服务订阅者下线;
- getDataVersionsHandler:获取数据版本号;
- publishDataProcessor:服务注册信息发布;
- sessionServerRegisterHandler:sessionServer会话注册;
- unPublishDataHandler :服务下线处理;
- dataServerConnectionHandler:连接管理;
- renewDatumHandler:数据续约管理;
- datumSnapshotHandler:数据快照管理;
6.3.2 DataSyncServer
dataSyncServer 主要是处理一些数据同步相关的服务;也是基于Bolt进行通讯。
private void openDataSyncServer() { try { if (serverForDataSyncStarted.compareAndSet(false, true)) { dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress() .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers .toArray(new ChannelHandler[serverSyncHandlers.size()])); } } }
其响应函数为serverSyncHandlers。
@Bean(name = "serverSyncHandlers") public CollectionserverSyncHandlers() { Collectionlist = new ArrayList<>(); list.add(getDataHandler()); list.add(publishDataProcessor()); list.add(unPublishDataHandler()); list.add(notifyFetchDatumHandler()); list.add(notifyOnlineHandler()); list.add(syncDataHandler()); list.add(dataSyncServerConnectionHandler()); return list; }
其具体功能如下 :
- getDataHandler:获取Data节点注册信息数据;
- publishDataProcessor :服务注册信息发布;
- unPublishDataHandler :服务下线处理;
- notifyFetchDatumHandler :对比版本号,抓去最新服务注册数据;
- notifyOnlineHandler:检查Data节点是否在线;
- syncDataHandler:数据同步;
- dataSyncServerConnectionHandler:连接管理;
6.3.3 HttpServer
HttpServer 是 Http 通信组件,提供一系列 REST 接口,用于 dashboard 管理、数据查询等。
其基于Jersey进行通讯。
private void openHttpServer() { try { if (httpServerStarted.compareAndSet(false, true)) { bindResourceConfig(); httpServer = jerseyExchange.open( new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig }); } } }
6.3.4 Handler
各 Handler 具体作用如图 3 所示:
图 各 Handler 作用
6.3.5 Raft
Raft相关的是:
- 启动Raft客户端,保证分布式一致性;
- 向 EventCenter 投放MetaServerChangeEvent;
private void startRaftClient() { metaServerService.startRaftClient(); eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap())); }
6.3.6 Scheduler
这个模块辅助各种定期任务,具体作用是:
- 启动数据同步任务;
- 定时检测数据接受者节点变化情况,下线过期节点;
- 启动数据变更轮询;
- 向EventCenter投放消息,以便由这些消息对应的响应函数处理,包括:
- CONNECT_META,具体由 ConnectionRefreshMetaTask处理;
- CONNECT_DATA,具体由 ConnectionRefreshTask 处理;
- VERSION_COMPARE,这个目前没有处理;
- 需要注意的是,RENEW 类型消息在系统启动时候没有投放,而是在 MetaServerChangeEventHandler . registerMetaServer 之中,当注册之后,才会进行投放,以此定期Renew;
- 启动dump log任务;
private void startScheduler() { try { if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler(); // start all startTask except correction task eventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet()))); //start dump log cacheDigestTask.start(); } } }
6.3.6.1 startScheduler
启动了versionCheckExecutor和scheduler,具体会调用LocalAcceptorStore中的函数进行定期检测。
public class Scheduler { public final ExecutorService versionCheckExecutor; private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor expireCheckExecutor; @Autowired private AcceptorStore localAcceptorStore; /** * constructor */ public Scheduler() { scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler")); expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck")); versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory( "SyncDataScheduler-versionChangeCheck")); } /** * start scheduler */ public void startScheduler() { scheduler.schedule( new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3, TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()), 30, TimeUnit.SECONDS); versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck()); } /** * stop scheduler */ public void stopScheduler() { if (scheduler != null && !scheduler.isShutdown()) { scheduler.shutdown(); } if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) { versionCheckExecutor.shutdown(); } } }
6.3.6.2 StartTaskEventHandler
StartTaskEventHandler内部有一个ScheduledExecutorService 和 tasks,一旦StartTaskEventHandler收到一个StartTaskEvent,就会定期调用tasks中的task执行;
@Bean(name = "tasks") public Listtasks() { Listlist = new ArrayList<>(); list.add(connectionRefreshTask()); list.add(connectionRefreshMetaTask()); list.add(renewNodeTask()); return list; }
具体代码如下:
public class StartTaskEventHandler extends AbstractEventHandler{ @Resource(name = "tasks") private Listtasks; private ScheduledExecutorService executor = null; @Override public List<Class> interest() { return Lists.newArrayList(StartTaskEvent.class); } @Override public void doHandle(StartTaskEvent event) { if (executor == null || executor.isShutdown()) { getExecutor(); } for (AbstractTask task : tasks) { if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(), task.getTimeUnit()); } } } private void getExecutor() { executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() .getSimpleName()); } }
6.4 处理Task
这里专门就StartTaskEventHandler做简要说明,其就是针对 tasks Bean 里面声明的task,进行启动。
但是具体启动哪些task,则需要依据event里面的设置决定,下面代码中的循环就是看看tasks和event中如何匹配。
for (AbstractTask task : tasks) { if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit()); } }
具体代码如下:
public class StartTaskEventHandler extends AbstractEventHandler{ @Resource(name = "tasks") private Listtasks; private ScheduledExecutorService executor = null; @Override public List<Class> interest() { return Lists.newArrayList(StartTaskEvent.class); } @Override public void doHandle(StartTaskEvent event) { if (executor == null || executor.isShutdown()) { getExecutor(); } for (AbstractTask task : tasks) { if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) { executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(), task.getTimeUnit()); } } } private void getExecutor() { executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass() .getSimpleName()); } }
6.4.1 beans
对应的beans,一共三个task。
@Bean(name = "tasks") public Listtasks() { Listlist = new ArrayList<>(); list.add(connectionRefreshTask()); list.add(connectionRefreshMetaTask()); list.add(renewNodeTask()); return list; }
对应了StartTaskTypeEnum中的枚举,其中VersionCompareTask没实现。
public enum StartTaskTypeEnum { /** * ConnectionRefreshMetaTask */ CONNECT_META, /** * ConnectionRefreshDataTask */ CONNECT_DATA, /** * RenewNodeTask */ RENEW, /** * VersionCompareTask */ VERSION_COMPARE }
6.4.2 解耦
我们用 StartTaskEvent 举例,这里使用Set。
public class StartTaskEvent implements Event { private final SetsuitableTypes; public StartTaskEvent(SetsuitableTypes) { this.suitableTypes = suitableTypes; } public SetgetSuitableTypes() { return suitableTypes; } }
在 MetaServerChangeEventHandler 之中,则启动了renew task。
if (obj instanceof NodeChangeResult) { NodeChangeResultresult = (NodeChangeResult) obj; MapversionMap = result.getDataCenterListVersions(); //send renew after first register dataNode Setset = new HashSet<>(); set.add(StartTaskTypeEnum.RENEW); eventCenter.post(new StartTaskEvent(set)); eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap, DataServerChangeEvent.FromType.REGISTER_META)); break; }
在启动时候,post了event,但是指定了启动非RENEW task。
private void startScheduler() { try { if (schedulerStarted.compareAndSet(false, true)) { syncDataScheduler.startScheduler(); // start all startTask except correction task eventCenter.post(new StartTaskEvent( Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW) .collect(Collectors.toSet()))); //start dump log cacheDigestTask.start(); } } catch (Exception e) { schedulerStarted.set(false); throw new RuntimeException("Data Scheduler start error!", e); } }
0x07 动态结构
最后动态架构如下,我们也大致知道,DataServer就是一个SpringBoot程序,有几个Server,有若干Bean,有若干定时服务,具体有一些其他业务模块等等,这对我们接下来的理解有帮助。
+---------------------------------------------------------------------------+ | [DataServerBootstrap] | | | | | | +------------------------------------------+ +------------------------+ | | | [Bolt related] | | [http relatged] | | | | | | | | | | DataServerConfig | | httpServer | | | | | | | | | | boltExchange | | jerseyExchange | | | | | | | | | | server +-----------> serverHandlers | | applicationContext | | | | | | | | | | dataSyncServer+----> serverSyncHandlers | | jerseyResourceConfig | | | | | | | | | +------------------------------------------+ +------------------------+ | | +---------------------+ +----------------+ +------------------------+ | | |[meta related] | |[JVM related] | |[Timer related] | | | | | | | | | | | | metaServerService | | | | syncDataScheduler | | | | | | EventCenter | | | | | | datumLeaseManager | | | | CacheDigestTask | | | +---------------------+ +----------------+ | | | | +------------------------+ | +---------------------------------------------------------------------------+
0x08 问题列表
因为从问题出发更有帮助,所以我们总结出一些问题列表,这些我们期望在以后的分析中陆续解决。
- 问题:Datacenter究竟是什么概念?
- 问题:DataServer应该当成什么系统来看?
- 问题:DataServer应该实现什么功能?如何实现?
- 问题:如何维持高可用?
- 问题:如何负载均衡?
- 问题:DataServer之间如何同步?实现数据一致性?
- 问题:SessionServer如何寻址DataServer?
- 问题:客户端如何知道应该联系哪个SessionServer?
- 问题:SessionServer在DataServer内部如何表示,有缓存嘛?
- 问题:hash路由表是什么样的?
- 问题:DataServer如何把信息推送给所有SessionServer?
- 问题:DataServer如何同步给其他DataServer?
- 问题:dataSyncServer 主要是处理一些数据同步相关的服务;dataServer 则负责数据相关服务;两者有什么区别?
- 问题:EventCenter的机制,里面有几种Event?
- 问题:如何轮询MetaServer?
- 问题:如何判断当前机房节点?
- 问题:DataServer集群内部如何数据迁移?
- 问题:SessionServer 和 DataServer 之间的通信,是基于推拉结合的机制?
- 问题:为什么 DataServerBootstrap 之中还有 startRaftClient?
- 问题:MetaServerChangeEventHandler怎么启动,谁来控制,用来做啥?
- 问题:DatumLeaseManager 的作用?
- 问题:SessionServer从DataServer拉取什么?
- 问题:DataServer如何向MetaServer 来renew自己?是否定期?
- 问题:DataServer如何知道,保存其他 DataServer?其他地方用到了吗?
- 问题:需要考虑 DataServer 需要保存什么?
- 问题:版本号用来做什么?
- 问题:DatumCache 用来做什么?
- 问题:为什么要有 AfterWorkingProcess?
- 问题:bolt怎么维护connection?
0xFF 参考
蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容
蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路
服务注册中心 Session 存储策略 | SOFARegistry 解析
海量数据下的注册中心 - SOFARegistry 架构介绍
服务注册中心数据分片和同步方案详解 | SOFARegistry 解析
蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制
蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析
SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计
SOFABolt 源码分析13 - Connection 事件处理机制的设计
标签:task,金服,DataServer,private,Bean,源码,new,SOFARegistry,public 来源: https://blog.51cto.com/u_15179348/2734034