其他分享
首页 > 其他分享> > Nacos-配置中心

Nacos-配置中心

作者:互联网

一.服务端处理

1.1 ConfigController.getConfig接口获取配置

@GetMapping

@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)

public void getConfig(HttpServletRequest request, HttpServletResponse response,

@RequestParam("dataId") String dataId, @RequestParam("group") String group,

@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, //租户信息,对应 Nacos 的命名空间ID字段

@RequestParam(value = "tag", required = false) String tag)

throws IOException, ServletException, NacosException {

// check tenant,检查名称等有效性

ParamUtils.checkTenant(tenant);

tenant = NamespaceUtil.processNamespaceParameter(tenant);

// check params,校验参数非空和有效性

ParamUtils.checkParam(dataId, group, "datumId", "content");

ParamUtils.checkParam(tag);

final String clientIp = RequestUtil.getRemoteIp(request);

inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);

}

1.1.1 getRemoteIp方法尽可能获取真实IP

获取IP是尽可能获取真实的,而不是代理的,如果有NGINX的话,优先使用X_FORWARDED_FOR,取出第一个就是最开始的客户端真实地址。如果都没有的话,只能用RemoteAddr。

public static String getRemoteIp(HttpServletRequest request) {

String xForwardedFor = request.getHeader(X_FORWARDED_FOR);

if (!StringUtils.isBlank(xForwardedFor)) {

return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();

}

String nginxHeader = request.getHeader(X_REAL_IP);

return StringUtils.isBlank(nginxHeader) ? request.getRemoteAddr() : nginxHeader;

}

1.1.2 ConfigServletInner.doGetConfig

根据groupKey更新缓存的属性,然后根据单例运行是否用mysql,进行mysql查询或者直接用文件零拷贝传输,因为有个DumpService在初始化的时候会去mysql比对记录,把文件保存到本地/data\config-data文件夹中,所以可以直接用零拷贝了。

public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,

String tenant, String tag, String clientIp) {

final String groupKey = GroupKey2.getKey(dataId, group, tenant); //拼接

String autoTag = request.getHeader("Vipserver-Tag");

String requestIpApp = RequestUtil.getAppName(request);

//首先获取读锁,先从缓存中读,最大重试10次,间隔1

int lockResult = tryConfigReadLock(groupKey);

final String requestIp = RequestUtil.getRemoteIp(request);

boolean isBeta = false;

if (lockResult > 0) {

FileInputStream fis = null;

try {

String md5 = Constants.NULL;

long lastModified = 0L;

//获取缓存,包括MD5,lastmodifyedTs,type等..

CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);

//configType例如yaml

final String configType =

(null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();

File file = null;

ConfigInfoBase configInfoBase = null;

PrintWriter out = null;

md5 = cacheItem.getMd5(); //缓存中的md5值

lastModified = cacheItem.getLastModifiedTs(); //缓存中的最后修改时间

    //确定是否直接读取数据(嵌入式存储derby+单机),如果使用mysql,降低数据库读取压力; 如果使用raft + derby,降低leader读取压力; persistService.findConfigInfo方法后面有简单分析

if (PropertyUtil.isDirectRead()) {

configInfoBase = persistService.findConfigInfo(dataId, group, tenant);

} else { //读取本地文件

file = DiskUtil.targetFile(dataId, group, tenant);

}

response.setHeader(Constants.CONTENT_MD5, md5);

// 响应禁用缓存

response.setHeader("Pragma", "no-cache");

response.setDateHeader("Expires", 0);

response.setHeader("Cache-Control", "no-cache,no-store");

if (PropertyUtil.isDirectRead()) {

response.setDateHeader("Last-Modified", lastModified);

} else {

fis = new FileInputStream(file);

response.setDateHeader("Last-Modified", file.lastModified());

}

//设置一些头信息后,进行响应输出,如果有本地文件就用零拷贝,否则就用字符流。

if (PropertyUtil.isDirectRead()) {

out = response.getWriter(); //使用字符流

out.print(configInfoBase.getContent());

out.flush();

out.close();

} else {

//操作系统可以将字节直接从文件系统缓存传输到目标通道

fis.getChannel()

.transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));

}

.....

} finally { //释放锁

releaseConfigReadLock(groupKey);

}

} else if (lockResult == 0) {

//如果获取不到说明配置文件不存在或者有线程正在写配置文件,那也不给读,为了保证数据的一致性:

...

}

