编程语言
首页 > 编程语言> > [从源码学设计]蚂蚁金服SOFARegistry之程序基本架构

[从源码学设计]蚂蚁金服SOFARegistry之程序基本架构

作者:互联网

之前我们通过三篇文章初步分析了 MetaServer 的基本架构,MetaServer 这三篇文章为我们接下来的工作做了坚实的铺垫。本系列我们接着分析 Data Server,顺带会涉及一些 Session Server。因为 DataServer 和 MetaServer 代码实现和架构的基本套路类似,所以我们主要关心差异点和DataServer的特点。本文会分析DataServer程序的基本架构。

[从源码学设计]蚂蚁金服SOFARegistry之程序基本架构

0x00 摘要

之前我们通过三篇文章初步分析了 MetaServer 的基本架构,MetaServer 这三篇文章为我们接下来的工作做了坚实的铺垫。

本系列我们接着分析 Data Server,顺带会涉及一些 Session Server。因为 DataServer 和 MetaServer 代码实现和架构的基本套路类似,所以我们主要关心差异点和DataServer的特点。

本文会分析DataServer程序的基本架构。

0x01 思路

前面文章专注于系统业务本身,本系列文章会换一种思路,重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

具体学习方法是:

学习时注意点是:

因为会从多个维度来分析设计,比如业务维度和架构维度,因此在本系列中,可能有的文章会集中在模式的总结提取,有的文章会集中在业务实现,有的文章会集中在具体知识点的运用,也会出现 某一个业务模块或者代码段因为业务和实现 在不同文章中被提及的现象,希望大家事先有所了解。

0x02 基本架构&准则

2.1 SOFARegistry 总体架构

首先,我们要回忆下SOFARegistry 总体架构

应用服务器集群。Client 层是应用层,每个应用系统通过依赖注册中心相关的客户端 jar 包,通过编程方式来使用服务注册中心的服务发布和服务订阅能力。

Session 服务器集群。顾名思义,Session 层是会话层,通过长连接和 Client 层的应用服务器保持通讯,负责接收 Client 的服务发布和服务订阅请求。该层只在内存中保存各个服务的发布订阅关系,对于具体的服务信息,只在 Client 层和 Data 层之间透传转发。Session 层是无状态的,可以随着 Client 层应用规模的增长而扩容。

数据服务器集群。Data 层通过分片存储的方式保存着所用应用的服务注册数据。数据按照 dataInfoId(每一份服务数据的唯一标识)进行一致性 Hash 分片,多副本备份,保证数据的高可用。下文的重点也在于随着数据规模的增长,Data 层如何在不影响业务的前提下实现平滑的扩缩容。

元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就相当于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 作为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层能够感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。

2.2 准则

对于一个程序来说,什么样才算是优秀的架构,其实没有一个放之四海而皆准的标准,关于这方面的书或者文章也有很多,所以我们就从最简单直接的角度,即从结果来想:即静态和动态两方面。

比如,假设你程序是基于SpringBoot,那么Bean的构建和分类就非常重要,如果Bean处理得很好,对你整理动态架构是非常有帮助。

下面就开始分析DataServer程序的基本架构。

0x03 目录结构

目录结构如下,我们可以看出来SOFAReistry大致思路,当然因为业务和架构耦合,所以我的分类不一定完全恰当,也有其他分类的方式,具体取决于你自己的思考方式。

程序基础业务功能:

业务功能:

具体目录如下:

.
├── DataApplication.java
├── bootstrap
├── cache
├── change
├── datasync
│   └── sync
├── event
│   └── handler
├── executor
├── node
├── remoting
│   ├── dataserver
│   │   ├── handler
│   │   └── task
│   ├── handler
│   ├── metaserver
│   │   ├── handler
│   │   ├── provideData
│   │   │   └── processor
│   │   └── task
│   └── sessionserver
│       ├── disconnect
│       ├── forward
│       └── handler
├── renew
├── resource
└── util

0x04 基本架构

依然是类似MetaServer的路数,使用SpringBoot框架来进行总体搭建。

@EnableDataServer
@SpringBootApplication
public class DataApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataApplication.class, args);
    }
}

