编程语言
首页 > 编程语言> > [从源码学设计]蚂蚁金服SOFARegistry之推拉模型

[从源码学设计]蚂蚁金服SOFARegistry之推拉模型

作者:互联网

[从源码学设计]蚂蚁金服SOFARegistry之推拉模型

目录

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

本文为第七篇,介绍SOFARegistry网络操作的推拉模型。

0x01 相关概念

Push还是Pull???

1.1 推模型和拉模型

在观察者模式中,又分为推模型和拉模型两种方式。 

推模型:主题对象向观察者推送主题的详细信息,不管观察者是否需要,推送的信息通常是主题对象的全部或部分数据。

拉模型:主题对象在通知观察者的时候,只传递少量信息。如果观察者需要更具体的信息,由观察者主动到主题对象中获取,相当于是观察者从主题对象中拉数据。

具体两个模型详细剖析如下:

1.1.1 推模型:

特点:
优点:
缺点:

1.1.2 拉模型

特点:
优点:
缺点:

1.2 Guava LoadingCache

Guava是Google guava中的一个内存缓存模块,用于将数据缓存到JVM内存中。实际项目开发中经常将一些公共或者常用的数据缓存起来方便快速访问。

Google Guava Cache提供了基于容量,时间和引用的缓存回收方式。基于容量的方式内部实现采用LRU算法,基于引用回收很好的利用了Java虚拟机的垃圾回收机制。

其中的缓存构造器CacheBuilder采用构建者模式提供了设置好各种参数的缓存对象,缓存核心类LocalCache里面的内部类Segment与jdk1.7及以前的ConcurrentHashMap非常相似,都继承于ReetrantLock,还有六个队列,以实现丰富的本地缓存方案。

0x02 业务领域

2.1 应用场景

SOFARegistry 的业务特点如下:

2.2 问题点

我们通过业务,能够想到的问题点如下:

2.3 解决方案

对于以上的问题点,业界一般来说会采用“推”和“拉”结合的形式,例如,服务端只负责通知 “某一些数据已经准备好”,至于是否需要获取和什么时候客户端来获取这些数据,完全由客户端自行确定。

2.4 阿里方案

我们首先看看阿里都应用了什么方案。

2.4.1 各种模型应用

在SOFARegistry‘中,应用了各种模型,比如 :

下面是两种场景的数据推送对比图。

img

2.4.2 推拉模型

就本文涉及的问题域来说,蚂蚁金服在这里采用了经典的推拉模型来维持数据一致性,下面我们仅以 Session Server和 Data Server 之间维护数据一致性 为例说明。大致逻辑如下:

SOFARegistry 中采用了 LoadingCache 的数据结构来在 SessionServer 中缓存从 DataServer 中同步来的数据。

0x03 拉模型 in Session Server

这里 SOFARegistry 采用了 Guava LoadingCache 的数据结构,通过给 cache 中的 entry 设置过期时间的方式,使得 cache 定期从 DataServer 中拉取数据以替换过期的 entry。

模型大致示例如下,下文会详细讲解:

 +-----------------------------------------+
 |            Session Server               |
 |                                         |
 | +-------------------------------------+ |
 | |        SessionCacheService          | |
 | |                                     | |
 | | +--------------------------------+  | |
 | | |                                |  | |
 | | |    LoadingCache<Key, Value>    |  | |
 | | |            +                   |  | |
 | | |            |  expireAfterWrite |  | |
 | | |            |                   |  | |
 | | |            v                   |  | |
 | | |     DatumCacheGenerator        |  | |
 | | |            +                   |  | |
 | | +--------------------------------+  | |
 | +-------------------------------------+ |
 |                |                        |
 |                v                        |
 |       +--------+------------+           |
 |       | DataNodeServiceImpl |           |
 |       +--------+------------+           |
 |                |                        |
 +-----------------------------------------+
                  |
                  |   GetDataRequest
                  |
+-------------------------------------------+
                  |
                  |
                  v
          +-------+-----------+
          |   Data Server     |
          +-------------------+

3.1 Bean

