A Brief Analysis of Nacos's Consistency Protocol Implementation: the `Distro` Protocol

Background Reading

A Brief Look at How DistroConsistencyServiceImpl Works in Nacos

The previous article only scratched the surface, so this time I want to start from the beginning and properly dissect Distro, the AP protocol developed in house at Alibaba and used by Nacos.

Core Code Implementation

Data Synchronization Done When the Nacos Naming Module Starts Up

DistroConsistencyServiceImpl

public void load() throws Exception {
    if (SystemUtils.STANDALONE_MODE) {
        initialized = true;
        return;
    }
    // size = 1 means only myself in the list, we need at least one another server alive:
    // In cluster mode, wait until at least two nodes are present before continuing
    while (serverListManager.getHealthyServers().size() <= 1) {
        Thread.sleep(1000L);
        Loggers.DISTRO.info("waiting server list init...");
    }

    // Iterate over all healthy cluster nodes
    for (Server server : serverListManager.getHealthyServers()) {
        // Skip this node itself; there is no need to sync data from ourselves
        if (NetUtils.localServer().equals(server.getKey())) {
            continue;
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("sync from " + server);
        }
        // Pull the full data set from another server; this only needs to succeed once,
        // the rest is handled by the incremental sync tasks
        if (syncAllDataFromRemote(server)) {
            initialized = true;
            return;
        }
    }
}
The Full Data Pull

What the Data Puller Does

public boolean syncAllDataFromRemote(Server server) {
    try {
        // Fetch the full data set from the remote server
        byte[] data = NamingProxy.getAllData(server.getKey());
        // Process the received data
        processData(data);
        return true;
    } catch (Exception e) {
        Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
        return false;
    }
}

The Data Provider's Response

@RequestMapping(value = "/datums", method = RequestMethod.GET)
public ResponseEntity getAllDatums(HttpServletRequest request, HttpServletResponse response) throws Exception {
    // Serialize the backing Map of the data store and return it directly
    String content = new String(serializer.serialize(dataStore.getDataMap()), StandardCharsets.UTF_8);
    return ResponseEntity.ok(content);
}

Next, here is what happens once the full data set has been pulled from one of the server nodes:

public void processData(byte[] data) throws Exception {
    if (data.length > 0) {
        // Deserialize the payload first
        Map<String, Datum<Instances>> datumMap =
            serializer.deserializeMap(data, Instances.class);

        // Walk through every entry
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            // Put the entry into the data store (DataStore)
            dataStore.put(entry.getKey(), entry.getValue());
            // If no listener is registered for this key, it is a brand-new piece of data
            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service not exist:
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // create empty service
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();
                    // Call back the listener to announce the new Service
                    listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                        .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }
        // Fire the listener callbacks
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }

            try {
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }

            // Update data store if listener executed successfully:
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
}

At this point, the full data synchronization that the Distro protocol performs when the Nacos Naming module first starts up comes to an end. What follows is incremental data synchronization, and before diving into it we need to introduce one of Distro's core concepts: the authoritative Server.

The Authoritative-Server Mapper

public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    // Decide whether this node may handle the given instance data
    public boolean responsible(Cluster cluster, Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    // Hash the service name to find the index of its authoritative node;
    // if that index points to this node, the data can be handled locally
    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }

        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }

        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }

        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    // Map a service name to the address of its authoritative server
    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }

        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);

            return NetUtils.localServer();
        }
    }

    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {

    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {

        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}

The component above is a key piece of the Distro protocol: it hashes a data key to locate the authoritative node within the cluster's node list.
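To make the mapping concrete, here is a minimal standalone sketch (not Nacos source; the server addresses and service name are made up) that reproduces the hash-and-modulo lookup performed by DistroMapper#distroHash and DistroMapper#mapSrv:

import java.util.Arrays;
import java.util.List;

public class DistroHashSketch {

    public static void main(String[] args) {
        // Hypothetical healthy server list; every node must see it in the same order.
        List<String> healthyList = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");

        // Hypothetical service key to place.
        String serviceName = "DEFAULT_GROUP@@demo-service";

        // Same hash that DistroMapper uses: absolute value of String#hashCode.
        int hash = Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);

        // The authoritative node is simply hash modulo the healthy-list size.
        String owner = healthyList.get(hash % healthyList.size());

        System.out.println(serviceName + " is owned by " + owner);
    }
}

As long as every node sees the same healthy list in the same order, each one computes the same owner for a given service, which is what lets Distro partition write responsibility across the cluster without extra coordination.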

Incremental Data Synchronization Between Nodes
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        // Create one task scheduler per CPU core
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            // Submit the scheduler to the global executor
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        // Hash the key to pick a TaskScheduler and hand the task to it
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    // Take one task from the queue (with a timeout)
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);
                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }
                    // If there is no cluster or the server list is empty, skip
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }
                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }
                    // Buffer the key so that keys can be dispatched in batches,
                    // avoiding one sync request per key
                    keys.add(key);
                    dataSize++;
                    // When the buffered key count reaches the configured batch size,
                    // or the dispatch period has elapsed, dispatch the sync tasks
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // Create a SyncTask for every other server
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());
                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // Submit the task with a delay of 0, i.e. execute immediately
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
DataSyncer: the Real Executor of Data Sync Tasks
public class DataSyncer {

    ...

    @PostConstruct
    public void init() {
        // Start the periodic data sync task (runs every five seconds)
        startTimedSync();
    }

    // Submit a sync task
    public void submit(SyncTask task, long delay) {
        // If it's a new task:
        if (task.getRetryCount() == 0) {
            // Iterate over all keys of the task
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                // Record the key in the map to avoid submitting duplicate sync tasks
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    // A task for this key is already in flight, so drop the key
                    iterator.remove();
                }
            }
        }
        // If every key has been removed, there is nothing left to submit
        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }
        // Perform the data sync asynchronously
        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }
            // The keys this task actually needs to sync
            List<String> keys = task.getKeys();
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }
            // 2. get the datums by keys and check the datum is empty or not
            // Batch-fetch the datums by key
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            // If the data has already been removed, cancel this task
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }
            // Serialize the data
            byte[] data = serializer.serialize(datumMap);
            long timestamp = System.currentTimeMillis();
            // Push the incremental data to the target node
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            // If the sync failed, create a new SyncTask with the retry count incremented
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    // Retry a failed task
    public void retrySync(SyncTask syncTask) {
        Server server = new Server();
        server.setIp(syncTask.getTargetServer().split(":")[0]);
        server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(server)) {
            // if server is no longer in healthy server list, ignore this task:
            return;
        }
        // TODO may choose other retry policy.
        // Re-submit the task with the configured retry delay
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    // The periodic task: each run broadcasts the checksums of the data this node
    // is responsible for to the other server nodes
    public class TimedSync implements Runnable {

        @Override
        public void run() {
            try {
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", getServers());
                }
                // send local timestamps to other servers:
                Map<String, String> keyChecksums = new HashMap<>(64);
                // Walk every key in the data store
                for (String key : dataStore.keys()) {
                    // If this node is not the authoritative server for the key,
                    // it is not allowed to broadcast it to the cluster
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }
                    // Fetch the datum
                    Datum datum = dataStore.get(key);
                    if (datum == null) {
                        continue;
                    }
                    // Add its checksum to the broadcast map
                    keyChecksums.put(key, datum.value.getChecksum());
                }
                if (keyChecksums.isEmpty()) {
                    return;
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }
                // Broadcast to every node in the cluster except this one
                for (Server member : getServers()) {
                    if (NetUtils.localServer().equals(member.getKey())) {
                        continue;
                    }
                    // Send the checksums to the member
                    NamingProxy.syncCheckSums(keyChecksums, member.getKey());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }
    }

    public List<Server> getServers() {
        return serverListManager.getHealthyServers();
    }

    public String buildKey(String key, String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}

So what does another node do once it receives this data?

@RequestMapping(value = "/checksum", method = RequestMethod.PUT)
public ResponseEntity syncChecksum(HttpServletRequest request, HttpServletResponse response) throws Exception {
    // Which node the data was sent from
    String source = WebUtils.required(request, "source");
    String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
    // Deserialize the payload
    Map<String, String> dataMap = serializer.deserialize(entity.getBytes(), new TypeReference<Map<String, String>>() {});
    // Hand the checksums off for processing
    consistencyService.onReceiveChecksums(dataMap, source);
    return ResponseEntity.ok("ok");
}
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    if (syncChecksumTasks.containsKey(server)) {
        // Already in process of this server:
        Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
        return;
    }
    // Mark the data from this server as currently being processed
    syncChecksumTasks.put(server, "1");
    try {
        // Keys that need to be updated
        List<String> toUpdateKeys = new ArrayList<>();
        // Keys that need to be removed
        List<String> toRemoveKeys = new ArrayList<>();
        // Iterate over the received checksums
        for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
            // If the received data contains a key this node itself is responsible for,
            // abort the whole sync (it violates the authoritative-server rule)
            if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                // this key should not be sent from remote server:
                Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
                // abort the procedure:
                return;
            }
            // If the local data store does not contain this key, or its checksum differs,
            // the key needs to be updated
            if (!dataStore.contains(entry.getKey()) ||
                dataStore.get(entry.getKey()).value == null ||
                !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                toUpdateKeys.add(entry.getKey());
            }
        }
        // Walk every key in the local data store
        for (String key : dataStore.keys()) {
            // Skip keys that the source server is not responsible for
            if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
                continue;
            }
            // If the received checksums do not contain this key, it has been removed
            // on the source server and must be removed locally as well
            if (!checksumMap.containsKey(key)) {
                toRemoveKeys.add(key);
            }
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
        }
        // Remove the keys that are gone
        for (String key : toRemoveKeys) {
            onRemove(key);
        }
        if (toUpdateKeys.isEmpty()) {
            return;
        }
        try {
            // Pull the data for the keys that need updating, then process it exactly
            // like the initial full sync
            byte[] result = NamingProxy.getData(toUpdateKeys, server);
            processData(result);
        } catch (Exception e) {
            Loggers.DISTRO.error("get data from " + server + " failed!", e);
        }
    } finally {
        // Remove this 'in process' flag:
        // Clear the sync flag for this source server
        syncChecksumTasks.remove(server);
    }
}