EnableDataServer这个注解将引入基本配置 DataServerBeanConfiguration。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DataServerBeanConfiguration.class)
public @interface EnableDataServer {
}

0x05 配置Bean

DataServer是SpringBoot程序。所以大量使用Bean。

DataServerBeanConfiguration 的作用是构建各种相关配置,从其中可以看出来DataServer相关模块和功能。

系统初始化时的 bean 都在 DataServerBeanConfiguration 里面通过 JavaConfig 来注册,主要以如下几个配置类体现(配置类会有变更,具体内容可以参照源码实现):

部分Bean的功能如下:

缩减版代码如下 :

@Configuration
@Import(DataServerInitializer.class)
@EnableConfigurationProperties
public class DataServerBeanConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public DataServerBootstrap dataServerBootstrap() {}

    @Configuration
    protected static class DataServerBootstrapConfigConfiguration {}

    @Configuration
    public static class DataServerStorageConfiguration {}

    @Configuration
    public static class LogTaskConfigConfiguration {}

    @Configuration
    public static class SessionRemotingConfiguration {}

    @Configuration
    public static class DataServerNotifyBeanConfiguration {}

    @Configuration
    public static class DataServerSyncBeanConfiguration {}

    @Configuration
    public static class DataServerEventBeanConfiguration {}

    @Configuration
    public static class DataServerRemotingBeanConfiguration {}

    @Configuration
    public static class ResourceConfiguration {}

    @Configuration
    public static class ExecutorConfiguration {}

    @Configuration
    public static class DataProvideDataConfiguration {}
}

0x06 启动

6.1 入口

DataServer 模块启动入口类为 DataServerInitializer,该类不由 JavaConfig 管理配置,而是继承了 SmartLifecycle 接口,在启动时由 Spring 框架调用其 start 方法。其简略版代码如下:

public class DataServerInitializer implements SmartLifecycle {

    @Autowired
    private DataServerBootstrap dataServerBootstrap;

    @Override
    public void start() {
        dataServerBootstrap.start();
        this.isRunning = true;
    }
}

该方法中调用了 DataServerBootstrap#start 方法,用于启动一系列的初始化服务。

public void start() {
    try {
        openDataServer();
        openDataSyncServer();
        openHttpServer();
        startRaftClient();
        fetchProviderData();
        startScheduler();
        Runtime.getRuntime().addShutdownHook(new Thread(this::doStop));
    } 
}

6.2 启动业务

DataServerBootstrap负责程序的启动,具体如下:

@EnableConfigurationProperties
public class DataServerBootstrap {

    // 节点间的 bolt 通信组件以及其配置
    @Autowired
    private DataServerConfig                  dataServerConfig;
  
    @Resource(name = "serverHandlers")
    private CollectionserverHandlers;

    @Resource(name = "serverSyncHandlers")
    private CollectionserverSyncHandlers;  
  
    @Autowired
    private Exchange                          boltExchange;

    private Server                            server;

    private Server                            dataSyncServer;  
  
    // 用于控制的Http 通信组件以及其配置
    @Autowired
    private ApplicationContext                applicationContext;    
  
    @Autowired
    private ResourceConfig                    jerseyResourceConfig;

    @Autowired
    private Exchange                          jerseyExchange;
 
    private Server                            httpServer; 
  
    // JVM 内部的事件通信组件以及其配置
    @Autowired
    private EventCenter                       eventCenter;
  
    // MetaServer Raft相关组件
    @Autowired
    private IMetaServerService                metaServerService;
  
    @Autowired
    private DatumLeaseManager                 datumLeaseManager;
  
    // 定时器组件以及其配置
    @Autowired
    private Scheduler                         syncDataScheduler;

    @Autowired
    private CacheDigestTask                   cacheDigestTask;

    /**
     * start dataserver
     */
    public void start() {
            openDataServer(); // 节点间的 bolt 通信组件以及其配置
            openDataSyncServer();

            openHttpServer(); // 用于控制的Http 通信组件以及其配置

            startRaftClient(); // MetaServer Raft相关组件

            fetchProviderData(); 

            startScheduler(); // 定时器组件以及其配置

            Runtime.getRuntime().addShutdownHook(new Thread(this::doStop));
    }

