编程语言
首页 > 编程语言> > soul网关源码学习09-Nacos数据同步

soul网关源码学习09-Nacos数据同步

作者:互联网

soul网关源码学习09-Nacos数据同步

目标:

一、Nacos概览

Nacos 能帮助我们发现、配置和管理微服务,提供了一组简单易用的特性集,帮助我们快速实现动态服务发现、服务配置、服务元数据及流量管理。

关键特性

持久化

默认情况下,Nacos使用嵌入式数据库实现数据的存储。所以,如果启动多个默认配置下的Nacos节点,数据存储是存在一致性问题的。为了解决这个问题,Nacos采用了集中式存储的方式来支持集群化部署,目前只支持MySQL的存储。

同步原理

大概的原理就是Nacos 服务端保存了配置信息,客户端连接到服务端之后,根据 dataID,group可以获取到具体的配置信息,当服务端的配置发生变更时,客户端会收到通知,拿到变更后的最新配置信息后,更新缓存。

二、流程分析

在这里插入图片描述

三、运行项目

修改配置

soul-bootstrapapplication.yml修改数据同步方式为Nacos,url是Nacos服务地址,namespace是命名空间,用于隔离不同的环境。

      nacos:
        url: localhost:8848
        namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
        acm:
          enabled: false
          endpoint: acm.aliyun.com
          namespace:
          accessKey:
          secretKey:

soul-bootstrappom.xml增加Nacos同步数据依赖。

        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
            <version>${project.version}</version>
        </dependency>

soul-adminapplication.yml修改数据同步方式为Nacos。

        nacos:
              url: localhost:8848
              namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
              acm:
                enabled: false
                endpoint: acm.aliyun.com
                namespace:
                accessKey:
                secretKey:

修改完配置,启动soul-admin项目,然后启动soul-bootstrap,第一次启动的时候报空指针异常了,查看报错信息,发现有个configInfo为null,也就是没有加载到配置。
其实启动soul-admin的时候是已经有这个判空的操作了,但是也只是在触发了数据变化的情况下才会执行到,这里其实可以想一下为什么soul-admin启动的时候没有报错,而soul-bootstrap启动的时候报错了。

    @SneakyThrows
    private String getConfig(final String dataId) {
        String config = configService.getConfig(dataId, GROUP, 6000);
        return StringUtils.hasLength(config) ? config : EMPTY_CONFIG_DEFAULT_VALUE;
    }

由于第一次启动的时候Nacos里面没有数据,所以需要初始化Nacos里面的数据。如何初始化呢,最简单的就是直接在admin上面修改数据了,修改数据之后会根据命名空间去初始化到Nacos。(这里使用内嵌的数据库,没有使用mysql)

        watcherData(PLUGIN_DATA_ID, this::updatePluginMap);
        watcherData(SELECTOR_DATA_ID, this::updateSelectorMap);
        watcherData(RULE_DATA_ID, this::updateRuleMap);
        watcherData(META_DATA_ID, this::updateMetaDataMap);
        watcherData(AUTH_DATA_ID, this::updateAuthMap);

修改完数据之后重启soul-bootstrap就可以了。

四、源码分析

soul-admin