相关的Bean定义如下,其中SessionCacheService应用了Guava LoadingCache,DatumCacheGenerator是具体加载实现。

@Configuration
public static class SessionCacheConfiguration {

    @Bean
    public CacheService sessionCacheService() {
        return new SessionCacheService();
    }

    @Bean(name = "com.alipay.sofa.registry.server.session.cache.DatumKey")
    public DatumCacheGenerator datumCacheGenerator() {
        return new DatumCacheGenerator();
    }
}

3.2 代码分析

拉模型的实现是在SessionCacheService类,其删减版代码 如下

public class SessionCacheService implements CacheService {
    private final LoadingCache<Key, Value> readWriteCacheMap; 
    private Map<String, CacheGenerator>    cacheGenerators;

    ......
}

可以看到其核心就是利用了

private final LoadingCache<Key, Value> readWriteCacheMap;

3.2.1 Cache构造

构造LoadingCache举例如下:

代码如下:

this.readWriteCacheMap = CacheBuilder.newBuilder().maximumSize(1000L)
    .expireAfterWrite(31000, TimeUnit.MILLISECONDS).build(new CacheLoader<Key, Value>() {
        @Override
        public Value load(Key key) {
            return generatePayload(key);
        }
    });

3.2.2 获取value

获取value的函数比较简单:

@Override
public Value getValue(final Key key) throws CacheAccessException {
    Value payload = null;
    payload = readWriteCacheMap.get(key);
    return payload;
}

@Override
public Map<Key, Value> getValues(final Iterable<Key> keys) throws CacheAccessException {
    Map<Key, Value> valueMap = null;
    valueMap = readWriteCacheMap.getAll(keys);
    return valueMap;
}

3.2.3 批量清除

清除批量缓存对象,这个API在Data Server 主动给 Session Server 发送 Push 数据时候会用到,这样就将引发一次主动获取。

@Override
public void invalidate(Key... keys) {
    for (Key key : keys) {
        readWriteCacheMap.invalidate(key);
    }
}

3.2.4 自动加载

自动加载是通过CacheGenerator完成。

private Value generatePayload(Key key) {
    Value value = null;
    switch (key.getKeyType()) {
        case OBJ:
            EntityType entityType = key.getEntityType();
            CacheGenerator cacheGenerator = cacheGenerators
                .get(entityType.getClass().getName());
            value = cacheGenerator.generatePayload(key);
            break;
        case JSON:
            break;
        case XML:
            break;
        default:
            value = new Value(new HashMap<String, Object>());
            break;
    }
    return value;
}

3.2.5 设置加载

设置加载是通过如下代码完成。

/**
 * Setter method for property <tt>cacheGenerators</tt>.
 *
 * @param cacheGenerators  value to be assigned to property cacheGenerators
 */
@Autowired
public void setCacheGenerators(Map<String, CacheGenerator> cacheGenerators) {
    this.cacheGenerators = cacheGenerators;
}

具体设置时候runtime参数如下:

cacheGenerators = {LinkedHashMap@3368}  size = 1
 "com.alipay.sofa.registry.server.session.cache.DatumKey" -> {DatumCacheGenerator@3374} 

3.3 加载类实现

加载类是通过DatumCacheGenerator完成。

public class DatumCacheGenerator implements CacheGenerator {
    @Autowired
    private DataNodeService     dataNodeService;

    @Override
    public Value generatePayload(Key key) {

        EntityType entityType = key.getEntityType();
        if (entityType instanceof DatumKey) {
            DatumKey datumKey = (DatumKey) entityType;

            String dataCenter = datumKey.getDataCenter();
            String dataInfoId = datumKey.getDataInfoId();

            if (isNotBlank(dataCenter) && isNotBlank(dataInfoId)) {
                return new Value(dataNodeService.fetchDataCenter(dataInfoId, dataCenter));
            } 
        } 

        return null;
    }

    public boolean isNotBlank(String ss) {
        return ss != null && !ss.isEmpty();
    }
}

可以看到,加载具体就是通过DataNodeServiceImpl向 DataServer 发起请求。