    // 节点间的 bolt 通信组件以及其配置
    private void openDataServer() {
            if (serverForSessionStarted.compareAndSet(false, true)) {
                server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
                    dataServerConfig.getPort()), serverHandlers
                    .toArray(new ChannelHandler[serverHandlers.size()]));
            }
    }

    private void openDataSyncServer() {
            if (serverForDataSyncStarted.compareAndSet(false, true)) {
                dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                    .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                    .toArray(new ChannelHandler[serverSyncHandlers.size()]));
            }
    }

    // 用于控制的Http 通信组件以及其配置
    private void openHttpServer() {
            if (httpServerStarted.compareAndSet(false, true)) {
                bindResourceConfig();
                httpServer = jerseyExchange.open(
                    new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
                        .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
            }
    }

    // MetaServer Raft相关组件
    private void startRaftClient() { 
        metaServerService.startRaftClient();
        eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
    }

    private void fetchProviderData() {
        ProvideData provideData = metaServerService
            .fetchData(ValueConstants.ENABLE_DATA_DATUM_EXPIRE);
        boolean enableDataDatumExpire = Boolean.parseBoolean((String) provideData.getProvideData()
            .getObject());
        datumLeaseManager.setRenewEnable(enableDataDatumExpire);
    }

    // 定时器组件以及其配置
    private void startScheduler() {
            if (schedulerStarted.compareAndSet(false, true)) {
                syncDataScheduler.startScheduler();
                // start all startTask except correction task
                eventCenter.post(new StartTaskEvent(
                        Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
                                .collect(Collectors.toSet())));

                //start dump log
                cacheDigestTask.start();
            }
    }
}

6.2 核心组件

DataServer 的核心启动类是 DataServerBootstrap,对于其内部模块分类,官方博客主要提及其主要组件 :

该类主要包含了三类组件:节点间的 bolt 通信组件、JVM 内部的事件通信组件、定时器组件。

我这里划分的更加细致,把组件划分为如下:

6.3 Server组件

6.3.1 DataServer

dataServer 负责数据相关服务,比如数据服务,获取数据的推送,服务上下线通知等;

DataServer是基于Bolt进行通讯。

private void openDataServer() {
    try {
        if (serverForSessionStarted.compareAndSet(false, true)) {
            server = boltExchange.open(new URL(NetUtil.getLocalAddress().getHostAddress(),
                dataServerConfig.getPort()), serverHandlers
                .toArray(new ChannelHandler[serverHandlers.size()]));
        }
    } 
}

其响应函数为serverHandlers

@Bean(name = "serverHandlers")
public CollectionserverHandlers() {
    Collectionlist = new ArrayList<>();
    list.add(getDataHandler());
    list.add(clientOffHandler());
    list.add(getDataVersionsHandler());
    list.add(publishDataProcessor());
    list.add(sessionServerRegisterHandler());
    list.add(unPublishDataHandler());
    list.add(dataServerConnectionHandler());
    list.add(renewDatumHandler());
    list.add(datumSnapshotHandler());
    return list;
}

其具体功能如下 :

6.3.2 DataSyncServer

dataSyncServer 主要是处理一些数据同步相关的服务;也是基于Bolt进行通讯。

private void openDataSyncServer() {
    try {
        if (serverForDataSyncStarted.compareAndSet(false, true)) {
            dataSyncServer = boltExchange.open(new URL(NetUtil.getLocalAddress()
                .getHostAddress(), dataServerConfig.getSyncDataPort()), serverSyncHandlers
                .toArray(new ChannelHandler[serverSyncHandlers.size()]));
        }
    }
}

其响应函数为serverSyncHandlers。

@Bean(name = "serverSyncHandlers")
public CollectionserverSyncHandlers() {
    Collectionlist = new ArrayList<>();
    list.add(getDataHandler());
    list.add(publishDataProcessor());
    list.add(unPublishDataHandler());
    list.add(notifyFetchDatumHandler());
    list.add(notifyOnlineHandler());
    list.add(syncDataHandler());
    list.add(dataSyncServerConnectionHandler());
    return list;
}

