Nacos 的配置文件 Dump 的操作

Nacos Config 模块启动做的事情

Nacos Config模块有一个特点,会将数据库中的配置信息,dump成文件,通过直接文件读取的方式,替代直接读取数据库,降低数据库的压力,是的数据库可以更好的处理写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@Service
public class DumpService {

@Autowired
private Environment env;

@Autowired
PersistService persistService;

@PostConstruct
public void init() {
LogUtil.defaultLog.warn("DumpService start");
// 构建单配置文件Dump处理器
DumpProcessor processor = new DumpProcessor(this);
// 构建全部配置文件Dump处理器
DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
// 构建全部灰度配置文件Dump处理器
DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
// 构建全部Tag配置文件Dump处理器
DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);

// 构建单配置文件任务管理器
dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
dumpTaskMgr.setDefaultTaskProcessor(processor);

// 构建全配置文件任务管理器
dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);

// 任务添加
Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());

// 任务添加
Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());

// 清除历史配置文件信息(xx天之前的历史配置信息全部删除)
Runnable clearConfigHistory = () -> {
log.warn("clearConfigHistory start");
// 只有集群的第一IP节点可以执行此任务
if (ServerListService.isFirstIp()) {
try {
Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
int totalCount = persistService.findConfigHistoryCountByTime(startTime);
if (totalCount > 0) {
// 分页删除历史记录
// 之所以要采用分页的方式,一个是为了降低数据库删除数据时的压力,另一方面考虑数据库集群的主从同步延迟的问题(bin-log)
int pageSize = 1000;
int removeTime = (totalCount + pageSize - 1) / pageSize;
log.warn("clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}",
startTime, totalCount, pageSize, removeTime);
while (removeTime > 0) {
// 分页删除,以免批量太大报错
persistService.removeConfigHistory(startTime, pageSize);
removeTime--;
}
}
} catch (Throwable e) {
log.error("clearConfigHistory error", e);
}
}
};

try {
// dump出全部的配置信息到文件中
dumpConfigInfo(dumpAllProcessor);

// 更新beta缓存
LogUtil.defaultLog.info("start clear all config-info-beta.");
// 清除所有的灰度配置文件信息
DiskUtil.clearAllBeta();
// 如果存在灰度配置文件信息得的表
if (persistService.isExistTable(BETA_TABLE_NAME)) {
// 执行dump全部灰度配置文件的任务
dumpAllBetaProcessor.process(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
}
// 更新Tag缓存
LogUtil.defaultLog.info("start clear all config-info-tag.");
DiskUtil.clearAllTag();
if (persistService.isExistTable(TAG_TABLE_NAME)) {
dumpAllTagProcessor.process(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
}

// add to dump aggr
List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
if (configList != null && !configList.isEmpty()) {
total = configList.size();
List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
for (List<ConfigInfoChanged> list : splitList) {
MergeAllDataWorker work = new MergeAllDataWorker(list);
work.start();
}
log.info("server start, schedule merge end.");
}
} catch (Exception e) {
LogUtil.fatalLog.error(
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
e.getCause());
throw new RuntimeException(
"Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage());
}
// 如果非单机模式,则Nacos Config存在一个dump文件的心跳记录,可以减少dump文件的开销以及任务耗时
if (!STANDALONE_MODE) {
Runnable heartbeat = () -> {
// 记录当前的时间
String heartBeatTime = TimeUtils.getCurrentTime().toString();
// write disk
try {
// 时间信息持久化
DiskUtil.saveHeartBeatToDisk(heartBeatTime);
} catch (IOException e) {
LogUtil.fatalLog.error("save heartbeat fail" + e.getMessage());
}
};

// 周期性执行任务
TimerTaskService.scheduleWithFixedDelay(heartbeat, 0, 10, TimeUnit.SECONDS);

// 随机的任务延迟时间
long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
LogUtil.defaultLog.warn("initialDelay:{}", initialDelay);

// 周期性执行dump全部配置文件的操作
TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);

// 周期性执行dump全部灰度配置文件的操作
TimerTaskService.scheduleWithFixedDelay(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
TimeUnit.MINUTES);
}

// 周期性执行清除往期历史配置信息记录
TimerTaskService.scheduleWithFixedDelay(clearConfigHistory, 10, 10, TimeUnit.MINUTES);

}
...
}
举一个Dump操作的任务——dumpConfigInfo(dumpAllProcessor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor)
throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
try {
// 是否开启了快速启动模式(即根据前面的 heartbeat file 记录的时间信息进行快速dump出配置文件信息)
if (isQuickStart()) {
File heartbeatFile = DiskUtil.heartBeatFile();
if (heartbeatFile.exists()) {
// 打开文件流
fis = new FileInputStream(heartbeatFile);
String heartheatTempLast = IOUtils.toString(fis, Constants.ENCODE);
heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
// 计算时间差是否满足小于36小时
if (TimeUtils.getCurrentTime().getTime()
- heartheatLastStamp.getTime() < timeStep * 60 * 60 * 1000) {
// 超过时间,直接采取全部配置信息执行dump出文件的操作
isAllDump = false;
}
}
}
// 是否进行全部配置信息的dump成文件操作
if (isAllDump) {
LogUtil.defaultLog.info("start clear all config-info.");
// 删除所有的配置文件信息
DiskUtil.clearAll();
dumpAllProcessor.process(DumpAllTask.TASK_ID, new DumpAllTask());
} else {
// 此处为快速启动逻辑的操作
Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp,
timeStep);
// 数据变更 dump 处理器
DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(
this, beforeTimeStamp, TimeUtils.getCurrentTime());
// 执行操作
dumpChangeProcessor.process(DumpChangeTask.TASK_ID, new DumpChangeTask());
// 文件的 MD5 检查任务
Runnable checkMd5Task = () -> {
LogUtil.defaultLog.error("start checkMd5Task");
// 直接根据内存缓存中的配置信息的数据,进行快速检查每个配置文件信息的变更情况
List<String> diffList = ConfigService.checkMd5();
for (String groupKey : diffList) {
// 将对应格式的数据进行解析
String[] dg = GroupKey.parseKey(groupKey);
String dataId = dg[0];
String group = dg[1];
String tenant = dg[2];
// 直接查找对应的配置文件信息
ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
// 进行变更判断并dump出文件
ConfigService.dumpChange(dataId, group, tenant, configInfo.getContent(),
configInfo.getLastModified());
}
LogUtil.defaultLog.error("end checkMd5Task");
};
// 进行周期任务调度执行
TimerTaskService.scheduleWithFixedDelay(checkMd5Task, 0, 12,
TimeUnit.HOURS);
}
} catch (IOException e) {
LogUtil.fatalLog.error("dump config fail" + e.getMessage());
throw e;
} finally {
if (null != fis) {
try {
// 文件流关闭
fis.close();
} catch (IOException e) {
LogUtil.defaultLog.warn("close file failed");
}
}
}
}