public class DataNodeServiceImpl implements DataNodeService {
    @Autowired
    private NodeExchanger         dataNodeExchanger;

    @Autowired
    private NodeManager           dataNodeManager;
  
    @Override
    public Datum fetchDataCenter(String dataInfoId, String dataCenterId) {

        Map<String/*datacenter*/, Datum> map = getDatumMap(dataInfoId, dataCenterId);
        if (map != null && map.size() > 0) {
            return map.get(dataCenterId);
        }
        return null;
    }
  
    @Override
    public Map<String, Datum> getDatumMap(String dataInfoId, String dataCenterId) {

        Map<String/*datacenter*/, Datum> map;

        try {
            GetDataRequest getDataRequest = new GetDataRequest();

            //dataCenter null means all dataCenters
            if (dataCenterId != null) {
                getDataRequest.setDataCenter(dataCenterId);
            }

            getDataRequest.setDataInfoId(dataInfoId);

            Request<GetDataRequest> getDataRequestStringRequest = new Request<GetDataRequest>() {

                @Override
                public GetDataRequest getRequestBody() {
                    return getDataRequest;
                }

                @Override
                public URL getRequestUrl() {
                    return getUrl(dataInfoId);
                }

                @Override
                public Integer getTimeout() {
                    return sessionServerConfig.getDataNodeExchangeForFetchDatumTimeOut();
                }
            };

            Response response = dataNodeExchanger.request(getDataRequestStringRequest);
            Object result = response.getResult();
            GenericResponse genericResponse = (GenericResponse) result;
            if (genericResponse.isSuccess()) {
                map = (Map<String, Datum>) genericResponse.getData();
                map.forEach((dataCenter, datum) -> Datum.internDatum(datum));
            } 
        } 
        return map;
    }
}

拉模型具体如下图所示:

 +-----------------------------------------+
 |            Session Server               |
 |                                         |
 | +-------------------------------------+ |
 | |        SessionCacheService          | |
 | |                                     | |
 | | +--------------------------------+  | |
 | | |                                |  | |
 | | |    LoadingCache<Key, Value>    |  | |
 | | |            +                   |  | |
 | | |            |  expireAfterWrite |  | |
 | | |            |                   |  | |
 | | |            v                   |  | |
 | | |     DatumCacheGenerator        |  | |
 | | |            +                   |  | |
 | | +--------------------------------+  | |
 | +-------------------------------------+ |
 |                |                        |
 |                v                        |
 |       +--------+------------+           |
 |       | DataNodeServiceImpl |           |
 |       +--------+------------+           |
 |                |                        |
 +-----------------------------------------+
                  |
                  |   GetDataRequest
                  |
+-------------------------------------------+
                  |
                  |
                  v
          +-------+-----------+
          |   Data Server     |
          +-------------------+

0x04 推模型

当 DataServer 中有数据更新时,也会主动向 SessionServer 发请求使对应 entry 失效,从而促使 SessionServer 去更新失效 entry。

4.1 发起推动作

DataChangeRequest in Data Server

当Data Server 有数据变化时候,会主动发送 DataChangeRequest 给 Session Server。

具体代码是在SessionServerNotifier之中,具体如下(这就与前文的Notifier联系起来):

public class SessionServerNotifier implements IDataChangeNotifier {

    private AsyncHashedWheelTimer          asyncHashedWheelTimer;

    @Autowired
    private DataServerConfig               dataServerConfig;

    @Autowired
    private Exchange                       boltExchange;

    @Autowired
    private SessionServerConnectionFactory sessionServerConnectionFactory;

    @Autowired
    private DatumCache                     datumCache;
  
    @Override
    public void notify(Datum datum, Long lastVersion) {
        DataChangeRequest request = new DataChangeRequest(datum.getDataInfoId(),
            datum.getDataCenter(), datum.getVersion());
        List<Connection> connections = sessionServerConnectionFactory.getSessionConnections();
        for (Connection connection : connections) {
            doNotify(new NotifyCallback(connection, request));
        }
    }