return HttpServletResponse.SC_OK + "";

}

1.1.3.PersistService.findConfigInfo()

这个方法会直接查询数据库, select **** 这种。

derby数据库:

如果其他数据库用JdbcTemplate操作;

1.2 DumpService将配置文件全部Dump到磁盘

Nacos Config模块有一个特点,会将数据库中的配置信息,dump成文件,通过直接文件读取的方式,替代直接读取数据库,降低数据库的压力,是的数据库可以更好的处理写操作。

图片来源: https://blog.csdn.net/wangwei19871103/article/details/105814924

1.2.1 DumpService.init()

spring启动加载时,会执行带有 @PostConstruct 注解的初始化方法;

@PostConstruct

@Override

protected void init() throws Throwable {

if (ApplicationUtils.getStandaloneMode()) {

dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);

return;

}

//非单机模式, 后面章节再分析

.....

1.2.2 dumpOperate()

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,

DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {

String dumpFileContext = "CONFIG_DUMP_TO_FILE";

TimerContext.start(dumpFileContext);

try {

// 构建并添加全部配置文件Dump处理器

Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());

// 构建并添加全部灰度配置文件Dump处理器

Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());

// 构建并添加全部Tag配置文件Dump处理器

Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());

//清除历史配置文件信息(xx天之前的历史配置信息全部删除)

Runnable clearConfigHistory = () -> {

// 单机模式返回true, derby + raft 模式 leader节点可以执行此任务

if (canExecute()) {

try {

Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());

int totalCount = persistService.findConfigHistoryCountByTime(startTime);

if (totalCount > 0) {

// 分页删除历史记录 ; 采用分页的方式,一是为了降低数据库删除数据时的压力,另一方面考虑数据库集群的主从同步延迟的问题(bin-log)

int pageSize = 1000;

int removeTime = (totalCount + pageSize - 1) / pageSize;

 

while (removeTime > 0) {

/// 分页删除,以免批量太大报错

persistService.removeConfigHistory(startTime, pageSize);

removeTime--;

}

}

}

}

};

 

try {

//全量Dump配置信息

dumpConfigInfo(dumpAllProcessor);

// 更新 Beta缓存,先删除文件

DiskUtil.clearAllBeta();

if (persistService.isExistTable(BETA_TABLE_NAME)) {

dumpAllBetaProcessor.process(new DumpAllBetaTask());

}

// 更新 Tag 缓存,先删除文件

DiskUtil.clearAllTag();

if (persistService.isExistTable(TAG_TABLE_NAME)) {

dumpAllTagProcessor.process(new DumpAllTagTask());

}

 

// add to dump aggr

List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();

if (configList != null && !configList.isEmpty()) {

total = configList.size();

List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);

for (List<ConfigInfoChanged> list : splitList) {

MergeAllDataWorker work = new MergeAllDataWorker(list);

work.start();

}

LOGGER.info("server start, schedule merge end.");

}

}

// 非单机模式,则Nacos Config存在一个dump文件的心跳记录,可以减少dump文件的开销以及任务耗时

if (!ApplicationUtils.getStandaloneMode()) {

Runnable heartbeat = () -> {

String heartBeatTime = TimeUtils.getCurrentTime().toString();

// 时间信息持久化

DiskUtil.saveHeartBeatToDisk(heartBeatTime);

};

// 周期性执行任务

ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);

// 随机的任务延迟时间

long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;

ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

// 周期性执行dump全部灰度配置文件的操作 ,6小时

ConfigExecutor

.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

// 周期性执行dump全部tag配置文件的操作 ,6小时

ConfigExecutor

.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

}

// 周期性执行清除往期历史配置信息记录

ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);

} finally {

TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);

}

}

1.2.3 dumpConfigInfo()

作用: 主要是将数据库中的所有ConfigInfo查询出来写到服务器的磁盘中

