Nacos 是如何同时实现AP与CP的

两种一致性策略如何在nacos中共存

或许会有疑问,为什么早先的cp模式的Zookeeper或者AP模式的Eureka,都只有支持CAP理论下大家常用的AP实现或者CP实现,而nacos却能够两个都实现呢?

其实CAP理论,仅仅是针对分布式下数据的一致性而言,如果你对于数据的一致性要求不高,可忍受最终一致性,那么AP模式的Eureka就可以满足你了,如果说你对数据的一致性要求很高,那么就使用CP模式的Zookeeper,而追其根本,并不是说EurekaAP的,或者说ZookeeperCP的,而是他们存储的数据的一致性,满足AP或者CP,因此也就不难实现在一个组件中实现AP模式与CP模式共存

1
2
3
4
5
6
7
8
9
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

@Autowired
private PersistentConsistencyService persistentConsistencyService;

@Autowired
private EphemeralConsistencyService ephemeralConsistencyService;
}

DelegateConsistencyServiceImpl是一个一致性策略选择的类,根据不同的策略触发条件(在nacos中,CPAP切换的条件是注册的服务实例是否是临时实例),选择PersistentConsistencyService策略或者EphemeralConsistencyService策略,而EphemeralConsistencyService对应的是DistroConsistencyServiceImpl,采用的协议是阿里自研的Distro,我个人觉得就像gossip协议;PersistentConsistencyService对应的是RaftConsistencyServiceImpl,其底层采用的是Raft协议;这两种一致性策略下的数据存储互不影响,所以nacos实现了AP模式与CP模式在一个组件中同时存在

AP实现

Nacos中的DistroConsistencyServiceImpl工作浅析

CP实现

重要的协议——RAFT

动画演示地址:raft-protocol)

nacos是如何实现CP(raft)的

RaftController

RaftController控制器是raft集群内部节点间通信使用的,具体的信息如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
POST HTTP://{ip:port}/v1/ns/raft/vote : 进行投票请求

POST HTTP://{ip:port}/v1/ns/raft/beat : Leader向Follower发送心跳信息

GET HTTP://{ip:port}/v1/ns/raft/peer : 获取该节点的RaftPeer信息

PUT HTTP://{ip:port}/v1/ns/raft/datum/reload : 重新加载某日志信息

POST HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据并存入

DELETE HTTP://{ip:port}/v1/ns/raft/datum : Leader接收传来的数据删除操作

GET HTTP://{ip:port}/v1/ns/raft/datum : 获取该节点存储的数据信息

GET HTTP://{ip:port}/v1/ns/raft/state : 获取该节点的状态信息{UP or DOWN}

POST HTTP://{ip:port}/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作

DELETE HTTP://{ip:port}/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作

GET HTTP://{ip:port}/v1/ns/raft/leader : 获取当前集群的Leader节点信息

GET HTTP://{ip:port}/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者

RaftPeerSet

这个对象存储的是所有raft协议下的节点信息,存储的元素如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 集群节点地址管理
private ServerListManager serverListManager;

// 周期数
private AtomicLong localTerm = new AtomicLong(0L);

// 当前周期内的Leader
private RaftPeer leader = null;

// 所有的节点信息
private Map<String, RaftPeer> peers = new HashMap<String, RaftPeer>();

// 暂时不清楚用途
private Set<String> sites = new HashSet<>();

// 本节点是否已准备完毕
private boolean ready = false;

同时还具备了raft协议下必要的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// 当前IP对应的节点是否是Leader
public boolean isLeader(String ip) {
if (STANDALONE_MODE) {
return true;
}
if (leader == null) {
Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
return false;
}
return StringUtils.equals(leader.ip, ip);
}

// 决定Leader节点,根据投票结果以及是否满足majorityCount机制
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
// 选票计数
ips.add(peer.voteFor);
// 如果某节点的得票数大于当前的最大得票数,则更新候选Leader信息
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
// 是否满足majorityCount数量的限制
if (maxApproveCount >= majorityCount()) {
// 若满足则设置Leader节点信息
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;

if (!Objects.equals(leader, peer)) {
leader = peer;
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}

public RaftPeer makeLeader(RaftPeer candidate) {
// 如果当前Leader与Candidate节点不一样,则进行Leader信息更改
if (!Objects.equals(leader, candidate)) {
leader = candidate;
Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
}

for (final RaftPeer peer : peers.values()) {
Map<String, String> params = new HashMap<String, String>(1);
// 如果当前节点与远程Leader节点不等且是Follower节点
if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
try {
// 获取每个节点的RaftPeer节点信息对象数据
String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}", response.getResponseBody(), peer.ip);
peer.state = RaftPeer.State.FOLLOWER;
return 1;
}
update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
return 0;
}
});
} catch (Exception e) {
peer.state = RaftPeer.State.FOLLOWER;
Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
}
}
}
return update(candidate);
}

RaftCore

该对象是nacosraft协议的主要实现,在启动之初,会进行一系列初始化的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
executor.submit(notifier);
long start = System.currentTimeMillis();
// 进行日志文件的加载到内存数据对象Datums的操作
datums = raftStore.loadDatums(notifier);
// 设置当前的周期数
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
while (true) {
// 等待上一步的数据加载任务全部完成
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
}
// 初始化标识更改
initialized = true;
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
// 开启定时的Leader选举任务
GlobalExecutor.registerMasterElection(new MasterElection());
// 开启定时的Leader心跳服务
GlobalExecutor.registerHeartbeat(new HeartBeat());

Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