    private void doNotify(NotifyCallback notifyCallback) {
        Connection connection = notifyCallback.connection;
        DataChangeRequest request = notifyCallback.request;
        try {
            //check connection active
            if (!connection.isFine()) {
                return;
            }
            Server sessionServer = boltExchange.getServer(dataServerConfig.getPort());
            sessionServer.sendCallback(sessionServer.getChannel(connection.getRemoteAddress()),
                request, notifyCallback, dataServerConfig.getRpcTimeout());
        } catch (Exception e) {
            onFailed(notifyCallback);
        }
    }
}

4.2 接收推消息

DataChangeRequestHandler in Session Server

在Session Server,DataChangeRequestHandler负责响应处理收到的推消息 DataChangeRequest

可以看到,其调用了如下代码使得Cache失效,进而后续Cache会去Data Server重新load value

sessionCacheService.invalidate(new Key(KeyType.OBJ, DatumKey.class.getName(), new DatumKey(
        dataChangeRequest.getDataInfoId(), dataChangeRequest.getDataCenter())));

其删减版代码如下:

public class DataChangeRequestHandler extends AbstractClientHandler {

    /**
     * store subscribers
     */
    @Autowired
    private Interests                        sessionInterests;

    @Autowired
    private SessionServerConfig              sessionServerConfig;

    @Autowired
    private ExecutorManager                  executorManager;

    @Autowired
    private CacheService                     sessionCacheService;

    @Autowired
    private DataChangeRequestHandlerStrategy dataChangeRequestHandlerStrategy;

    @Override
    public Object reply(Channel channel, Object message) {
        DataChangeRequest dataChangeRequest = (DataChangeRequest) message;
        dataChangeRequest.setDataCenter(dataChangeRequest.getDataCenter());
        dataChangeRequest.setDataInfoId(dataChangeRequest.getDataInfoId());

        //update cache when change
        sessionCacheService.invalidate(new Key(KeyType.OBJ, DatumKey.class.getName(), new DatumKey(
            dataChangeRequest.getDataInfoId(), dataChangeRequest.getDataCenter())));

        try {
            boolean result = sessionInterests.checkInterestVersions(
                dataChangeRequest.getDataCenter(), dataChangeRequest.getDataInfoId(),
                dataChangeRequest.getVersion());
            fireChangFetch(dataChangeRequest);
        } 

        return null;
    }

    private void fireChangFetch(DataChangeRequest dataChangeRequest) {
        dataChangeRequestHandlerStrategy.doFireChangFetch(dataChangeRequest);
    }
}

于是我们的架构图变化为:

 +----------------------------------------------------------------+
 |                        Session Server                          |
 |                                                                |
 | +-----------------------------------------------------------+  |
 | |                  SessionCacheService                      |  |
 | |                                                           |  |
 | | +-------------------------------------------------------+ |  |
 | | |                                                       | |  |
 | | |    LoadingCache<Key, Value>  <----------+             | |  |
 | | |            +                            |             | |  |
 | | |            |  expireAfterWrite          | invalidate  | |  |
 | | |            |                            |             | |  |
 | | |            v                            |             | |  |
 | | |     DatumCacheGenerator                 |             | |  |
 | | |            +                            |             | |  |
 | | +-------------------------------------------------------+ |  |
 | +-----------------------------------------------------------+  |
 |                |                            |                  |
 |                v                            |                  |
 |       +--------+------------+     +---------+----------------+ |
 |       | DataNodeServiceImpl |     | DataChangeRequestHandler | |
 |       +--------+------------+     +---------+----------------+ |
 |                |                            ^                  |
 +----------------------------------------------------------------+
                  |                            |
   GetDataRequest |                            | DataChangeRequest
                  |                            |
+--------------------------------------------------------------------+
                  |                            |
                  |  Pull                      | Push
                  v                            |
                +-+----------------------------+-+
                |           Data Server          |
                +--------------------------------+

手机上如下:

让我们在SessionServer内部继续延伸下,看看当收到推消息之后,SessionServer是怎样进行后续的push,就是通知Client。即我们之前提到的:Client 与 SessionServer 之间,完全基于推的机制

4.3 延伸处理Strategy