参数: dumpAllProcessor, 这个是TaskProcessor 任务处理器;处理器中有个 process()方法; 最终执行任务的时候就是执行这个方法的;

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {

int timeStep = 6;

Boolean isAllDump = true;

// initial dump all

FileInputStream fis = null;

Timestamp heartheatLastStamp = null;

try {

//判断是否快速启动,即配置文件中的isQuickStart默认false ,

if (isQuickStart()) {

File heartbeatFile = DiskUtil.heartBeatFile();

if (heartbeatFile.exists()) {

fis = new FileInputStream(heartbeatFile);

String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);

heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);

//如果上一次服务正常的时间距离现在不超过6个小时; 那么设置 isAllDump = false;表示不需要全量Dump

if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime()

< timeStep * 60 * 60 * 1000) {

isAllDump = false;

}

}

}

if (isAllDump) {

LogUtil.DEFAULT_LOG.info("start clear all config-info.");

//先删除本地file

DiskUtil.clearAll();

//处理全部配置数据

dumpAllProcessor.process(new DumpAllTask());

} else {

//非全量dump,下面小节1.2.5介绍.dumpChangeProcessor

Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp, timeStep);

DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp,

TimeUtils.getCurrentTime());

dumpChangeProcessor.process(new DumpChangeTask());

// 文件的 MD5 检查任务

Runnable checkMd5Task = () -> {

LogUtil.DEFAULT_LOG.error("start checkMd5Task");

// 直接根据内存缓存中的配置信息的数据,进行快速检查每个配置文件信息的变更情况

List<String> diffList = ConfigCacheService.checkMd5();

for (String groupKey : diffList) {

// 将对应格式的数据进行解析

String[] dg = GroupKey.parseKey(groupKey);

String dataId = dg[0];

String group = dg[1];

String tenant = dg[2];

// 直接查找对应的配置文件信息

ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);

// 进行变更判断并dump出文件

ConfigCacheService.dumpChange(dataId, group, tenant, configInfo.getContent(),

configInfo.getLastModified());

}

LogUtil.DEFAULT_LOG.error("end checkMd5Task");

};

// 进行周期任务调度执行

ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);

}

}

}

DiskUtil.heartBeatFile() 获取心跳文件

心跳文件在 {NACOS_HOME}/status/heartBeat.txt,这是一个心跳文件,每十秒就会把当前时间写入到这个文件中;

作用: 为了能够快速启动应用,可以选择不需要全部Dump所有的配置文件,因为上一次可能已经Dump了文件在磁盘中了,如果配置很大的话,走IO还是会花费一定的时间的; 所以每十秒来持久化一次当前时间,用于记录上一次服务正常距离现在有多长时间; 假设服务宕机了,半个小时之后才启动成功,那么我们只需要将这半小时之内数据库中的配置变化重新Dump到磁盘中就行了,不需要DumpAll;

1.2.4 全量dump

DumpAllProcessor.process()方法

@Override

public boolean process(NacosTask task) {

//查询数据库最大id

long currentMaxId = persistService.findConfigMaxId();

long lastMaxId = 0;

while (lastMaxId < currentMaxId) {

//分页获取数据库的数据,每次1000

Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);

if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {

for (ConfigInfoWrapper cf : page.getPageItems()) {

long id = cf.getId();

lastMaxId = id > lastMaxId ? id : lastMaxId;

// AggrWhitelist是Nacos页面自定义的一个DataId; 如果ConfigInfo的DataId是这个值的话就会被单独解析,

if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {

AggrWhitelist.load(cf.getContent());

}

// ClientIpWhiteList也是Nacos自己定义的一个预留配置DataId,Ip白名单

if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {

ClientIpWhiteList.load(cf.getContent());

}

// SwitchService也是Nacos内部预留的一个配置;DataId是 com.alibaba.nacos.meta.switch ; 开发者可以配置这个里面的属性,来进行一些设置内部属性的操作;

if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {

SwitchService.load(cf.getContent());

}

//dump方法里面会校验MD5是否改变并更新,真正的磁盘写入操作

boolean result = ConfigCacheService

.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),

cf.getType());

}

DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);

} else {

lastMaxId += PAGE_SIZE;

}

}

return true;

}

ConfigService.dump()方法

真正的磁盘写入操作;

这个方法首先将配置保存到磁盘文件中,并且缓存配置信息的MD5到内存中;

如果配置信息不一致(MD5不一致),则将会发送一个通知事件 LocalDataChangeEvent告知本地数据有更改;

CacheItem 是配置信息的对象;保存着配置信息的一些信息,但是没有保存Content,只保存了content的MD5;

