A Brief Look at How DistroConsistencyServiceImpl Works in Nacos

Eureka's Consistency Strategy

Eureka is an AP-mode service discovery framework. In cluster mode, Eureka servers replicate and update registry data by broadcasting their own data to one another. Eureka can also keep serving service registration information when the network between a client and the registry fails, because the Eureka client maintains a local cache of the registry.

DiscoveryClient

private void fetchRegistryFromBackup() {
    try {
        @SuppressWarnings("deprecation")
        BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
        if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
            backupRegistryInstance = backupRegistryProvider.get();
        }

        if (null != backupRegistryInstance) {
            Applications apps = null;
            if (isFetchingRemoteRegionRegistries()) {
                String remoteRegionsStr = remoteRegionsToFetch.get();
                if (null != remoteRegionsStr) {
                    apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                }
            } else {
                apps = backupRegistryInstance.fetchRegistry();
            }
            if (apps != null) {
                final Applications applications = this.filterAndShuffle(apps);
                applications.setAppsHashCode(applications.getReconcileHashCode());
                localRegionApps.set(applications);
                logTotalInstances();
                logger.info("Fetched registry successfully from the backup");
            }
        } else {
            logger.warn("No backup registry instance defined & unable to find any discovery servers.");
        }
    } catch (Throwable e) {
        logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
    }
}

It is precisely because of this client-side cache, introduced so that consumers can still call providers when the Eureka cluster is unavailable, that Eureka cannot guarantee strong consistency of registry data (CP); it can only provide eventual consistency (AP).

Nacos's Consistency Strategy: Distro

In AP mode, Nacos's consistency strategy is similar to Eureka's: servers synchronize data with one another to replicate and propagate changes across the cluster.

Triggering the Data Broadcast
DistroConsistencyServiceImpl.java

@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    taskDispatcher.addTask(key);
}

When the put or remove methods defined by the ConsistencyService interface are called, server-side data changes. At that point a task is created and the key of the changed data is passed into taskDispatcher.addTask, so that the changed data can be looked up later when it is synchronized.
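The remove path presumably mirrors put; the fragment below is only an assumed sketch based on the ConsistencyService contract described above, not verbatim source. The idea is that the removal is applied locally via onRemove and the same key is handed to the task dispatcher for broadcasting.

// Sketch only: assumed shape of the symmetric remove path in DistroConsistencyServiceImpl
@Override
public void remove(String key) throws NacosException {
    onRemove(key);                  // apply the removal to the local data store
    taskDispatcher.addTask(key);    // enqueue the key so the change is broadcast to the other servers
}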

TaskDispatcher.java

public void addTask(String key) {
    taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}

One method worth noting here is shakeUp. According to the comments in the official source, it routes the key (a key can be thought of as one data-change event) evenly across the different TaskScheduler instances, so that each TaskScheduler carries roughly the same amount of work.
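The implementation itself is not shown here, but the idea is essentially hash-modulo routing over the number of schedulers (one per CPU core, per the cpuCoreCount argument above). A minimal sketch of that idea, illustrative only and not the Nacos source:

// Illustrative sketch: route a key to one of `dimension` schedulers by hashing.
public static int shakeUp(String key, int dimension) {
    // mask off the sign bit so the index is non-negative, then take the modulo
    return (key.hashCode() & Integer.MAX_VALUE) % dimension;
}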

public class TaskScheduler implements Runnable {
    private int dataSize = 0;
    private long lastDispatchTime = 0L;
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
    ...
    public void addTask(String key) {
        queue.offer(key);
    }

    @Override
    public void run() {
        List<String> keys = new ArrayList<>();
        while (true) {
            try {
                String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);
                if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                    Loggers.EPHEMERAL.debug("got key: {}", key);
                }
                if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                    continue;
                }
                if (StringUtils.isBlank(key)) {
                    continue;
                }
                if (dataSize == 0) {
                    keys = new ArrayList<>();
                }
                keys.add(key);
                dataSize++;
                if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                    (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                    for (Server member : dataSyncer.getServers()) {
                        // the local server does not need to broadcast to itself
                        if (NetUtils.localServer().equals(member.getKey())) {
                            continue;
                        }
                        SyncTask syncTask = new SyncTask();
                        syncTask.setKeys(keys);
                        syncTask.setTargetServer(member.getKey());
                        if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                            Loggers.EPHEMERAL.debug("add sync task: {}", JSON.toJSONString(syncTask));
                        }
                        dataSyncer.submit(syncTask, 0);
                    }
                    lastDispatchTime = System.currentTimeMillis();
                    dataSize = 0;
                }
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("dispatch sync task failed.", e);
            }
        }
    }
}

The core logic lives in the body of the for (Server member : dataSyncer.getServers()) {..} loop, which is where the data is broadcast to the other Nacos servers. The concrete steps are:

- Create a `SyncTask` and set the batch of events on it (i.e. the collection of `key`s)
- Set the target `Server` on the `SyncTask` via `syncTask.setTargetServer(member.getKey())`
- Submit the broadcast task to the `DataSyncer`

Performing the Data Broadcast: DataSyncer
public void submit(SyncTask task, long delay) {
    // If it's a new task:
    if (task.getRetryCount() == 0) {
        Iterator<String> iterator = task.getKeys().iterator();
        while (iterator.hasNext()) {
            String key = iterator.next();
            if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                // associated key already exist:
                if (Loggers.EPHEMERAL.isDebugEnabled()) {
                    Loggers.EPHEMERAL.debug("sync already in process, key: {}", key);
                }
                iterator.remove();
            }
        }
    }
    if (task.getKeys().isEmpty()) {
        // all keys are removed:
        return;
    }
    GlobalExecutor.submitDataSync(new Runnable() {
        @Override
        public void run() {
            try {
                if (servers == null || servers.isEmpty()) {
                    Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                    return;
                }
                List<String> keys = task.getKeys();
                if (Loggers.EPHEMERAL.isDebugEnabled()) {
                    Loggers.EPHEMERAL.debug("sync keys: {}", keys);
                }
                Map<String, Datum> datumMap = dataStore.batchGet(keys);
                if (datumMap == null || datumMap.isEmpty()) {
                    // clear all flags of this task:
                    for (String key : task.getKeys()) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                    return;
                }
                byte[] data = serializer.serialize(datumMap);
                long timestamp = System.currentTimeMillis();
                boolean success = NamingProxy.syncData(data, task.getTargetServer());
                if (!success) {
                    SyncTask syncTask = new SyncTask();
                    syncTask.setKeys(task.getKeys());
                    syncTask.setRetryCount(task.getRetryCount() + 1);
                    syncTask.setLastExecuteTime(timestamp);
                    syncTask.setTargetServer(task.getTargetServer());
                    retrySync(syncTask);
                } else {
                    // clear all flags of this task:
                    for (String key : task.getKeys()) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                }
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("sync data failed.", e);
            }
        }
    }, delay);
}

GlobalExecutor.submitDataSync(Runnable runnable) submits a data-broadcast task. The task first uses the key set carried by the SyncTask to look up the corresponding data in the DataStore, serializes that data into a byte[] array, and then issues an HTTP request via NamingProxy.syncData(data, task.getTargetServer()). If the broadcast fails, the task is repackaged with an incremented retry count and pushed back into GlobalExecutor through retrySync.

(One open question here: SyncTask records how many times the task has been retried, but no decision is ever made based on that count, such as treating a server as down once it has failed to respond a certain number of times; the retry count is merely recorded.)
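For context, the retry path essentially just re-submits the same task after a configured delay. The fragment below is a paraphrased sketch of that idea, not verbatim Nacos source; containsTarget() is a hypothetical helper, and partitionConfig.getSyncRetryDelay() is assumed to be the relevant delay setting.

// Sketch of the retry path (paraphrased): the retry count is carried along but never checked.
public void retrySync(SyncTask syncTask) {
    // if the target server has dropped out of the healthy server list, give up on this task
    if (!containsTarget(syncTask.getTargetServer())) {   // containsTarget() is a hypothetical helper
        return;
    }
    // otherwise re-submit the same task with a fixed retry delay; getRetryCount() is never consulted
    submit(syncTask, partitionConfig.getSyncRetryDelay());
}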

public static boolean syncData(byte[] data, String curServer) throws Exception {
    try {
        Map<String, String> headers = new HashMap<>(128);
        headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
        headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION);
        headers.put("Accept-Encoding", "gzip,deflate,sdch");
        headers.put("Connection", "Keep-Alive");
        headers.put("Content-Encoding", "gzip");

        HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data);

        if (HttpURLConnection.HTTP_OK == result.code) {
            return true;
        }
        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return true;
        }
        throw new IOException("failed to req API:" + "http://" + curServer
            + RunningConfig.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:"
            + result.code + " msg: " + result.content);
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("NamingProxy", e);
    }
    return false;
}

The data is sent as a PUT request to http://ip:port/nacos/v1/ns/distro/datum, and that URL is handled by the public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) method of DistroController.

public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) throws Exception {

    String entity = IOUtils.toString(request.getInputStream(), "UTF-8");

    if (StringUtils.isBlank(entity)) {
        Loggers.EPHEMERAL.error("[onSync] receive empty entity!");
        throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
    }

    Map<String, Datum<Instances>> dataMap = serializer.deserializeMap(entity.getBytes(), Instances.class);
    for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
        if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
            String namespaceId = KeyBuilder.getNamespace(entry.getKey());
            String serviceName = KeyBuilder.getServiceName(entry.getKey());
            if (!serviceManager.containService(namespaceId, serviceName) && switchDomain.isDefaultInstanceEphemeral()) {
                serviceManager.createEmptyService(namespaceId, serviceName, true);
            }
            consistencyService.onPut(entry.getKey(), entry.getValue().value);
        }
    }
    return "ok";
}

Here consistencyService.onPut(entry.getKey(), entry.getValue().value) is called to apply the update. Note that onPut does not call taskDispatcher.addTask(key) again, which is what keeps a received update from being broadcast back out; instead it pushes the update into the Notifier task queue (for what Notifier does, see the walkthrough of registering a service instance on the Nacos server side). With that, the eventually consistent propagation of data between Nacos servers in AP mode is complete.
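As a rough mental model, the Notifier is a single worker thread that drains a queue of changed keys and invokes the listeners registered for each key. The fragment below is an illustrative simplification under that assumption, not the actual Nacos class; notifyListeners() is a hypothetical helper.

// Illustrative simplification: a single worker thread drains a queue of changed keys
// and notifies the listeners registered for each key.
public class Notifier implements Runnable {

    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>(1024 * 1024);

    public void addTask(String datumKey) {
        tasks.offer(datumKey);                    // called from onPut; just enqueues the key
    }

    @Override
    public void run() {
        while (true) {
            try {
                String datumKey = tasks.take();   // block until a change arrives
                notifyListeners(datumKey);        // hypothetical helper, see comment below
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("notify task failed.", e);
            }
        }
    }

    private void notifyListeners(String datumKey) {
        // look up the RecordListener instances registered for datumKey and call their
        // onChange(...) callbacks with the latest Datum from the data store (omitted here)
    }
}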