DefaultDataChangeRequestHandlerStrategy

前面代码来到了处理dataChangeRequest的部分。

dataChangeRequestHandlerStrategy.doFireChangFetch(dataChangeRequest);

剩下部分还是 Strategy -- Listener -- Task 的套路(后续有文章讲解)。

public class DefaultDataChangeRequestHandlerStrategy implements DataChangeRequestHandlerStrategy {
    @Autowired
    private TaskListenerManager taskListenerManager;

    @Override
    public void doFireChangFetch(DataChangeRequest dataChangeRequest) {
        TaskEvent taskEvent = new TaskEvent(dataChangeRequest.getDataInfoId(),
            TaskEvent.TaskType.DATA_CHANGE_FETCH_CLOUD_TASK);
        taskListenerManager.sendTaskEvent(taskEvent);
    }
}

4.4 延伸处理Listener

DataChangeFetchCloudTaskListener

DataChangeFetchCloudTaskListener在 support函数中配置了支持 DATA_CHANGE_FETCH_CLOUD_TASK。

@Override
public TaskType support() {
    return TaskType.DATA_CHANGE_FETCH_CLOUD_TASK;
}

具体代码如下:

public class DataChangeFetchCloudTaskListener implements TaskListener {

    @Autowired
    private Interests                                    sessionInterests;

    @Autowired
    private SessionServerConfig                          sessionServerConfig;

    /**
     * trigger task com.alipay.sofa.registry.server.meta.listener process
     */
    @Autowired
    private TaskListenerManager                          taskListenerManager;

    @Autowired
    private ExecutorManager                              executorManager;

    @Autowired
    private CacheService                                 sessionCacheService;

    private volatile TaskDispatcher<String, SessionTask> singleTaskDispatcher;

    private TaskProcessor                                dataNodeSingleTaskProcessor;

    public DataChangeFetchCloudTaskListener(TaskProcessor dataNodeSingleTaskProcessor) {
        this.dataNodeSingleTaskProcessor = dataNodeSingleTaskProcessor;
    }

    public TaskDispatcher<String, SessionTask> getSingleTaskDispatcher() {
        if (singleTaskDispatcher == null) {
            synchronized (this) {
                if (singleTaskDispatcher == null) {
                    singleTaskDispatcher = TaskDispatchers.createSingleTaskDispatcher(
                        TaskDispatchers.getDispatcherName(TaskType.DATA_CHANGE_FETCH_CLOUD_TASK
                            .getName()), sessionServerConfig.getDataChangeFetchTaskMaxBufferSize(),
                        sessionServerConfig.getDataChangeFetchTaskWorkerSize(), 1000, 100,
                        dataNodeSingleTaskProcessor);
                }
            }
        }
        return singleTaskDispatcher;
    }

    @Override
    public TaskType support() {
        return TaskType.DATA_CHANGE_FETCH_CLOUD_TASK;
    }

    @Override
    public void handleEvent(TaskEvent event) {
        SessionTask dataChangeFetchTask = new DataChangeFetchCloudTask(sessionServerConfig,
            taskListenerManager, sessionInterests, executorManager, sessionCacheService);
        dataChangeFetchTask.setTaskEvent(event);
        getSingleTaskDispatcher().dispatch(dataChangeFetchTask.getTaskId(), dataChangeFetchTask,
            dataChangeFetchTask.getExpiryTime());
    }

}

4.5 延伸处理Task

DataChangeFetchCloudTask

DataChangeFetchCloudTask 会进行后续的push,就是通知Client

public class DataChangeFetchCloudTask extends AbstractSessionTask {
    private final SessionServerConfig sessionServerConfig;

    private Interests                 sessionInterests;

    /**
     * trigger task com.alipay.sofa.registry.server.meta.listener process
     */
    private final TaskListenerManager taskListenerManager;

    private final ExecutorManager     executorManager;

    private String                    fetchDataInfoId;

    private final CacheService        sessionCacheService;
}

会获取每个Subscriber的 IP,然后向 taskListenerManager 发送若干种消息,比如:

从而进行后续对client的 push。