还是从soul-admin的配置类DataSyncConfiguration入手,找到Nacos的内部类NacosListener,注意这个ConfigService,是整个配置中心的核心,执行获取数据、监听等操作。

    @Configuration
    @ConditionalOnProperty(prefix = "soul.sync.nacos", name = "url")
    @Import(NacosConfiguration.class)
    static class NacosListener {

        /**
         * Data changed listener data changed listener.
         *
         * @param configService the config service
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(NacosDataChangedListener.class)
        public DataChangedListener nacosDataChangedListener(final ConfigService configService) {
            return new NacosDataChangedListener(configService);
        }
    }

进入NacosDataChangedListener,这个类实现了DataChangedListener接口,onchange方法监听数据的变化。这里看一下onAppAuthChanged方法,其他几个类似。

    public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
        updateAuthMap(getConfig(AUTH_DATA_ID));
        switch (eventType) {
            case DELETE:
                changed.forEach(appAuth -> AUTH_MAP.remove(appAuth.getAppKey()));
                break;
            case REFRESH:
            case MYSELF:
                Set<String> set = new HashSet<>(AUTH_MAP.keySet());
                changed.forEach(appAuth -> {
                    set.remove(appAuth.getAppKey());
                    AUTH_MAP.put(appAuth.getAppKey(), appAuth);
                });
                AUTH_MAP.keySet().removeAll(set);
                break;
            default:
                changed.forEach(appAuth -> AUTH_MAP.put(appAuth.getAppKey(), appAuth));
                break;
        }
        publishConfig(AUTH_DATA_ID, AUTH_MAP);
    }

首先看一下getConfig方法,根据dataIdGROUP去Nacos配置中心拿到配置信息,这里注意如果是第一次启动的情况,这个config的值是null,后面再进一步判断,最终返回{}

    private static final String EMPTY_CONFIG_DEFAULT_VALUE = "{}";
    private String getConfig(final String dataId) {
        String config = configService.getConfig(dataId, GROUP, 6000);
        return StringUtils.hasLength(config) ? config : EMPTY_CONFIG_DEFAULT_VALUE;
    }

接着看updateAuthMap方法,拿到配置信息全量更新到缓存(全删全增)。这里注意一下json序列化的这个参数configInfo,如果上一步没加一个判空的处理的话,这里就会出现空指针异常,其实第一次启动soul-bootstrap报错应该就是这个原因,这个等会再看。

    private void updateAuthMap(final String configInfo) {
        JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class);
        Set<String> set = new HashSet<>(AUTH_MAP.keySet());
        for (Entry<String, JsonElement> e : jo.entrySet()) {
            set.remove(e.getKey());
            AUTH_MAP.put(e.getKey(), GsonUtils.getInstance().fromJson(e.getValue(), AppAuthData.class));
        }
        AUTH_MAP.keySet().removeAll(set);
    }

然后再往前看一下在哪里会调用这个onAppAuthChanged方法,又是DataChangedEventDispatcher事件分发器。

    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
       //省略

走到这里,整个soul-admin就启动完成了,跟之前的差不多,启动了一个监听器,soul-admin修改数据时会发送事件,监听器接收到事件之后更新缓存并更新到Nacos。

soul-bootstrap

可以根据老套路,在日志中(you use nacos sync soul data)找到配置类NacosSyncDataConfiguration,直接进入NacosSyncDataService

    public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
        start();
    }

看start方法,有五个方法,先看第一个就行,其他的类似。这里是把updatePluginMap方法作为参数传到watcherData方法里面。

    public void start() {
        watcherData(PLUGIN_DATA_ID, this::updatePluginMap);
        watcherData(SELECTOR_DATA_ID, this::updateSelectorMap);
        watcherData(RULE_DATA_ID, this::updateRuleMap);
        watcherData(META_DATA_ID, this::updateMetaDataMap);
        watcherData(AUTH_DATA_ID, this::updateAuthMap);
    }

先看一下updatePluginMap方法,遍历所有插件的配置信息,从缓存里面先删除再新增,也就是全量更新(全删全增)。

    protected void updatePluginMap(final String configInfo) {
        try {
            List<PluginData> pluginDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, PluginData.class).values());
            pluginDataList.forEach(pluginData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                subscriber.unSubscribe(pluginData);
                subscriber.onSubscribe(pluginData);
            }));
        } catch (JsonParseException e) {
            log.error("sync plugin data have error:", e);
        }
    }

继续看watcherData方法,启动一个监听listener

    protected void watcherData(final String dataId, final OnChange oc) {
        Listener listener = new Listener() {
            @Override
            public void receiveConfigInfo(final String configInfo) {
                oc.change(configInfo);
            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        };
        oc.change(getConfigAndSignListener(dataId, listener));
        LISTENERS.getOrDefault(dataId, new ArrayList<>()).add(listener);
    }

getConfigAndSignListener方法,先去Nacos拿content,请求超时时间6秒,继续往里面走(Nacos源码)

    private String getConfigAndSignListener(final String dataId, final Listener listener) {
        return configService.getConfigAndSignListener(dataId, GROUP, 6000, listener);
    }
    public String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener) throws NacosException {
        String content = getConfig(dataId, group, timeoutMs);
        worker.addTenantListenersWithContent(dataId, group, content, Arrays.asList(listener));
        return content;
    }

走到一个ClientWorkergetServerConfig方法,发起一个http请求去拿数据,这里好像跟上一篇的http长轮询拿全量数据的接口差不多。

public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs";
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);

再看一下ClientWorker这个类,从注释可以看出,这也是基于长轮询的。

/**
 * Longpolling
 *
 * @author Nacos
 */
public class ClientWorker {

再看一下构造方法,创建了几个线程池,看最后一个的checkConfigInfo方法。

    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newCachedThreadPool(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }
    public void checkConfigInfo() {
        // 分任务
        int listenerSize = cacheMap.get().size();
        // 向上取整为批数
        int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

启动一个线程检查数据变化,往下看就是比较md5值。

class LongPollingRunnable implements Runnable {
	public void run() {
		checkLocalConfig(cacheData);
		//省略
	}
}	

整个流程大概就是先全量更新一次数据,然后启动监听,如果有数据变化更新缓存。

五、总结

走完整个流程的感受就是,Nacos捞数据的时候底层好像也是用了http长轮询,感觉直接用http长轮询就可以了,引入一个中间件好像会增加了不稳定性,而且都是全量更新数据,不像zookeeper可以增量更新。

标签:网关,String,09,soul,AUTH,Nacos,public,源码,final
来源: https://blog.csdn.net/m0_54065725/article/details/113048049