其具体功能如下 :

6.3.3 HttpServer

HttpServer 是  Http 通信组件,提供一系列 REST 接口,用于 dashboard 管理、数据查询等。

其基于Jersey进行通讯。

private void openHttpServer() {
    try {
        if (httpServerStarted.compareAndSet(false, true)) {
            bindResourceConfig();
            httpServer = jerseyExchange.open(
                new URL(NetUtil.getLocalAddress().getHostAddress(), dataServerConfig
                    .getHttpServerPort()), new ResourceConfig[] { jerseyResourceConfig });
        }
    }
}

6.3.4 Handler

各 Handler 具体作用如图 3 所示:

图 3 各 Handler 作用

图 各 Handler 作用

6.3.5 Raft

Raft相关的是:

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

6.3.6 Scheduler

这个模块辅助各种定期任务,具体作用是:

private void startScheduler() {
    try {
        if (schedulerStarted.compareAndSet(false, true)) {
            syncDataScheduler.startScheduler();
            // start all startTask except correction task
            eventCenter.post(new StartTaskEvent(
                    Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
                            .collect(Collectors.toSet())));

            //start dump log
            cacheDigestTask.start();
        }
    } 
}
6.3.6.1 startScheduler

启动了versionCheckExecutor和scheduler,具体会调用LocalAcceptorStore中的函数进行定期检测。

public class Scheduler {

    public final ExecutorService           versionCheckExecutor;
    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor       expireCheckExecutor;

    @Autowired
    private AcceptorStore                  localAcceptorStore;

    /**
     * constructor
     */
    public Scheduler() {
        scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));

        expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));

        versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new NamedThreadFactory(
                "SyncDataScheduler-versionChangeCheck"));

    }

    /**
     * start scheduler
     */
    public void startScheduler() {

        scheduler.schedule(
                new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
                        TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
                30, TimeUnit.SECONDS);


        versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());

    }

    /**
     * stop scheduler
     */
    public void stopScheduler() {
        if (scheduler != null && !scheduler.isShutdown()) {
            scheduler.shutdown();
        }
        if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) {
            versionCheckExecutor.shutdown();
        }
    }
}
6.3.6.2 StartTaskEventHandler

StartTaskEventHandler内部有一个ScheduledExecutorService 和 tasks,一旦StartTaskEventHandler收到一个StartTaskEvent,就会定期调用tasks中的task执行;

@Bean(name = "tasks")
public Listtasks() {
    Listlist = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}

具体代码如下:

public class StartTaskEventHandler extends AbstractEventHandler{

    @Resource(name = "tasks")
    private Listtasks;

    private ScheduledExecutorService executor     = null;

    @Override
    public List<Class> interest() {
        return Lists.newArrayList(StartTaskEvent.class);
    }

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());
            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}

6.4 处理Task

这里专门就StartTaskEventHandler做简要说明,其就是针对 tasks Bean 里面声明的task,进行启动。

但是具体启动哪些task,则需要依据event里面的设置决定,下面代码中的循环就是看看tasks和event中如何匹配。

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit());
            }
        }

具体代码如下:

public class StartTaskEventHandler extends AbstractEventHandler{

    @Resource(name = "tasks")
    private Listtasks;

    private ScheduledExecutorService executor     = null;

    @Override
    public List<Class> interest() {
        return Lists.newArrayList(StartTaskEvent.class);
    }

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),
                    task.getTimeUnit());

            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}

6.4.1 beans

对应的beans,一共三个task。

@Bean(name = "tasks")
public Listtasks() {
    Listlist = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}

对应了StartTaskTypeEnum中的枚举,其中VersionCompareTask没实现。

public enum StartTaskTypeEnum {

    /**
     * ConnectionRefreshMetaTask
     */
    CONNECT_META,

    /**
     * ConnectionRefreshDataTask
     */
    CONNECT_DATA,

    /**
     * RenewNodeTask
     */
    RENEW,