在上面的代码中,看到了这段代码

1
2
3
DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp, TimeUtils.getCurrentTime());
// 执行操作
dumpChangeProcessor.process(DumpChangeTask.TASK_ID, new DumpChangeTask());

这段代码和nacos config模块的快速启动机制有关,来看下这个处理器具体做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Override
public boolean process(String taskType, AbstractTask task) {
LogUtil.defaultLog.warn("quick start; startTime:{},endTime:{}",
startTime, endTime);
LogUtil.defaultLog.warn("updateMd5 start");
// 开始更新文件信息的时间
long startUpdateMd5 = System.currentTimeMillis();
// 从数据库查出所有的配置文件的MD5值信息
List<ConfigInfoWrapper> updateMd5List = persistService
.listAllGroupKeyMd5();
LogUtil.defaultLog.warn("updateMd5 count:{}", updateMd5List.size());
// 遍历列表进行更新本地的配置信息文件的 MD5 值信息
for (ConfigInfoWrapper config : updateMd5List) {
final String groupKey = GroupKey2.getKey(config.getDataId(),
config.getGroup());
ConfigService.updateMd5(groupKey, config.getMd5(),
config.getLastModified());
}
// 结束配置信息文件 MD5 更新任务的时间
long endUpdateMd5 = System.currentTimeMillis();
LogUtil.defaultLog.warn("updateMd5 done,cost:{}", endUpdateMd5
- startUpdateMd5);

LogUtil.defaultLog.warn("deletedConfig start");
long startDeletedConfigTime = System.currentTimeMillis();
// 查找需要删除的配置历史记录信息
// 对应的 SQL 语句:"SELECT DISTINCT data_id, group_id, tenant_id FROM his_config_info WHERE op_type = 'D' AND gmt_modified >=? AND gmt_modified <= ?"
List<ConfigInfo> configDeleted = persistService.findDeletedConfig(
startTime, endTime);
LogUtil.defaultLog.warn("deletedConfig count:{}", configDeleted.size());
// 更新配置信息
for (ConfigInfo configInfo : configDeleted) {
if (persistService.findConfigInfo(configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant()) == null) {
// 移除对应的配置
ConfigService.remove(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());
}
}
long endDeletedConfigTime = System.currentTimeMillis();
LogUtil.defaultLog.warn("deletedConfig done,cost:{}",
endDeletedConfigTime - startDeletedConfigTime);

LogUtil.defaultLog.warn("changeConfig start");
long startChangeConfigTime = System.currentTimeMillis();
// 查找变更的配置信息
// 对应的 SQL 语句:SELECT data_id, group_id, tenant_id, app_name, content, gmt_modified FROM config_info WHERE gmt_modified >=? AND gmt_modified <= ?"
List<PersistService.ConfigInfoWrapper> changeConfigs = persistService
.findChangeConfig(startTime, endTime);
LogUtil.defaultLog.warn("changeConfig count:{}", changeConfigs.size());
for (PersistService.ConfigInfoWrapper cf : changeConfigs) {
boolean result = ConfigService.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());
final String content = cf.getContent();
final String md5 = MD5.getInstance().getMD5String(content);
LogUtil.defaultLog.info(
"[dump-change-ok] {}, {}, length={}, md5={}",
new Object[] {
GroupKey2.getKey(cf.getDataId(), cf.getGroup()),
cf.getLastModified(), content.length(), md5});
}
// 重新进行配置文件加载操作
ConfigService.reloadConfig();
long endChangeConfigTime = System.currentTimeMillis();
LogUtil.defaultLog.warn("changeConfig done,cost:{}",
endChangeConfigTime - startChangeConfigTime);
return true;
}