初始化的一系列操作完成后,此时集群还无法对外提供服务,因为此时Leader还未选举出来,需要在MasterElection选举Leader成功后才可以对外提供服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Leader 选举任务
public class MasterElection implements Runnable {
@Override
public void run() {
try {
// 当前节点是否已准备完毕
if (!peers.isReady()) {
return;
}
// 获取自身节点信息
RaftPeer local = peers.local();
// 本地存储的Leader任期时间
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
// 如果Leader任期时间还在允许范围内,则不进行Leader选举
if (local.leaderDueMs > 0) {
return;
}
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
// 向其他节点发起投票请求
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}

public void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
JSON.toJSONString(getLeader()), local.term);

// Raft node cluster rest
peers.reset();
local.term.incrementAndGet();
// 设置给自己投票
local.voteFor = local.ip;
// update node status to CANDIDATE
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<String, String>(1);
params.put("vote", JSON.toJSONString(local));
// 遍历所有的节点信息(除了自己之外)
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildURL(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
return 1;
}
// 获取投票结果,并进行Leader的选举工作
RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}

每个节点启动时,都会认为自己可以作为Leader,因此都会以自去己作为被选举人,向其他节点发起投票请求,而其他节点在接收到投票请求后的工作流程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 其他节点接收到投票请求后的反应
public RaftPeer receivedVote(RaftPeer remote) {
// 被选举人是否在raft集群节点列表中
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}

// 获取自身节点信息
RaftPeer local = peers.get(NetUtils.localServer());
// 如果被选举节点的周期数小于本节点的周期数,则将自己的投票投给自己并告诉被选举者
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
// 满足投票条件后,本节点确认将自己的票投给被选举者
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}

通过以上步骤,最终选举出了Leader节点,接下来,就可以对外提供服务了

因为是CP模式,所以操作都是通过Leader节点进行传达的,Follower节点本身不与Client进行联系,Follower只能接受来自Leader的操作请求,因此就存在请求转发的问题。因此在RaftCore中的singlePublish以及singleDelete中,存在着对Leader节点的判断以及请求转发的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void signalPublish(String key, Record value) throws Exception {
if (!isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
// 请求转发
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
...
}

public void signalDelete(final String key) throws Exception {
OPERATE_LOCK.lock();
try {
if (!isLeader()) {
Map<String, String> params = new HashMap<>(1);
params.put("key", URLEncoder.encode(key, "UTF-8"));
// 删除请求进行转发给 leader 进行处理
raftProxy.proxy(getLeader().ip, API_DEL, params, HttpMethod.DELETE);
return;
}
...
}
}

同时,还有一个重要的机制——心跳机制,raft通过心跳机制来维持Leader以及Follower的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// 心跳任务,如果成为Leader,需要对 follower 发送心跳信息
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
// 程序是否已准备完毕
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
// 心跳周期判断
if (local.heartbeatDueMs > 0) {
return;
}
// 重置心跳发送周期
local.resetHeartbeatDue();
// 发送心跳信息
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}

public void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
// 如果自己不是Leader节点或者处于单机模式下,则直接返回
if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
return;
}
Loggers.RAFT.info("[RAFT] send beat with {} keys.", datums.size());
// 重置Leader任期时间
local.resetLeaderDue();
// build data
JSONObject packet = new JSONObject();
packet.put("peer", local);
JSONArray array = new JSONArray();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
}
if (!switchDomain.isSendBeatOnly()) {
// 如果开启了在心跳包中携带Leader存储的数据进行发送,则对数据进行打包操作
for (Datum datum : datums.values()) {
JSONObject element = new JSONObject();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp);
array.add(element);
}
} else {
Loggers.RAFT.info("[RAFT] send beat only.");
}
packet.put("datums", array);
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JSON.toJSONString(packet));
// 将参数信息进行 Gzip算法压缩,降低网络消耗
String content = JSON.toJSONString(params);
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes("UTF-8"));
gzip.close();

byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, "UTF-8");
Loggers.RAFT.info("raw beat data size: {}, size of compressed data: {}", content.length(), compressedContent.length());
// 遍历所有的Follower节点进行发送心跳数据包
for (final String server : peers.allServersWithoutMySelf()) {
try {
final String url = buildURL(server, API_BEAT);
Loggers.RAFT.info("send beat to server " + server);
// 采用异步HTTP请求进行心跳数据发送
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", response.getResponseBody(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
// 成功后接收Follower节点的心跳回复(Follower节点的当前信息)进行节点更新操作
peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
Loggers.RAFT.info("receive beat response from: {}", url);
return 0;
}

@Override
public void onThrowable(Throwable t) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
}

至于心跳接收的回复操作基本就是Follower节点将自己当前的信息进行数据打包发送给Leader节点,同时也会重置当前Leader的任期时间信息,并且根据接收到心跳信息,进行拉取Leader节点的最新数据信息

为什么要同时实现CP和AP两套一致性策略模式?

或许有的人会问,为什么Nacos要同时实现CP以及AP两种数据的一致性策略。其实在一个组件中同时实现两种数据一致性策略,我觉得这样在做服务注册中心选型时,就不必操心AP选什么组件,CP选什么组件,直接采用nacos就好了,同时满足你AP以及CP的数据一致性需求;直接在一个组件中,享受Zookeeper以及Eureka组件的服务,避免了需要同时维护两种不同的组件的运维代价,只需要根据自己的实例需求,选择不同的注册模式即可。