    /**
     * VersionCompareTask
     */
    VERSION_COMPARE
}

6.4.2 解耦

我们用 StartTaskEvent 举例,这里使用Set。

public class StartTaskEvent implements Event {
    private final SetsuitableTypes;

    public StartTaskEvent(SetsuitableTypes) {
        this.suitableTypes = suitableTypes;
    }

    public SetgetSuitableTypes() {
        return suitableTypes;
    }
}

在 MetaServerChangeEventHandler 之中,则启动了renew task。

if (obj instanceof NodeChangeResult) {
    NodeChangeResultresult = (NodeChangeResult) obj;
    MapversionMap = result.getDataCenterListVersions();

    //send renew after first register dataNode
    Setset = new HashSet<>();
    set.add(StartTaskTypeEnum.RENEW);
    eventCenter.post(new StartTaskEvent(set));

    eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
            DataServerChangeEvent.FromType.REGISTER_META));
    break;
}

在启动时候,post了event,但是指定了启动非RENEW task。

private void startScheduler() {
    try {
        if (schedulerStarted.compareAndSet(false, true)) {
            syncDataScheduler.startScheduler();
            // start all startTask except correction task
            eventCenter.post(new StartTaskEvent(
                    Arrays.stream(StartTaskTypeEnum.values()).filter(type -> type != StartTaskTypeEnum.RENEW)
                            .collect(Collectors.toSet())));

            //start dump log
            cacheDigestTask.start();
        }
    } catch (Exception e) {
        schedulerStarted.set(false);
        throw new RuntimeException("Data Scheduler start error!", e);
    }
}

0x07 动态结构

最后动态架构如下,我们也大致知道,DataServer就是一个SpringBoot程序,有几个Server,有若干Bean,有若干定时服务,具体有一些其他业务模块等等,这对我们接下来的理解有帮助。

+---------------------------------------------------------------------------+
| [DataServerBootstrap]                                                     |
|                                                                           |
|                                                                           |
| +------------------------------------------+  +------------------------+  |
| | [Bolt related]                           |  | [http relatged]        |  |
| |                                          |  |                        |  |
| |            DataServerConfig              |  |      httpServer        |  |
| |                                          |  |                        |  |
| |              boltExchange                |  |    jerseyExchange      |  |
| |                                          |  |                        |  |
| |    server +-----------> serverHandlers   |  |   applicationContext   |  |
| |                                          |  |                        |  |
| | dataSyncServer+----> serverSyncHandlers  |  | jerseyResourceConfig   |  |
| |                                          |  |                        |  |
| +------------------------------------------+  +------------------------+  |
| +---------------------+   +----------------+  +------------------------+  |
| |[meta related]       |   |[JVM related]   |  |[Timer related]         |  |
| |                     |   |                |  |                        |  |
| |  metaServerService  |   |                |  |   syncDataScheduler    |  |
| |                     |   |  EventCenter   |  |                        |  |
| |  datumLeaseManager  |   |                |  |    CacheDigestTask     |  |
| +---------------------+   +----------------+  |                        |  |
|                                               +------------------------+  |
+---------------------------------------------------------------------------+

0x08 问题列表

因为从问题出发更有帮助,所以我们总结出一些问题列表,这些我们期望在以后的分析中陆续解决。

0xFF 参考

蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容

蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路

服务注册中心 Session 存储策略 | SOFARegistry 解析

海量数据下的注册中心 - SOFARegistry 架构介绍

服务注册中心数据分片和同步方案详解 | SOFARegistry 解析

蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析

蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制

蚂蚁金服开源通信框架 SOFABolt 协议框架解析

蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析

蚂蚁通信框架实践

sofa-bolt 远程调用

sofa-bolt学习

SOFABolt 设计总结 - 优雅简洁的设计之道

SofaBolt源码分析-服务启动到消息处理

SOFABolt 源码分析

SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计

SOFARegistry 介绍

SOFABolt 源码分析13 - Connection 事件处理机制的设计

标签:task,金服,DataServer,private,Bean,源码,new,SOFARegistry,public
来源: https://blog.51cto.com/u_15179348/2734034