soul网关源码学习09-Nacos数据同步
作者:互联网
soul网关源码学习09-Nacos数据同步
目标:
- 了解Nacos的原理
- 梳理Nacos同步数据的流程
一、Nacos概览
Nacos 能帮助我们发现、配置和管理微服务,提供了一组简单易用的特性集,帮助我们快速实现动态服务发现、服务配置、服务元数据及流量管理。
关键特性
- 服务发现和服务健康监测
- 动态配置服务
- 动态 DNS 服务
- 服务及其元数据管理
持久化
默认情况下,Nacos使用嵌入式数据库实现数据的存储。所以,如果启动多个默认配置下的Nacos节点,数据存储是存在一致性问题的。为了解决这个问题,Nacos采用了集中式存储的方式来支持集群化部署,目前只支持MySQL的存储。
同步原理
大概的原理就是Nacos 服务端保存了配置信息,客户端连接到服务端之后,根据 dataID,group可以获取到具体的配置信息,当服务端的配置发生变更时,客户端会收到通知,拿到变更后的最新配置信息后,更新缓存。
二、流程分析
soul-admin
启动监听,(目前没有把数据库的数据更新到Nacos的操作)。soul-admin
CRUD操作时会触发事件监听,从Nacos拉取全量数据,更新缓存并更新到Nacos。soul-bootstrap
启动的时候会从Nacos拉取全量数据更新到缓存,然后启动监听,如果Nacos最新的数据和缓存的数据不一致,则全量更新到缓存。
三、运行项目
修改配置
soul-bootstrap
的application.yml
修改数据同步方式为Nacos,url
是Nacos服务地址,namespace
是命名空间,用于隔离不同的环境。
nacos:
url: localhost:8848
namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
acm:
enabled: false
endpoint: acm.aliyun.com
namespace:
accessKey:
secretKey:
soul-bootstrap
的pom.xml
增加Nacos同步数据依赖。
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
<version>${project.version}</version>
</dependency>
soul-admin
的application.yml
修改数据同步方式为Nacos。
nacos:
url: localhost:8848
namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
acm:
enabled: false
endpoint: acm.aliyun.com
namespace:
accessKey:
secretKey:
修改完配置,启动soul-admin
项目,然后启动soul-bootstrap
,第一次启动的时候报空指针异常了,查看报错信息,发现有个configInfo
为null,也就是没有加载到配置。
其实启动soul-admin
的时候是已经有这个判空的操作了,但是也只是在触发了数据变化的情况下才会执行到,这里其实可以想一下为什么soul-admin
启动的时候没有报错,而soul-bootstrap启动的时候报错了。
@SneakyThrows
private String getConfig(final String dataId) {
String config = configService.getConfig(dataId, GROUP, 6000);
return StringUtils.hasLength(config) ? config : EMPTY_CONFIG_DEFAULT_VALUE;
}
由于第一次启动的时候Nacos里面没有数据,所以需要初始化Nacos里面的数据。如何初始化呢,最简单的就是直接在admin
上面修改数据了,修改数据之后会根据命名空间去初始化到Nacos。(这里使用内嵌的数据库,没有使用mysql)
watcherData(PLUGIN_DATA_ID, this::updatePluginMap);
watcherData(SELECTOR_DATA_ID, this::updateSelectorMap);
watcherData(RULE_DATA_ID, this::updateRuleMap);
watcherData(META_DATA_ID, this::updateMetaDataMap);
watcherData(AUTH_DATA_ID, this::updateAuthMap);
修改完数据之后重启soul-bootstrap
就可以了。
四、源码分析
soul-admin
还是从soul-admin
的配置类DataSyncConfiguration
入手,找到Nacos的内部类NacosListener
,注意这个ConfigService
,是整个配置中心的核心,执行获取数据、监听等操作。
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.nacos", name = "url")
@Import(NacosConfiguration.class)
static class NacosListener {
/**
* Data changed listener data changed listener.
*
* @param configService the config service
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(NacosDataChangedListener.class)
public DataChangedListener nacosDataChangedListener(final ConfigService configService) {
return new NacosDataChangedListener(configService);
}
}
进入NacosDataChangedListener
,这个类实现了DataChangedListener
接口,onchange
方法监听数据的变化。这里看一下onAppAuthChanged
方法,其他几个类似。
public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
updateAuthMap(getConfig(AUTH_DATA_ID));
switch (eventType) {
case DELETE:
changed.forEach(appAuth -> AUTH_MAP.remove(appAuth.getAppKey()));
break;
case REFRESH:
case MYSELF:
Set<String> set = new HashSet<>(AUTH_MAP.keySet());
changed.forEach(appAuth -> {
set.remove(appAuth.getAppKey());
AUTH_MAP.put(appAuth.getAppKey(), appAuth);
});
AUTH_MAP.keySet().removeAll(set);
break;
default:
changed.forEach(appAuth -> AUTH_MAP.put(appAuth.getAppKey(), appAuth));
break;
}
publishConfig(AUTH_DATA_ID, AUTH_MAP);
}
首先看一下getConfig
方法,根据dataId
和GROUP
去Nacos配置中心拿到配置信息,这里注意如果是第一次启动的情况,这个config
的值是null
,后面再进一步判断,最终返回{}
private static final String EMPTY_CONFIG_DEFAULT_VALUE = "{}";
private String getConfig(final String dataId) {
String config = configService.getConfig(dataId, GROUP, 6000);
return StringUtils.hasLength(config) ? config : EMPTY_CONFIG_DEFAULT_VALUE;
}
接着看updateAuthMap
方法,拿到配置信息全量更新到缓存(全删全增)。这里注意一下json序列化的这个参数configInfo
,如果上一步没加一个判空的处理的话,这里就会出现空指针异常,其实第一次启动soul-bootstrap
报错应该就是这个原因,这个等会再看。
private void updateAuthMap(final String configInfo) {
JsonObject jo = GsonUtils.getInstance().fromJson(configInfo, JsonObject.class);
Set<String> set = new HashSet<>(AUTH_MAP.keySet());
for (Entry<String, JsonElement> e : jo.entrySet()) {
set.remove(e.getKey());
AUTH_MAP.put(e.getKey(), GsonUtils.getInstance().fromJson(e.getValue(), AppAuthData.class));
}
AUTH_MAP.keySet().removeAll(set);
}
然后再往前看一下在哪里会调用这个onAppAuthChanged
方法,又是DataChangedEventDispatcher
事件分发器。
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
//省略
走到这里,整个soul-admin
就启动完成了,跟之前的差不多,启动了一个监听器,soul-admin
修改数据时会发送事件,监听器接收到事件之后更新缓存并更新到Nacos。
soul-bootstrap
可以根据老套路,在日志中(you use nacos sync soul data
)找到配置类NacosSyncDataConfiguration
,直接进入NacosSyncDataService
。
public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
start();
}
看start方法,有五个方法,先看第一个就行,其他的类似。这里是把updatePluginMap
方法作为参数传到watcherData
方法里面。
public void start() {
watcherData(PLUGIN_DATA_ID, this::updatePluginMap);
watcherData(SELECTOR_DATA_ID, this::updateSelectorMap);
watcherData(RULE_DATA_ID, this::updateRuleMap);
watcherData(META_DATA_ID, this::updateMetaDataMap);
watcherData(AUTH_DATA_ID, this::updateAuthMap);
}
先看一下updatePluginMap
方法,遍历所有插件的配置信息,从缓存里面先删除再新增,也就是全量更新(全删全增)。
protected void updatePluginMap(final String configInfo) {
try {
List<PluginData> pluginDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, PluginData.class).values());
pluginDataList.forEach(pluginData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
subscriber.unSubscribe(pluginData);
subscriber.onSubscribe(pluginData);
}));
} catch (JsonParseException e) {
log.error("sync plugin data have error:", e);
}
}
继续看watcherData
方法,启动一个监听listener
。
protected void watcherData(final String dataId, final OnChange oc) {
Listener listener = new Listener() {
@Override
public void receiveConfigInfo(final String configInfo) {
oc.change(configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
};
oc.change(getConfigAndSignListener(dataId, listener));
LISTENERS.getOrDefault(dataId, new ArrayList<>()).add(listener);
}
看getConfigAndSignListener
方法,先去Nacos拿content
,请求超时时间6秒,继续往里面走(Nacos源码)
private String getConfigAndSignListener(final String dataId, final Listener listener) {
return configService.getConfigAndSignListener(dataId, GROUP, 6000, listener);
}
public String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener) throws NacosException {
String content = getConfig(dataId, group, timeoutMs);
worker.addTenantListenersWithContent(dataId, group, content, Arrays.asList(listener));
return content;
}
走到一个ClientWorker
的getServerConfig
方法,发起一个http请求去拿数据,这里好像跟上一篇的http长轮询拿全量数据的接口差不多。
public static final String CONFIG_CONTROLLER_PATH = BASE_PATH + "/configs";
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
再看一下ClientWorker
这个类,从注释可以看出,这也是基于长轮询的。
/**
* Longpolling
*
* @author Nacos
*/
public class ClientWorker {
再看一下构造方法,创建了几个线程池,看最后一个的checkConfigInfo
方法。
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
启动一个线程检查数据变化,往下看就是比较md5
值。
class LongPollingRunnable implements Runnable {
public void run() {
checkLocalConfig(cacheData);
//省略
}
}
整个流程大概就是先全量更新一次数据,然后启动监听,如果有数据变化更新缓存。
五、总结
走完整个流程的感受就是,Nacos捞数据的时候底层好像也是用了http长轮询,感觉直接用http长轮询就可以了,引入一个中间件好像会增加了不稳定性,而且都是全量更新数据,不像zookeeper可以增量更新。
标签:网关,String,09,soul,AUTH,Nacos,public,源码,final 来源: https://blog.csdn.net/m0_54065725/article/details/113048049