/*** Save config file and update md5 value in cache. */

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,

String type) {

String groupKey = GroupKey2.getKey(dataId, group, tenant);

//如果内存中没有当前配置的缓存 CacheItem,则组装对象保存进去;这时md5是空字符串;

CacheItem ci = makeSure(groupKey);

ci.setType(type);

//获取写锁,没获取到则报错或返回false

final int lockResult = tryWriteLock(groupKey);

try {

    //计算content的MD5

final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);

if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {

        ...

} else if (!PropertyUtil.isDirectRead()) {

//上面计算的md5跟内存 CacheItem 中的md5做比较(第一次肯定不相等),如果不相等则将文件保存到磁盘中;

DiskUtil.saveToDisk(dataId, group, tenant, content);

}

//updateMd5方法中,如果MD5不相同,则更新 CacheItem 中的MD5属性和lastModifiedTs属性;lastModifiedTs是表示最后更新时间; 如果MD5不相同,还要发送通知告知数据有变更;

updateMd5(groupKey, md5, lastModifiedTs);

return true;

}

return false;

} finally {

releaseWriteLock(groupKey);

}

}

updateMd5方法:

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {

CacheItem cache = makeSure(groupKey);

if (cache.md5 == null || !cache.md5.equals(md5)) {

cache.md5 = md5;

cache.lastModifiedTs = lastModifiedTs;

NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));

}

}

LongPollingService监听LocalDataChangeEvent事件

@Override

public void onEvent(Event event) {

//SwitchService配置中的一个属性 isFixedPolling; 是否固定长轮询

if (isFixedPolling()) {

// Ignore.

} else {

if (event instanceof LocalDataChangeEvent) {

LocalDataChangeEvent evt = (LocalDataChangeEvent) event;

ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));

}

}

}

最终执行DataChangeTask

class DataChangeTask implements Runnable {

@Override

public void run() {

ConfigCacheService.getContentBetaMd5(groupKey);

    //1.遍历所有的长轮询订阅者者

for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {

ClientLongPolling clientSub = iter.next();

if (clientSub.clientMd5Map.containsKey(groupKey)) {

if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {

continue; // 2. 如果是beta发布且不在beta列表直接跳过

}

if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {

continue; // 3.如果tag发布且不在tag列表直接跳过

}

getRetainIps().put(clientSub.ip, System.currentTimeMillis());

iter.remove(); // Delete subscribers' relationships.

//4.发送Http请求通知所有未被上面2、3过滤掉的的订阅者最新的配置数据ConfigInfo

clientSub.sendResponse(Arrays.asList(groupKey));

}

}

}

...

}

1.2.5 非全量dump

DumpChangeProcessor执行器

即Dump有变化的数据的执行器;

public class DumpChangeProcessor implements NacosTaskProcessor {

@Override

public boolean process(NacosTask task) {

long startUpdateMd5 = System.currentTimeMillis();

//1.查询数据库所有的配置文件

List<ConfigInfoWrapper> updateMd5List = persistService.listAllGroupKeyMd5();

//2.将所有的配置文件缓存到内存中,并通知所有订阅的客户端

for (ConfigInfoWrapper config : updateMd5List) {

final String groupKey = GroupKey2.getKey(config.getDataId(), config.getGroup());     ConfigCacheService.updateMd5(groupKey, config.getMd5(), config.getLastModified());

}

long endUpdateMd5 = System.currentTimeMillis();

long startDeletedConfigTime = System.currentTimeMillis();

//3. 从 his_config_info 历史表中找到从上一次心跳时间(heartBeat.txt)到现在的所有被删除记录,

// his_config_info 记录的就是历史的配置文件;

List<ConfigInfo> configDeleted = persistService.findDeletedConfig(startTime, endTime);

//4. 遍历拿到的历史配置数据的dataId,group,Tenant;然后去config_info表中查找能不能查到数据

// 如果能查到,说明配置不是被删除了,只是修改了content;

// 如果不能查到,说明整个配置文件都被删除了;则将磁盘对应的配置文件删除;

并且通知订阅的客户端数据变更;

for (ConfigInfo configInfo : configDeleted) {

if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant())

== null) {

ConfigCacheService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());

}

}

long endDeletedConfigTime = System.currentTimeMillis();

final long startChangeConfigTime = System.currentTimeMillis();

