How the two consistency strategies coexist in Nacos
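You might wonder why earlier registries, such as the CP-mode Zookeeper or the AP-mode Eureka, each support only one of the two trade-offs people commonly choose under the CAP theorem (AP or CP), while Nacos manages to support both.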
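In fact, the CAP theorem is a statement about data in a distributed system: under a network partition, replicated data can stay either consistent or available, not both. If your consistency requirements are modest and you can tolerate eventual consistency, the AP-mode Eureka will do. If you need strong consistency, use the CP-mode Zookeeper. At root, it is not Eureka itself that is AP or Zookeeper itself that is CP; it is the data each of them stores whose consistency meets the AP or CP guarantee. Seen this way, it is not hard for a single component to support both an AP mode and a CP mode, as long as it keeps the data governed by each mode separate.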
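```java
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

    // CP strategy, implemented by RaftConsistencyServiceImpl
    @Autowired
    private PersistentConsistencyService persistentConsistencyService;

    // AP strategy, implemented by DistroConsistencyServiceImpl
    @Autowired
    private EphemeralConsistencyService ephemeralConsistencyService;

    // ... ConsistencyService methods omitted in this excerpt
}
```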
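DelegateConsistencyServiceImpl is the class that selects a consistency strategy. It dispatches to either PersistentConsistencyService or EphemeralConsistencyService depending on a trigger condition. In Nacos, the switch between CP and AP depends on whether the registered service instance is an ephemeral instance. EphemeralConsistencyService is implemented by DistroConsistencyServiceImpl, which uses Distro, a protocol developed in-house at Alibaba that, in my view, resembles the gossip protocol. PersistentConsistencyService is implemented by RaftConsistencyServiceImpl, which is built on the Raft protocol. Data stored under the two strategies is kept separate, and neither affects the other. That is how Nacos lets the AP mode and the CP mode coexist in a single component.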
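The delegation idea fits in a few lines. The sketch below is not the Nacos source. `DelegateSketch`, the `ephemeral.` key prefix and the two in-memory maps standing in for Distro and Raft are all hypothetical. What it keeps from the description above is the shape of the decision: the delegate inspects each key and sends ephemeral data to the AP store and everything else to the CP store. The two stores never share entries.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a delegating consistency service; not the Nacos implementation.
public class DelegateSketch {

    interface Store {
        void put(String key, String value);
        String get(String key);
        int size();
    }

    // Plain in-memory store standing in for either Distro (AP) or Raft (CP).
    static class MapStore implements Store {
        private final Map<String, String> data = new HashMap<>();
        public void put(String key, String value) { data.put(key, value); }
        public String get(String key) { return data.get(key); }
        public int size() { return data.size(); }
    }

    // Hypothetical marker for keys that belong to ephemeral instances.
    static final String EPHEMERAL_PREFIX = "ephemeral.";

    final Store ephemeralStore = new MapStore();   // AP side
    final Store persistentStore = new MapStore();  // CP side

    // The strategy is chosen from the key alone, so the two stores never overlap.
    Store route(String key) {
        return key.startsWith(EPHEMERAL_PREFIX) ? ephemeralStore : persistentStore;
    }

    public void put(String key, String value) { route(key).put(key, value); }

    public String get(String key) { return route(key).get(key); }

    public static void main(String[] args) {
        DelegateSketch delegate = new DelegateSketch();
        delegate.put("ephemeral.order-service", "10.0.0.1:8080");
        delegate.put("persistent.mysql", "10.0.0.9:3306");
        // prints "1 1"
        System.out.println(delegate.ephemeralStore.size() + " " + delegate.persistentStore.size());
    }
}
```

The route depends only on the key, so every node reaches the same decision without coordination. A write to one store can never disturb data held by the other.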
AP implementation
A brief look at how DistroConsistencyServiceImpl works in Nacos
CP implementation
The key protocol: Raft
Animated demo: raft-protocol
How Nacos implements CP (Raft)
RaftController
RaftController is the controller used for communication between nodes inside the Raft cluster. Its endpoints are:
```
POST   HTTP://{ip:port}/v1/ns/raft/vote          : send a vote request
POST   HTTP://{ip:port}/v1/ns/raft/beat          : the Leader sends a heartbeat to a Follower
GET    HTTP://{ip:port}/v1/ns/raft/peer          : get this node's RaftPeer information
PUT    HTTP://{ip:port}/v1/ns/raft/datum/reload  : reload a given datum (log entry)
POST   HTTP://{ip:port}/v1/ns/raft/datum         : the Leader receives incoming data and stores it
DELETE HTTP://{ip:port}/v1/ns/raft/datum         : the Leader receives a delete operation
GET    HTTP://{ip:port}/v1/ns/raft/datum         : get the data stored on this node
GET    HTTP://{ip:port}/v1/ns/raft/state         : get this node's state {UP or DOWN}
POST   HTTP://{ip:port}/v1/ns/raft/datum/commit  : a Follower stores data sent by the Leader
DELETE HTTP://{ip:port}/v1/ns/raft/datum/commit  : a Follower applies a delete sent by the Leader
GET    HTTP://{ip:port}/v1/ns/raft/leader        : get the current Leader of the cluster
GET    HTTP://{ip:port}/v1/ns/raft/listeners     : get all event listeners of the Raft cluster
```
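These are ordinary HTTP endpoints, so any HTTP client can inspect them. The sketch below uses the JDK's `java.net.http` API (not the Nacos `HttpClient` class that appears in the excerpts) only to build two requests; it sends nothing. The `/nacos` context path and the `127.0.0.1:8848` address are assumptions about a default deployment, not something stated in this article. `RaftApiSketch` is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds requests for two RaftController endpoints without sending them.
public class RaftApiSketch {

    // Assumption: default context path "/nacos"; adjust for your deployment.
    static final String CONTEXT_PATH = "/nacos";

    static URI raftUri(String ipPort, String api) {
        return URI.create("http://" + ipPort + CONTEXT_PATH + "/v1/ns/raft" + api);
    }

    // GET /v1/ns/raft/leader: ask a node who it thinks the Leader is
    static HttpRequest leaderRequest(String ipPort) {
        return HttpRequest.newBuilder(raftUri(ipPort, "/leader")).GET().build();
    }

    // GET /v1/ns/raft/state: ask a node for its state
    static HttpRequest stateRequest(String ipPort) {
        return HttpRequest.newBuilder(raftUri(ipPort, "/state")).GET().build();
    }

    public static void main(String[] args) {
        HttpRequest req = leaderRequest("127.0.0.1:8848");
        // prints "GET http://127.0.0.1:8848/nacos/v1/ns/raft/leader"
        System.out.println(req.method() + " " + req.uri());
    }
}
```

To send a request against a running node, pass it to `java.net.http.HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.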
RaftPeerSet
This object stores information about every node taking part in the Raft protocol. Its fields are:
```java
// Fields of RaftPeerSet (excerpt)
private ServerListManager serverListManager;                            // source of the cluster member list
private AtomicLong localTerm = new AtomicLong(0L);                      // term known to this node
private RaftPeer leader = null;                                         // current Leader, null until one is elected
private Map<String, RaftPeer> peers = new HashMap<String, RaftPeer>();  // ip -> RaftPeer
private Set<String> sites = new HashSet<>();
private boolean ready = false;                                          // checked by peers.isReady()
```
It also provides the methods the Raft protocol needs:
```java
public boolean isLeader(String ip) {
    // in standalone mode the single node is always the Leader
    if (STANDALONE_MODE) {
        return true;
    }
    if (leader == null) {
        Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
        return false;
    }
    return StringUtils.equals(leader.ip, ip);
}

public RaftPeer decideLeader(RaftPeer candidate) {
    peers.put(candidate.ip, candidate);

    // tally how many peers voted for each candidate
    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    for (RaftPeer peer : peers.values()) {
        if (StringUtils.isEmpty(peer.voteFor)) {
            continue;
        }
        ips.add(peer.voteFor);
        if (ips.getCount(peer.voteFor) > maxApproveCount) {
            maxApproveCount = ips.getCount(peer.voteFor);
            maxApprovePeer = peer.voteFor;
        }
    }

    // a candidate with at least majorityCount() votes becomes the Leader
    if (maxApproveCount >= majorityCount()) {
        RaftPeer peer = peers.get(maxApprovePeer);
        peer.state = RaftPeer.State.LEADER;
        if (!Objects.equals(leader, peer)) {
            leader = peer;
            Loggers.RAFT.info("{} has become the LEADER", leader.ip);
        }
    }
    return leader;
}

public RaftPeer makeLeader(RaftPeer candidate) {
    if (!Objects.equals(leader, candidate)) {
        leader = candidate;
        Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
                leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
    }

    // ask any other peer still marked as LEADER for its current state;
    // demote it to FOLLOWER if that request fails
    for (final RaftPeer peer : peers.values()) {
        Map<String, String> params = new HashMap<String, String>(1);
        if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
            try {
                String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
                HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}",
                                    response.getResponseBody(), peer.ip);
                            peer.state = RaftPeer.State.FOLLOWER;
                            return 1;
                        }
                        update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                        return 0;
                    }
                });
            } catch (Exception e) {
                peer.state = RaftPeer.State.FOLLOWER;
                Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
            }
        }
    }
    return update(candidate);
}
```
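The core of `decideLeader` is a tally of `voteFor` values checked against a majority. The sketch below isolates that rule and uses a `HashMap` in place of Commons Collections' `TreeBag`. The quorum formula `clusterSize / 2 + 1` is the usual Raft majority and is an assumption about what `majorityCount()` computes. `MajoritySketch` and `decide` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the tally inside decideLeader; not the Nacos code.
public class MajoritySketch {

    // Returns the candidate voted for by a majority of the whole cluster, or null if none.
    static String decide(List<String> voteFor, int clusterSize) {
        int majority = clusterSize / 2 + 1;        // assumed quorum size
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String vote : voteFor) {
            if (vote == null || vote.isEmpty()) {
                continue;                          // this peer has not voted in the current round
            }
            int c = counts.merge(vote, 1, Integer::sum);
            if (c > bestCount) {
                bestCount = c;
                best = vote;
            }
        }
        return bestCount >= majority ? best : null;
    }

    public static void main(String[] args) {
        System.out.println(decide(List.of("A", "A", "B"), 3));         // prints "A"
        System.out.println(decide(List.of("A", "B", "C", "", ""), 5)); // prints "null"
    }
}
```

The tally counts against the whole cluster rather than against the replies received, which makes a split decision impossible: two disjoint groups of nodes cannot both form a majority.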
RaftCore
This object is the main implementation of Raft in Nacos. At startup it performs a series of initialization steps:
```java
@PostConstruct
public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    executor.submit(notifier);

    long start = System.currentTimeMillis();

    // restore persisted data and the last known term from disk
    datums = raftStore.loadDatums(notifier);
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());

    // wait until the notifier's task queue has drained
    while (true) {
        if (notifier.tasks.size() <= 0) {
            break;
        }
        Thread.sleep(1000L);
    }

    initialized = true;
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));

    // start the periodic election and heartbeat tasks
    GlobalExecutor.registerMasterElection(new MasterElection());
    GlobalExecutor.registerHeartbeat(new HeartBeat());

    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
```
Once initialization is done, the cluster still cannot serve requests, because no Leader has been elected yet. It can serve only after MasterElection has successfully elected a Leader:
```java
public class MasterElection implements Runnable {
    @Override
    public void run() {
        try {
            if (!peers.isReady()) {
                return;
            }

            // count the leader timeout down by one tick
            RaftPeer local = peers.local();
            local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            if (local.leaderDueMs > 0) {
                return;
            }

            // timeout expired: reset both timers and start an election
            local.resetLeaderDue();
            local.resetHeartbeatDue();
            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while master election {}", e);
        }
    }

    public void sendVote() {
        RaftPeer local = peers.get(NetUtils.localServer());
        Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
                JSON.toJSONString(getLeader()), local.term);

        // start a new round: become a candidate for the next term and vote for itself
        peers.reset();
        local.term.incrementAndGet();
        local.voteFor = local.ip;
        local.state = RaftPeer.State.CANDIDATE;

        Map<String, String> params = new HashMap<String, String>(1);
        params.put("vote", JSON.toJSONString(local));
        // request a vote from every other node; each reply is fed into decideLeader
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildURL(server, API_VOTE);
            try {
                HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}",
                                    response.getResponseBody(), url);
                            return 1;
                        }
                        RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
                        Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
                        peers.decideLeader(peer);
                        return 0;
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.warn("error while sending vote to server: {}", server);
            }
        }
    }
}
```
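Judging from the code, `MasterElection` is not a blocking timer. It is a task run once per `TICK_PERIOD_MS` that counts `leaderDueMs` down and starts a vote when the countdown reaches zero. A minimal model of that countdown follows. The three constants are illustrative assumptions, not values read from Nacos. So is the randomized reset, a base timeout plus random jitter, which is the usual Raft way of making simultaneous candidacies unlikely. `ElectionTimerSketch` is a hypothetical name.

```java
import java.util.Random;

// Sketch of a tick-driven election timeout; all constants are illustrative assumptions.
public class ElectionTimerSketch {

    static final long TICK_MS = 500;             // assumed tick period
    static final long LEADER_TIMEOUT_MS = 15000; // assumed base timeout
    static final long JITTER_MS = 5000;          // assumed random spread

    private final Random random;
    long leaderDueMs;
    long term = 0;
    int elections = 0;

    ElectionTimerSketch(long seed) {
        this.random = new Random(seed);
        resetLeaderDue();
    }

    // Base timeout plus jitter, so nodes rarely time out at the same moment.
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + (long) (random.nextDouble() * JITTER_MS);
    }

    // Called once per tick, like MasterElection.run().
    void tick() {
        leaderDueMs -= TICK_MS;
        if (leaderDueMs > 0) {
            return;
        }
        resetLeaderDue();
        term++;          // a new election starts a new term
        elections++;
    }

    // Called when a valid heartbeat arrives from the Leader.
    void onHeartbeat() {
        resetLeaderDue();
    }

    public static void main(String[] args) {
        ElectionTimerSketch t = new ElectionTimerSketch(42);
        for (int i = 0; i < 29; i++) {
            t.tick();    // 29 ticks = 14.5 s, still below the minimum timeout
        }
        System.out.println(t.elections); // prints "0"
    }
}
```

A Follower that keeps receiving heartbeats keeps resetting the countdown and never starts an election. Only prolonged silence from the Leader lets the timer run out.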
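Each node starts out considering itself eligible to be the Leader. Once its leader timeout expires, which happens shortly after startup because no Leader exists yet, it nominates itself as the candidate and sends vote requests to the other nodes. A node that receives a vote request handles it as follows: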
```java
public RaftPeer receivedVote(RaftPeer remote) {
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }

    RaftPeer local = peers.get(NetUtils.localServer());
    // reject a candidate whose term is not newer than ours, and reply with our own vote
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
        Loggers.RAFT.info(msg);
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }
        return local;
    }

    // otherwise become a follower, vote for the candidate and adopt its term
    local.resetLeaderDue();
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
    local.term.set(remote.term.get());
    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    return local;
}
```
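Stripped of logging, the rule in `receivedVote` is a single comparison: grant the vote only to a candidate whose term is strictly higher than the local one, and adopt that term. The model below keeps just that rule. `VoteRuleSketch.Node` is a hypothetical stand-in for `RaftPeer`, with `state` held as a plain string.

```java
// Minimal model of the receivedVote rule; Node is a hypothetical stand-in for RaftPeer.
public class VoteRuleSketch {

    static class Node {
        final String ip;
        long term;
        String voteFor;
        String state = "FOLLOWER";

        Node(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }
    }

    // Returns the local node; its voteFor tells the candidate whether the vote was granted.
    static Node receivedVote(Node local, Node remote) {
        if (remote.term <= local.term) {
            // stale or same-term candidate: keep, or cast, our own vote
            if (local.voteFor == null || local.voteFor.isEmpty()) {
                local.voteFor = local.ip;
            }
            return local;
        }
        // newer term: become a follower, vote for the candidate and record its term
        local.state = "FOLLOWER";
        local.voteFor = remote.ip;
        local.term = remote.term;
        return local;
    }

    public static void main(String[] args) {
        Node local = new Node("10.0.0.1", 3);
        Node candidate = new Node("10.0.0.2", 4);
        System.out.println(receivedVote(local, candidate).voteFor); // prints "10.0.0.2"
    }
}
```

Unlike the vote rule in the Raft paper, the logic shown here compares only terms. It does not check whether the candidate's log is at least as up to date as the voter's.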
Through these steps a Leader is eventually elected, and the cluster can then start serving requests.
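Because this is CP mode, every write goes through the Leader. A Follower does not apply a client's write on its own; it only applies operations that come from the Leader. A write that reaches a Follower therefore has to be forwarded. This is why signalPublish and signalDelete in RaftCore first check whether the local node is the Leader and, if it is not, proxy the request to the Leader: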
```java
public void signalPublish(String key, Record value) throws Exception {
    // not the Leader: forward the write to the Leader and stop here
    if (!isLeader()) {
        JSONObject params = new JSONObject();
        params.put("key", key);
        params.put("value", value);
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
        return;
    }
    // ... Leader-side publish logic omitted
}

public void signalDelete(final String key) throws Exception {
    OPERATE_LOCK.lock();
    try {
        // not the Leader: forward the delete to the Leader and stop here
        if (!isLeader()) {
            Map<String, String> params = new HashMap<>(1);
            params.put("key", URLEncoder.encode(key, "UTF-8"));
            raftProxy.proxy(getLeader().ip, API_DEL, params, HttpMethod.DELETE);
            return;
        }
        // ... Leader-side delete logic omitted
    } finally {
        // release the lock on every path, including the forwarding one
        OPERATE_LOCK.unlock();
    }
}
```
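The forwarding in `signalPublish` and `signalDelete` can be modelled in-process. In the sketch below, `ForwardingSketch.Node` is hypothetical. The hop to the Leader is a direct method call, where Nacos uses an HTTP proxy (`raftProxy`). Unlike the excerpt, the sketch fails explicitly when no Leader is known; in the excerpt, `getLeader().ip` would otherwise dereference `null`.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of leader-forwarded writes; in Nacos the hop to the Leader is an HTTP proxy call.
public class ForwardingSketch {

    static class Node {
        final String name;
        Node leader;                               // null until a Leader is known
        final Map<String, String> applied = new HashMap<>();

        Node(String name) {
            this.name = name;
        }

        boolean isLeader() {
            return leader == this;
        }

        void publish(String key, String value) {
            if (leader == null) {
                throw new IllegalStateException("no leader available");
            }
            if (!isLeader()) {
                leader.publish(key, value);        // forward and return, like signalPublish
                return;
            }
            // Leader path: a real Raft Leader would replicate to followers before acknowledging.
            applied.put(key, value);
        }
    }

    public static void main(String[] args) {
        Node a = new Node("a");
        Node b = new Node("b");
        a.leader = a;
        b.leader = a;
        b.publish("svc", "v1");                    // sent to a follower, applied by the leader
        System.out.println(a.applied.get("svc") + " " + b.applied.size()); // prints "v1 0"
    }
}
```

Clients may therefore send a write to any node, but only the Leader applies it.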
Another important mechanism is the heartbeat, which Raft uses to maintain the relationship between the Leader and its Followers:
```java
public class HeartBeat implements Runnable {
    @Override
    public void run() {
        try {
            if (!peers.isReady()) {
                return;
            }

            // count the heartbeat timer down by one tick
            RaftPeer local = peers.local();
            local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            if (local.heartbeatDueMs > 0) {
                return;
            }

            local.resetHeartbeatDue();
            sendBeat();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
        }
    }

    public void sendBeat() throws IOException, InterruptedException {
        RaftPeer local = peers.local();
        // only the Leader sends heartbeats (unless running standalone)
        if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
            return;
        }

        Loggers.RAFT.info("[RAFT] send beat with {} keys.", datums.size());
        local.resetLeaderDue();

        // packet = the Leader's own RaftPeer plus the key and timestamp of every datum
        JSONObject packet = new JSONObject();
        packet.put("peer", local);
        JSONArray array = new JSONArray();

        if (switchDomain.isSendBeatOnly()) {
            Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
        }

        if (!switchDomain.isSendBeatOnly()) {
            for (Datum datum : datums.values()) {
                JSONObject element = new JSONObject();
                if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                    element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                    element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                }
                element.put("timestamp", datum.timestamp);
                array.add(element);
            }
        } else {
            Loggers.RAFT.info("[RAFT] send beat only.");
        }

        packet.put("datums", array);
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("beat", JSON.toJSONString(packet));
        String content = JSON.toJSONString(params);

        // gzip the payload
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(content.getBytes("UTF-8"));
        gzip.close();
        byte[] compressedBytes = out.toByteArray();

        // log the byte count; decoding gzip bytes as UTF-8 text would give a meaningless length
        Loggers.RAFT.info("raw beat data size: {}, size of compressed data: {}",
                content.length(), compressedBytes.length);

        for (final String server : peers.allServersWithoutMySelf()) {
            try {
                final String url = buildURL(server, API_BEAT);
                Loggers.RAFT.info("send beat to server " + server);
                HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
                                    response.getResponseBody(), server);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            // do not try to parse an error body as a RaftPeer
                            return 1;
                        }
                        // the Follower replies with its own RaftPeer; refresh our view of it
                        peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                        Loggers.RAFT.info("receive beat response from: {}", url);
                        return 0;
                    }

                    @Override
                    public void onThrowable(Throwable t) {
                        Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                MetricsMonitor.getLeaderSendBeatFailedException().increment();
            }
        }
    }
}
```
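Each beat is gzip-compressed before it is posted, and the round trip can be reproduced with the JDK alone. The payload built in `main` is a hand-made placeholder loosely shaped like the beat packet (`peer` plus `datums`), not real Nacos output. `BeatCodecSketch` is a hypothetical name. Because gzip output is binary, its size is best reported as a byte count (`compressed.length`) rather than by decoding the bytes into a `String`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch: gzip round trip for a heartbeat-like payload, using only the JDK.
public class BeatCodecSketch {

    static byte[] compress(String content) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // closing the gzip stream (via try-with-resources) writes the gzip trailer
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    static String decompress(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            // e.g. the input is not valid gzip data
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Placeholder payload with many similar keys, the case where gzip pays off.
        StringBuilder sb = new StringBuilder("{\"peer\":{},\"datums\":[");
        for (int i = 0; i < 200; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"key\":\"service-").append(i).append("\",\"timestamp\":1}");
        }
        sb.append("]}");
        String content = sb.toString();

        byte[] compressed = compress(content);
        System.out.println("raw: " + content.length() + " chars, compressed: " + compressed.length + " bytes");
        System.out.println(decompress(compressed).equals(content)); // prints "true"
    }
}
```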
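On the receiving side, the Follower takes three steps. It packs its own current state into the response it sends back to the Leader. It resets its timeout for the current Leader (leaderDueMs). It then uses the key timestamps carried by the heartbeat to pull the latest data from the Leader.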
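The Follower's decision about what to pull can be modelled as a comparison between the key-to-timestamp map carried by the beat and the Follower's own map. A key is fetched when it is missing locally or the local copy is older. This is a simplified illustration with hypothetical names (`BeatSyncSketch`, `keysToPull`). It leaves out batching, the fetch itself and the timer reset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of the Follower-side check; names are hypothetical.
public class BeatSyncSketch {

    // Keys the Follower should pull: missing locally, or older than the Leader's copy.
    static List<String> keysToPull(Map<String, Long> leaderTimestamps, Map<String, Long> localTimestamps) {
        List<String> pull = new ArrayList<>();
        // TreeMap only to give a deterministic (sorted) order
        for (Map.Entry<String, Long> e : new TreeMap<>(leaderTimestamps).entrySet()) {
            Long local = localTimestamps.get(e.getKey());
            if (local == null || local < e.getValue()) {
                pull.add(e.getKey());
            }
        }
        return pull;
    }

    public static void main(String[] args) {
        Map<String, Long> leader = Map.of("svc-a", 5L, "svc-b", 3L, "svc-c", 7L);
        Map<String, Long> local = Map.of("svc-a", 5L, "svc-b", 1L);
        System.out.println(keysToPull(leader, local)); // prints "[svc-b, svc-c]"
    }
}
```

The beat carries only keys and timestamps, so the Leader never ships unchanged data. A Follower fetches only the keys that actually differ.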
Why implement both CP and AP consistency strategies?
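Some may ask why Nacos implements both a CP and an AP consistency strategy. In my view, having both in one component means that, when choosing a service registry, you no longer need one product for AP and another for CP; Nacos alone covers both consistency requirements. You get what Zookeeper and Eureka each offer from a single component and avoid the operational cost of running two different systems side by side. You simply choose the registration mode (ephemeral or persistent instances) that each service needs.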