Nacos-Server 集群之配置中心

如何实现配置在集群间同步的?

前提步骤
  • 部署MySQL,并执行sql脚本创建nacos schema
  • 参照官网的nacos集群部署指南——nacos 集群部署指南
  • 启动nacos集群
开始设置
  • 在集群中任意登陆一台nacos server节点,然后创建配置
  • 再次登陆其余节点,会发现配置信息已经同步到其余的节点中了
配置信息如何在集群间同步?

其实,这个配置信息同步是很简单的——直接从MySQL中读取就好了,在集群模式下,nacos-server需要使用数据库进行存储config配置信息,然后其他nacos-server config节点直接从数据库中读取即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 此处为 nacos-server-config拉取所有配置信息列表
* 查询配置信息,返回JSON格式。
*/
@RequestMapping(params = "search=accurate", method = RequestMethod.GET)
@ResponseBody
public Page<ConfigInfo> searchConfig(HttpServletRequest request,
HttpServletResponse response,
@RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "tenant", required = false,
defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam("pageNo") int pageNo,
@RequestParam("pageSize") int pageSize) {
Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(100);
if (StringUtils.isNotBlank(appName)) {
configAdvanceInfo.put("appName", appName);
}
if (StringUtils.isNotBlank(configTags)) {
configAdvanceInfo.put("config_tags", configTags);
}
try {
return persistService.findConfigInfo4Page(pageNo, pageSize, dataId, group, tenant,
configAdvanceInfo);
} catch (Exception e) {
String errorMsg = "serialize page error, dataId=" + dataId + ", group=" + group;
log.error(errorMsg, e);
throw new RuntimeException(errorMsg, e);
}
}

CommunicationController的作用

既然部署了nacos-server config集群,那么nacos-client连接的时候,基本上都是写上nacos-server cluster集群下所有节点的IP地址信息,那么这里就会有一个问题了,假设现在有三台nacos-server-config,分别为nacos-A/nacos-B/nacos-C,同时有一个nacos-client,它连接到了nacos-A,但是运维的同学,却在nacos-B进行了配置发布,那么nacos-client要怎么才能获取到最新的配置信息,换句话说,nacos-A要怎么才能知道配置已经更新了?这个时候CommunicationController的作用就体现出来了

CommunicationController是用于其他节点通知的控制器,是被动接受配置更新通知的。那么,这个通知是怎么发出的呢?这个时候回到ConfigController.publishConfig

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
// 配置信息持久化存储
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
// 进行事件发布
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else {
// beta publish,配置信息灰度发布
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}

这里可以看到,在publishConfig时,会发布一个ConfigDataChangeEvent事件,然后这个事件,会被监听器AsyncNotifyService接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 异步通知任务,如果发生配置信息变更事件,将此事件广播给所有 nacos-server 集群
@Override
public void onEvent(Event event) {

// 并发产生 ConfigDataChangeEvent
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
List<?> ipList = serverListService.getServerList();

// 其实这里任何类型队列都可以
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (int i = 0; i < ipList.size(); i++) {

/**
* 构建通知url :{@link com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo(HttpServletRequest, HttpServletResponse, String, String, String, String)}
*/
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String)ipList.get(i), evt.isBeta));
}
EXCUTOR.execute(new AsyncTask(httpclient, queue));
}
}