//5. config_info表中查找 从上一次心跳时间(heartBeat.txt)到现在的所有有被修改过的配置数据,

// 然后执行 ConfigService.dumpChange 将这个改过的配置Dump的磁盘中,并通知;

List<ConfigInfoWrapper> changeConfigs = persistService.findChangeConfig(startTime, endTime);

for (ConfigInfoWrapper cf : changeConfigs) {

boolean result = ConfigCacheService

.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());

}

//6. load Nacos内置的一些DataId配置:ClientIpWhiteList,AggrWhitelist,SwitchService

ConfigCacheService.reloadConfig();

long endChangeConfigTime = System.currentTimeMillis();

return true;

}

}

每隔12个小时全量Dump一次数据

ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);

 

public static void scheduleConfigTask(Runnable command, long initialDelay, long delay, TimeUnit unit) {

TIMER_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);

}

另外,还有部分逻辑在 1.2.2 dumpOperate()方法中;

二.客户端读取配置

2.1 服务启动相关配置

spring-cloud-alibaba-nacos-config工程中 META-INF\spring.factories文件注入的类;

org.springframework.cloud.bootstrap.BootstrapConfiguration=\

com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\

com.alibaba.cloud.nacos.NacosConfigAutoConfiguration,\

com.alibaba.cloud.nacos.endpoint.NacosConfigEndpointAutoConfiguration

org.springframework.boot.diagnostics.FailureAnalyzer=\

com.alibaba.cloud.nacos.diagnostics.analyzer.NacosConnectionFailureAnalyzer

2.1.1 NacosConfigBootstrapConfiguration

nacosConfigProperties实例和nacosPropertySourceLocator实例注入。

@Configuration

@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)

public class NacosConfigBootstrapConfiguration {

@Bean

@ConditionalOnMissingBean

public NacosConfigProperties nacosConfigProperties() {

return new NacosConfigProperties();

}

@Bean

public NacosPropertySourceLocator nacosPropertySourceLocator(

NacosConfigProperties nacosConfigProperties) {

return new NacosPropertySourceLocator(nacosConfigProperties);

}

}

2.1.2 NacosPropertySourceLocator

实现接口PropertySourceLocator;

  1. locate()

先准备设置一堆属性,然后进行共享配置和额外配置的加载,主要是loadApplicationConfiguration。

@Override

public PropertySource<?> locate(Environment env) {

//获取配置服务实例,NacosFactory根据properties反射方式创建

ConfigService configService = nacosConfigProperties.configServiceInstance();

long timeout = nacosConfigProperties.getTimeout();

nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,

timeout); //属性源建造器,timeout为超时30秒

String name = nacosConfigProperties.getName(); //dataid的名字

String dataIdPrefix = nacosConfigProperties.getPrefix(); //前缀

if (StringUtils.isEmpty(dataIdPrefix)) {

dataIdPrefix = name;

}

//前缀为空的话默认就是spring.application.name

if (StringUtils.isEmpty(dataIdPrefix)) {

dataIdPrefix = env.getProperty("spring.application.name");

}

//创建符合属性源

CompositePropertySource composite = new CompositePropertySource(

NACOS_PROPERTY_SOURCE_NAME);

loadSharedConfiguration(composite); //共享配置,主要是默认组里面的

loadExtConfiguration(composite); // 额外配置

loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);

return composite;

}

2. loadApplicationConfiguration()

先获取配置的扩展名和分组,根据分组配置文件加载和激活的环境加载,也就是我们经常用激活环境的配置文件xxx-dev-yaml这种。加载后的信息都要放入CompositePropertySource符合属性里并返回。

private void loadApplicationConfiguration(

CompositePropertySource compositePropertySource, String dataIdPrefix,

NacosConfigProperties properties, Environment environment) {

String fileExtension = properties.getFileExtension(); //扩展名,比如yml

String nacosGroup = properties.getGroup(); //分组,默认DEFAULT_GROUP

//根据分组配置文件加载

loadNacosDataIfPresent(compositePropertySource,

dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);

for (String profile : environment.getActiveProfiles()) { //有环境配置的更高级别,比如dev,prod

String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;

//加载环境配置

loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,

fileExtension, true);

}

}

loadNacosDataIfPresent方法