来看看这个dumpChange(...)做了什么事情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static public boolean dumpChange(String dataId, String group, String tenant, String content, long lastModifiedTs) {
final String groupKey = GroupKey2.getKey(dataId, group, tenant);

makeSure(groupKey);
// 针对CacheItem进行加锁操作
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);

// 获取锁失败
if (lockResult < 0) {
dumpLog.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}

try {
// 计算当前配置文件内容的签名值
final String md5 = MD5.getInstance().getMD5String(content);
// 集群模式,采用Dump成文件的形式,将配置文件进行缓存
if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) {
// 获取当前缓存文件的签名值
String loacalMd5 = DiskUtil.getLocalConfigMd5(dataId, group, tenant);
if (md5.equals(loacalMd5)) {
dumpLog.warn(
"[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}",
groupKey, md5, ConfigService.getLastModifiedTs(groupKey), lastModifiedTs);
} else {
// 如果签名不一致,则更新缓存文件
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
}
// 更新CacheItem的元数据信息
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
dumpLog.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(),
ioe);
return false;
} finally {
releaseWriteLock(groupKey);
}
}

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
CacheItem cache = makeSure(groupKey);
if (cache.md5 == null || !cache.md5.equals(md5)) {
cache.md5 = md5;
cache.lastModifiedTs = lastModifiedTs;
// 发布数据变更事件
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
}
}

为什么执行一次这个ConfigService.reloadConfig()操作?

其实这一块,是nacos config模块自己采用配置管理的功能的体现,通过将一些配置进行管理,使得nacos config模块的内部一些功能可以根据用户设置的参数动态变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static public void reloadConfig() {
String aggreds = null;
try {
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
// 聚合白名单
ConfigInfoBase config = persistService.findConfigInfoBase(AggrWhitelist.AGGRIDS_METADATA, "DEFAULT_GROUP");
if (config != null) {
aggreds = config.getContent();
}
} else {
aggreds = DiskUtil.getConfig(AggrWhitelist.AGGRIDS_METADATA,
"DEFAULT_GROUP", StringUtils.EMPTY);
}
if (aggreds != null) {
// 聚合白名单列表重新加载
AggrWhitelist.load(aggreds);
}
} catch (IOException e) {
dumpLog.error("reload fail:" + AggrWhitelist.AGGRIDS_METADATA, e);
}

String clientIpWhitelist = null;
try {
// 客户端白名单
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
ConfigInfoBase config = persistService.findConfigInfoBase(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA, "DEFAULT_GROUP");
if (config != null) {
clientIpWhitelist = config.getContent();
}
} else {
clientIpWhitelist = DiskUtil.getConfig(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA, "DEFAULT_GROUP",
StringUtils.EMPTY);
}
if (clientIpWhitelist != null) {
ClientIpWhiteList.load(clientIpWhitelist);
}
} catch (IOException e) {
dumpLog.error("reload fail:"
+ ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA, e);
}