可以看到在循环语句中,创建了NotifySingleTask这个任务,并且构造函数中的参数中,有一个入参为(String)ipList.get(i),而这个其实就是nacos-server config集群中每个节点的IP地址信息。然后再次创建一个AsyncTask任务,去消费刚刚的Queue<NotifySingleTask>

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void executeAsyncInvoke() {

while (!queue.isEmpty()) {

NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (serverListService.getServerList().contains(targetIp)) {
// 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知
if (serverListService.isHealthCheck() && ServerListService.getServerListUnhealth().contains(targetIp)) {
// target ip 不健康,则放入通知列表中
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target);
// get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);
((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
} else {
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
if (task.isBeta) {
request.setHeader("isBeta", "true");
}
httpclient.execute(request, new AyscNotifyCallBack(httpclient, task));
}
}
}
}

然后会对nacos-server config每个节点数据进行判断,符合条件的话,直接先nacos-server config节点的/v1/cs/communication/dataChange接口发起通知,告知有配置已更新;那如果说通知其他节点失败了呢?又该怎么做?这里其实nacos都考虑到了,来看看AyscNotifyCallBack这个回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class AyscNotifyCallBack implements FutureCallback<HttpResponse> {

public AyscNotifyCallBack(CloseableHttpAsyncClient httpclient, NotifySingleTask task) {
this.task = task;
this.httpclient = httpclient;
}

@Override
public void completed(HttpResponse response) {

long delayed = System.currentTimeMillis() - task.getLastModified();
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
ConfigTraceService.logNotifyEvent(task.getDataId(),
task.getGroup(), task.getTenant(), null, task.getLastModified(),
LOCAL_IP,
ConfigTraceService.NOTIFY_EVENT_OK, delayed,
task.target);
} else {
log.error("[notify-error] {}, {}, to {}, result {}",
new Object[] {task.getDataId(), task.getGroup(),
task.target,
response.getStatusLine().getStatusCode()});
ConfigTraceService.logNotifyEvent(task.getDataId(),
task.getGroup(), task.getTenant(), null, task.getLastModified(),
LOCAL_IP,
ConfigTraceService.NOTIFY_EVENT_ERROR, delayed,
task.target);


//get delay time and set fail count to the task
int delay = getDelayTime(task);

Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();

queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);

((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);

LogUtil.notifyLog.error("[notify-retry] target:{} dataid:{} group:{} ts:{}", new Object[] {task.target, task.getDataId(), task.getGroup(), task.getLastModified()});
MetricsMonitor.getConfigNotifyException().increment();
}
HttpClientUtils.closeQuietly(response);
}

@Override
public void failed(Exception ex) {

long delayed = System.currentTimeMillis() - task.getLastModified();
log.error("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
+ ex.toString());
log.debug("[notify-exception] " + task.getDataId() + ", " + task.getGroup() + ", to " + task.target + ", "
+ ex.toString(), ex);
ConfigTraceService.logNotifyEvent(task.getDataId(),
task.getGroup(), task.getTenant(), null, task.getLastModified(),
LOCAL_IP,
ConfigTraceService.NOTIFY_EVENT_EXCEPTION, delayed,
task.target);

//get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();

queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);

((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
LogUtil.notifyLog.error("[notify-retry] target:{} dataid:{} group:{} ts:{}", new Object[] {task.target, task.getDataId(), task.getGroup(), task.getLastModified()});
MetricsMonitor.getConfigNotifyException().increment();
}

@Override
public void cancelled() {

LogUtil.notifyLog.error("[notify-exception] target:{} dataid:{} group:{} ts:{}", new Object[] {task.target, task.getGroup(), task.getGroup(), task.getLastModified()}, "CANCELED");

//get delay time and set fail count to the task
int delay = getDelayTime(task);
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();

queue.add(task);
AsyncTask asyncTask = new AsyncTask(httpclient, queue);

((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
LogUtil.notifyLog.error("[notify-retry] target:{} dataid:{} group:{} ts:{}", new Object[] {task.target, task.getDataId(), task.getGroup(), task.getLastModified()});
MetricsMonitor.getConfigNotifyException().increment();
}

private NotifySingleTask task;
private CloseableHttpAsyncClient httpclient;
}

在回调函数的三个方法中,对于除成功以外的操作,都会将通知任务重新创建,放入executor中等待重新执行

其他节点接收此更新事件的后续操作

在接收到更新事件通知后,CommunicationController.notifyConfigInfo会执行下面的方法

1
2
3
4
5
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}

而最终,会进入到TaskManager对象中,并且由DumpProcessor进行处理;在DumpProcessorprocess方法中,调用了一个重要的对象——ConfigService。这个对象的所有数据更新操作,都会发布一个事件LocalDataChangeEvent,而这个事件,在LongPollingService处理,而这里存储的,是nacos-client为了获取配置信息而发起的长链接的AsyncServlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {

// LocalDataChangeEvent 事件在 ConfigService 中发布
// 如果本地配置文件信息发生变更,则触发响应的事件
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}

LongPollingService会根据LocalDataChangeEvent创建出一个DataChangeTask任务,而这个对象,就是将nacos-client感兴趣的配置信息主动告诉nacos-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void run() {
try {
ConfigService.getContentBetaMd5(groupKey);
// 遍历所有长链接用户列表
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta发布且不在beta列表直接跳过
if (isBeta && !betaIps.contains(clientSub.ip)) {
continue;
}

// 如果tag发布且不在tag列表直接跳过
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}

getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // 删除订阅关系
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
}
}

至此,链接了nacos-Anacos-client,如果开发人员在nacos-B上进行配置信息发布,也能够接受到最新的配置信息

代码流程图

nacos-server config集群间同步

DumpService为什么要dump出文件

其实并不是DumpService执行了dump文件的操作,该操作最终是由ConfigService执行的,这里选择一个方法做展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static public boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
makeSure(groupKey);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);

if (lockResult < 0) {
dumpLog.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}

try {
final String md5 = MD5.getInstance().getMD5String(content);
if (md5.equals(ConfigService.getContentMd5(groupKey))) {
dumpLog.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, " + "lastModifiedNew={}", groupKey, md5, ConfigService.getLastModifiedTs(groupKey), lastModifiedTs);
} else if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) {
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
dumpLog.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg.contains(DISK_QUATA_EN)) {
// 磁盘写满保护代码
fatalLog.error("磁盘满自杀退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}

这里可以看到,在集群模式下,会选择将发布的配置信息进行一次dump到文件的操作,而这dump出的文件,最终被用于client拉取配置文件时,直接从该文件中获取;

1
2
3
4
5
6
7
8
9
// ConfigServletInner.doGetConfig

md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
file = DiskUtil.targetFile(dataId, group, tenant);
}

之所以采用这个操作,而不是直接读MySQL,是为了降低MySQL的数据读取压力,因为在集群模式下,虽然推荐MySQL采用高可用的部署,但是由于配置信息如果在发布时,可能导致大量的查询语句落在MySQL上,因此采用本地文件缓存的形式,以降低MySQL的数据读取压力