private void loadNacosDataIfPresent(final CompositePropertySource composite,

final String dataId, final String group, String fileExtension,

boolean isRefreshable) {

if (NacosContextRefresher.getRefreshCount() != 0) { //刷新过了

NacosPropertySource ps;

if (!isRefreshable) { //不刷新,直接缓存取

ps = NacosPropertySourceRepository.getNacosPropertySource(dataId);

}

else {

ps = nacosPropertySourceBuilder.build(dataId, group, fileExtension, true);

}

composite.addFirstPropertySource(ps);

}

else {

NacosPropertySource ps = nacosPropertySourceBuilder.build(dataId, group,

fileExtension, isRefreshable);

composite.addFirstPropertySource(ps);

}

}

NacosPropertySourceBuilder的build方法

先加载数据,然后结果封装成NacosPropertySource,放进缓存(并发hashmap)。

NacosPropertySource build(String dataId, String group, String fileExtension,

boolean isRefreshable) {

Properties p = loadNacosData(dataId, group, fileExtension);

NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId,

propertiesToMap(p), new Date(), isRefreshable);

NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);

return nacosPropertySource;

}

  1. loadNacosData()

用NacosConfigService来加载,加载到了就解析成LinkedHashMap返回,否则就是个空的LinkedHashMap。

private Properties loadNacosData(String dataId, String group, String fileExtension) {

String data = null;

try {

//获取到的是字符串,需要后面根据文件类型解析

data = configService.getConfig(dataId, group, timeout);

if (!StringUtils.isEmpty(data)) {

log.info(String.format("Loading nacos data, dataId: '%s', group: '%s'",

dataId, group));

//properties格式配置文件

if (fileExtension.equalsIgnoreCase("properties")) {

Properties properties = new Properties();

properties.load(new StringReader(data));

return properties;

}

         //yml和yaml格式配置文件

else if (fileExtension.equalsIgnoreCase("yaml")

|| fileExtension.equalsIgnoreCase("yml")) {

YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();

yamlFactory.setResources(new ByteArrayResource(data.getBytes()));

return yamlFactory.getObject();

}

}

}

return EMPTY_PROPERTIES; //返回空的数据

}

2.2 NacosConfigService

NacosConfigService结构在下面小节介绍。

2.2.1 getConfig()

@Override

public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {

return getConfigInner(namespace, dataId, group, timeoutMs);

}

getConfigInner()方法

首先优先从本地获取,再次从网络获取,否则从本地快照缓存文件获取;

private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {

group = null2defaultGroup(group); //默认组

ParamUtils.checkKeyParam(dataId, group); //检查参数

ConfigResponse cr = new ConfigResponse(); // 创建响应

cr.setDataId(dataId);

cr.setTenant(tenant);

cr.setGroup(group);

// 优先使用本地配置,比如C:\Users\600336\nacos\config\fixed-localhost_8848_nacos\data\config-data\目录中获取相应配置文件

String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);

if (content != null) {

cr.setContent(content);

configFilterChainManager.doFilter(null, cr);

content = cr.getContent();

return content;

}

 

try {

// 如果不本地不存在的话,就从网络读,下一节分析

content = worker.getServerConfig(dataId, group, tenant, timeoutMs);

cr.setContent(content);

configFilterChainManager.doFilter(null, cr);

content = cr.getContent();

return content;

}

//如果上面都没获取到,则获取本地快照缓存文件内容,比如 C:\Users\600336\nacos\config\fixed-localhost_8848_nacos\snapshot\mytest\nacos-config-client.yml

dataId, group, tenant, ContentUtils.truncateContent(content));

content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);

cr.setContent(content);

configFilterChainManager.doFilter(null, cr);

content = cr.getContent();

return content;

}

2.2.2 ClientWorker.getServerConfig()

用代理请求/v1/cs/configs,传参数dataId,group,tenant获取配置文件。

public String getServerConfig(String dataId, String group, String tenant, long readTimeout)

throws NacosException {

... //参数group空则默认组

HttpResult result = null;

try {

List<String> params = null;

if (StringUtils.isBlank(tenant)) {

params = Arrays.asList("dataId", dataId, "group", group);

} else {

params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);

}

result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);

}

switch (result.code) {

case HttpURLConnection.HTTP_OK:

LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);

return result.content;

    ...

}

}

2.2.3 NacosConfigService大致结构