@Override
public void execute() {
    Map<String/*dataCenter*/, Datum> datumMap = getDatumsCache();

    if (datumMap != null && !datumMap.isEmpty()) {

        PushTaskClosure pushTaskClosure = getTaskClosure(datumMap);

        for (ScopeEnum scopeEnum : ScopeEnum.values()) {
            Map<InetSocketAddress, Map<String, Subscriber>> map = getCache(fetchDataInfoId,
                scopeEnum);
            if (map != null && !map.isEmpty()) {
                for (Entry<InetSocketAddress, Map<String, Subscriber>> entry : map.entrySet()) {
                    Map<String, Subscriber> subscriberMap = entry.getValue();
                    if (subscriberMap != null && !subscriberMap.isEmpty()) {
                        List<String> subscriberRegisterIdList = new ArrayList<>(
                            subscriberMap.keySet());

                        //select one row decide common info
                        Subscriber subscriber = subscriberMap.values().iterator().next();

                        //remove stopPush subscriber avoid push duplicate
                        evictReSubscribers(subscriberMap.values());

                        fireReceivedDataMultiPushTask(datumMap, subscriberRegisterIdList,
                            scopeEnum, subscriber, subscriberMap, pushTaskClosure);
                    }
                }
            }
        }

        pushTaskClosure.start();
    } 
}

以 RECEIVED_DATA_MULTI_PUSH_TASK 为例,我们的架构流程图更改如下:

+-------------------------------------------------------------------------------------------------------------------+
|                        Session Server                                  +-------------------------------------+    |
|                                                                        |  ReceivedDataMultiPushTaskListener  |    |
| +-----------------------------------------------------------+          +------+------------------------------+    |
| |                  SessionCacheService                      |                 ^                                   |
| |                                                           |                 |  RECEIVED_DATA_MULTI_PUSH_TASK    |
| | +-------------------------------------------------------+ |                 |                                   |
| | |                                                       | |             +---+------------------------+          |
| | |    LoadingCache<Key, Value>  <----------+             | |             |  DataChangeFetchCloudTask  |          |
| | |            +                            |             | |             +---+------------------------+          |
| | |            |  expireAfterWrite          | invalidate  | |                 ^                                   |
| | |            |                            |             | |                 |                                   |
| | |            v                            |             | |                 |                                   |
| | |     DatumCacheGenerator                 |             | |           +-----+----------------------------+      |
| | |            +                            |             | |           | DataChangeFetchCloudTaskListener |      |
| | +-------------------------------------------------------+ |           +-----+----------------------------+      |
| +-----------------------------------------------------------+                 ^                                   |
|                |                            |                                 |  DATA_CHANGE_FETCH_CLOUD_TASK     |
|                v                            |                                 |                                   |
|       +--------+------------+     +---------+----------------+       +--------+--------------------------------+  |
|       | DataNodeServiceImpl |     | DataChangeRequestHandler +-----> | DefaultDataChangeRequestHandlerStrategy |  |
|       +--------+------------+     +---------+----------------+       +-----------------------------------------+  |
|                |                            ^                                                                     |
+-------------------------------------------------------------------------------------------------------------------+
                 |                            ^
  GetDataRequest |                            | DataChangeRequest
                 |                            |
+-------------------------------------------------------------------------------------------------------------------+
                 |                            ^
                 | Pull                       |  Push
                 v                            |
               +-+----------------------------+-+
               |           Data Server          |
               +--------------------------------+

手机上如下:

0x05 总结

本文讲解了蚂蚁金服在维持数据一致性上采用的经典的推拉模型,以 Session Server和 Data Server 之间维护数据一致性 为例。其大致逻辑如下:

SOFARegistry 中采用了 LoadingCache 的数据结构来在 SessionServer 中缓存从 DataServer 中同步来的数据。

大家在日常开发中,可以借鉴。

0xFF 参考

Guava LoadingCache详解及工具类

Google Guava Cache 全解析

蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析

标签:SessionServer,金服,模型,DataServer,private,源码,推送,SOFARegistry,public
来源: https://www.cnblogs.com/rossiXYZ/p/14123647.html