String switchContent = null;
try {
// nacos 功能开关配置重新加载
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
ConfigInfoBase config = persistService.findConfigInfoBase(SwitchService.SWITCH_META_DATAID,
"DEFAULT_GROUP");
if (config != null) {
switchContent = config.getContent();
}
} else {
switchContent = DiskUtil.getConfig(
SwitchService.SWITCH_META_DATAID, "DEFAULT_GROUP", StringUtils.EMPTY);
}
if (switchContent != null) {
SwitchService.load(switchContent);
}
} catch (IOException e) {
dumpLog.error("reload fail:" + SwitchService.SWITCH_META_DATAID, e);
}

}

为什么要Dump配置成文件

数据库其实是config manager组件的瓶颈,数据库最大并发程度,限制了配置管理组件能够承担的最大客户端查询配置并发度,因此,需要采取一定的缓存策略,降低查询直接打在数据库上,提高数据库的可用性

直接存在内存 Map 中可不可以?

如果说配置文件信息很小以及配置文件数量很少的情况下,可以考虑直接缓存在内存的 Map 数据结构中,但是当配置文件信息内容太大时,这个时候,就不适合放在内存的 Map 中了,而是需要考虑采用外部存储来缓存起来,但是本着稳定的原则,能不依赖外部组件就不依赖外部组件,因此,nacos采取了磁盘文件缓存的方式,同时内存 Map 负责存储每个配置文件的元数据信息,

Apollo 的不足

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public ConfigFileController(
final ConfigController configController,
final NamespaceUtil namespaceUtil,
final WatchKeysUtil watchKeysUtil,
final GrayReleaseRulesHolder grayReleaseRulesHolder) {
//
localCache = CacheBuilder.newBuilder()
// 设置缓存失效的时间
.expireAfterWrite(EXPIRE_AFTER_WRITE, TimeUnit.MINUTES)
// 计算缓存容量
.weigher((Weigher<String, String>) (key, value) -> value == null ? 0 : value.length())
// 设定最大的缓存容量
.maximumWeight(MAX_CACHE_SIZE)
// 缓存失效 or 被自动删除时的监听器需要做的操作
.removalListener(notification -> {
String cacheKey = notification.getKey();
logger.debug("removing cache key: {}", cacheKey);
if (!cacheKey2WatchedKeys.containsKey(cacheKey)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<String> watchedKeys = new ArrayList<>(cacheKey2WatchedKeys.get(cacheKey));
for (String watchedKey : watchedKeys) {
watchedKeys2CacheKey.remove(watchedKey, cacheKey);
}
cacheKey2WatchedKeys.removeAll(cacheKey);
logger.debug("removed cache key: {}", cacheKey);
})
.build();
...
}

public class ConfigServiceWithCache extends AbstractConfigService {
...
@PostConstruct
void initialize() {
configCache = CacheBuilder.newBuilder()
.expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES)
// 构建可以自动加载不存在Key的value的缓存Map
.build(new CacheLoader<String, ConfigCacheEntry>() {
@Override
public ConfigCacheEntry load(String key) throws Exception {
List<String> namespaceInfo = STRING_SPLITTER.splitToList(key);
if (namespaceInfo.size() != 3) {
Tracer.logError(
new IllegalArgumentException(String.format("Invalid cache load key %s", key)));
return nullConfigCacheEntry;
}

Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD, key);
try {
// 直接搜数据库
ReleaseMessage latestReleaseMessage = releaseMessageService.findLatestReleaseMessageForMessages(Lists.newArrayList(key));
Release latestRelease = releaseService.findLatestActiveRelease(namespaceInfo.get(0), namespaceInfo.get(1), namespaceInfo.get(2));

transaction.setStatus(Transaction.SUCCESS);

long notificationId = latestReleaseMessage == null ? ConfigConsts.NOTIFICATION_ID_PLACEHOLDER : latestReleaseMessage.getId();

if (notificationId == ConfigConsts.NOTIFICATION_ID_PLACEHOLDER && latestRelease == null) {
return nullConfigCacheEntry;
}
// 返回创建的新的缓存Value对象
return new ConfigCacheEntry(notificationId, latestRelease);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
});
...
}

Apollo的缓存策略,是采取了guavaCache的实现,本质还是一个内存的Map,只是多了过期算法,使得缓存能够稳定在某个具体的内存占用上限,但是也有相应的缺陷,由于是采用内存过期存储Map + 数据库直查的方式,容易出现大量的缓存失效时,大量的数据查询操作直接打在数据库上,有可能