图片来源:https://blog.csdn.net/wangwei19871103/article/details/105738140

由于版本问题,可能结构有区别。

  1. 创建配置服务

客户端启动时候, 前面介绍的NacosPropertySourceLocator.locate()方法中创建,

ConfigService configService = nacosConfigProperties.configServiceInstance();

调用:

此时会初始化一些配置到properties中;

configService = NacosFactory.createConfigService(properties);

调用:

其实就是反射出NacosConfigService,然后获取有参构造方法,反射创建实例。

public static ConfigService createConfigService(Properties properties) throws NacosException {

Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");

Constructor constructor = driverImplClass.getConstructor(Properties.class);

ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);

return vendorImpl;

}

  1. 构造方法

内部组件有:

ServerHttpAgent : http请求的代理,

MetricsHttpAgent : 包装了ServerHttpAgent,加了计时的功能,

ClientWorker : 做配置文件检查。

public NacosConfigService(Properties properties) throws NacosException {

String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);

if (StringUtils.isBlank(encodeTmp)) {

encode = Constants.ENCODE; //默认设置utf-8

} else {

encode = encodeTmp.trim();

}

initNamespace(properties);

agent = new MetricsHttpAgent(new ServerHttpAgent(properties));

agent.start();

worker = new ClientWorker(agent, configFilterChainManager, properties);

}

  1. ServerHttpAgent

public ServerHttpAgent(Properties properties) throws NacosException {

serverListMgr = new ServerListManager(properties);

init(properties); //设置编码,密码,最大重试次数

}

  1. ServerListManager.start()

ServerListManager用来管理注册中心集群列表;

agent.start(); 最终调用到 ServerListManager的start方法;

public synchronized void start() throws NacosException {

if (isStarted || isFixed) {

return;

}

//这里会创建任务,从nameserver获取serverlist

GetServerListTask getServersTask = new GetServerListTask(addressServerUrl);

//重试5次

for (int i = 0; i < initServerlistRetryTimes && serverUrls.isEmpty(); ++i) {

//如果有改变则发起ServerlistChangeEvent事件

getServersTask.run();

this.wait((i + 1) * 100L);

     if (serverUrls.isEmpty()) { ...抛服务器异常... }

}

//无延迟开始调度,每30秒一次

TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);

isStarted = true;

}

  1. ClientWorker

参数:

agent: http代理,MetricsHttpAgent对象,

ConfigFilterChainManager : 过滤器管理器 ,默认里面没有过滤器,可以addFilter自己加。

这里也开启了一个单线程的执行器,执行checkConfigInfo检查配置任务,每10毫秒一次,去检查当前的配置数量,如果超过一个轮询任务的限制数量,默认3000个,就开启一个新的任务去做。

@SuppressWarnings("PMD.ThreadPoolCreationRule")

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {

this.agent = agent;

this.configFilterChainManager = configFilterChainManager;

// 初始化超时时间

init(properties);

 

executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread t = new Thread(r);

t.setName("com.alibaba.nacos.client.Worker." + agent.getName());

t.setDaemon(true);

return t;

}

});

//有cpu核数的线程,用来做长轮询的,每次检查配置,如果LongPollingRunnable任务的配置缓存超过一定数量,默认3000个,就要去开启一个新任务去检查配置

executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread t = new Thread(r);

t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());

t.setDaemon(true);

return t;

}

});

//配置检查

executor.scheduleWithFixedDelay(new Runnable() {

@Override

public void run() {

try {

checkConfigInfo();

} catch (Throwable e) {

LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);

}

}

}, 1L, 10L, TimeUnit.MILLISECONDS);

}

 

checkConfigInfo()方法后续会分析到;

 

 

 

参考:

https://blog.csdn.net/wangwei19871103/article/details/105814924 ,

https://blog.csdn.net/somenzz/article/details/100518028 ,

https://www.liaochuntao.cn/2019/09/16/java-web-54/ ,

https://www.liaochuntao.cn/categories/nacos/ ,

https://blog.csdn.net/wangwei19871103/article/details/105729211 ,

官方api : https://nacos.io/zh-cn/docs/open-api.html

标签:dataId,group,String,中心,配置,Nacos,new,tenant,配置文件
来源: https://www.cnblogs.com/coloz/p/14248816.html