JRaft 选主流程浅析

JRaft 启动流程

1
2
3
4
5
6
7
8
/**
 * Starts this Raft service (excerpt: parts replaced by "..." are elided in this article).
 * The visible part registers the server endpoint and creates the Raft node.
 *
 * @param startRpcServer not used in the visible part; presumably controls whether the
 *                       RPC server is started - TODO confirm against the full source
 * @return a Node; the return statement itself is in the elided part
 */
public synchronized Node start(final boolean startRpcServer) {
...
// Add this server's endpoint to the NodeManager.
NodeManager.getInstance().addAddress(this.serverId.getEndpoint());
// Factory method: creates and initializes the Raft node for this group id / server id
// using the configured node options, and keeps it in this.node.
this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
...
}

现在看下这个RaftServiceFactory.createAndInitRaftNode做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Initializes the node from the given options (excerpt: parts replaced by "..." are
 * elided in this article). The visible part only constructs the two election-related
 * timers; neither timer is started in the visible part.
 *
 * @param opts node options; not read in the visible part
 * @return a boolean; the return statement is in the elided part
 */
public boolean init(final NodeOptions opts) {
...
// Vote timer: every time it fires it runs handleVoteTimeout(). Its base period is the
// configured election timeout (getElectionTimeoutMs()).
this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {

@Override
protected void onTrigger() {
// Timer fired: run the vote-timeout handler.
handleVoteTimeout();
}

@Override
protected int adjustTimeout(final int timeoutMs) {
// Delegates to randomTimeout(); presumably this randomizes the period as the Raft
// paper recommends for election timeouts - TODO confirm.
return randomTimeout(timeoutMs);
}
};
// Election timer: every time it fires it runs handleElectionTimeout(). It uses the same
// base period (getElectionTimeoutMs()) as the vote timer.
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {

@Override
protected void onTrigger() {
// Timer fired: run the election-timeout handler.
handleElectionTimeout();
}

@Override
protected int adjustTimeout(final int timeoutMs) {
// Same timeout adjustment as the vote timer.
return randomTimeout(timeoutMs);
}
};
...
}

voteTimer

handleVoteTimeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/**
 * Vote-timer callback. Takes the write lock and, if this node is still a candidate,
 * retries the election via electSelf().
 *
 * Lock hand-off: when electSelf() is called the write lock is still held and is not
 * released here; electSelf() is responsible for releasing it. In every other state
 * the lock is released in this method.
 */
private void handleVoteTimeout() {
    this.writeLock.lock();
    if (this.state != State.STATE_CANDIDATE) {
        // Not a candidate: nothing to retry, just give the lock back.
        this.writeLock.unlock();
        return;
    }
    LOG.debug("Node {} term {} retry elect.", getNodeId(), this.currTerm);
    // Start another self-election; electSelf() unlocks writeLock.
    electSelf();
}

/**
 * Starts a real (non pre-vote) election for this node: becomes candidate, bumps the term,
 * votes for itself and sends RequestVote RPCs to every other peer in the configuration.
 *
 * Precondition: the caller already holds writeLock. The first try/finally below releases
 * it without acquiring it; the lock is then re-acquired and released again for the second
 * phase, so the method returns with the lock released.
 */
private void electSelf() {
long oldTerm;
try {
LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
// If the current configuration does not contain this node, abort the self-election.
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
return;
}
// A follower that starts an election stops its election timer.
if (this.state == State.STATE_FOLLOWER) {
LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
this.electionTimer.stop();
}
// Reset the known leader to the empty peer, since this node is about to request votes.
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "A follower's leader_id is reset to NULL as it begins to request_vote."));
// Switch this node to the candidate state.
this.state = State.STATE_CANDIDATE;
// Move to the next term.
this.currTerm++;
// Vote for ourselves.
this.votedId = this.serverId.copy();
LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
// Start the vote timer; its callback is handleVoteTimeout().
this.voteTimer.start();
// Initialize the vote context with the current configuration and, when the
// configuration is not stable, the old configuration as well.
this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
// Remember the (already incremented) term of this election so that a term change
// can be detected after the lock is re-acquired below.
oldTerm = this.currTerm;
} finally {
this.writeLock.unlock();
}

// Read the last log id while NOT holding the write lock.
// NOTE(review): the meaning of the `true` argument is not visible here - confirm.
final LogId lastLogId = this.logManager.getLastLogId(true);

this.writeLock.lock();
try {
// vote need defense ABA after unlock&writeLock
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
return;
}
// Send a RequestVote RPC to every peer of the configuration except ourselves.
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
// Connect to the remote peer; skip it if the connection cannot be established.
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
// Build the RequestVote request together with its response callback.
final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(false) // It's not a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm) //
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
// Send the RPC; `done` receives the response.
this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
}
// Store the current term and our own vote in the meta storage.
this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
// Count our own vote and check whether the vote context is already granted.
this.voteCtx.grant(this.serverId);
if (this.voteCtx.isGranted()) {
// Enough votes: become leader.
becomeLeader();
}
} finally {
this.writeLock.unlock();
}
}

/**
 * Handles the response of a RequestVote RPC under the write lock.
 *
 * The response is ignored unless this node is still a candidate and the term the request
 * was sent in equals the current term. A response carrying a higher term makes this node
 * step down. A granted response is added to the vote context, and the node becomes leader
 * once the vote context reports it is granted.
 *
 * @param peerId   the peer that answered
 * @param term     the term in which the request was sent
 * @param response the peer's response
 */
public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    this.writeLock.lock();
    try {
        // Only a candidate processes vote responses.
        if (this.state != State.STATE_CANDIDATE) {
            LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.", getNodeId(), peerId, this.state);
            return;
        }
        // Stale response: it belongs to a request sent in another term.
        if (term != this.currTerm) {
            LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(), peerId, term, this.currTerm);
            return;
        }
        // The peer is in a higher term: adopt it and step down.
        final long peerTerm = response.getTerm();
        if (peerTerm > this.currTerm) {
            LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(), peerId, peerTerm, this.currTerm);
            stepDown(peerTerm, false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term request_vote_response."));
            return;
        }
        // Vote refused: nothing to count.
        if (!response.getGranted()) {
            return;
        }
        // Count the vote and check whether the vote context is now granted.
        this.voteCtx.grant(peerId);
        if (this.voteCtx.isGranted()) {
            becomeLeader();
        }
    } finally {
        this.writeLock.unlock();
    }
}

/**
 * Promotes this candidate to leader: stops the vote timer, records itself as leader,
 * resets the replicator group to the current term, adds a replicator for every other
 * peer, resets the ballot box pending index, flushes the configuration context and
 * starts the step-down timer.
 *
 * Requires the node to be in STATE_CANDIDATE; throws IllegalStateException when the
 * configuration context is busy.
 */
private void becomeLeader() {
    Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
    LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm, this.conf.getConf(), this.conf.getOldConf());

    // The candidate phase is over: cancel the vote timer.
    stopVoteTimer();

    // Record the new role and point leaderId at ourselves.
    this.state = State.STATE_LEADER;
    this.leaderId = this.serverId.copy();

    // Bring the replicator group to the current term, then add one replicator per other peer.
    this.replicatorGroup.resetTerm(this.currTerm);
    for (final PeerId follower : this.conf.listPeers()) {
        if (!follower.equals(this.serverId)) {
            LOG.debug("Node {} add replicator, term={}, peer={}.", getNodeId(), this.currTerm, follower);
            final boolean added = this.replicatorGroup.addReplicator(follower);
            if (!added) {
                LOG.error("Fail to add replicator, peer={}.", follower);
            }
        }
    }

    // init commit manager: pending index starts right after the last log index.
    final long firstPendingIndex = this.logManager.getLastLogIndex() + 1;
    this.ballotBox.resetPendingIndex(firstPendingIndex);

    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    if (this.confCtx.isBusy()) {
        throw new IllegalStateException();
    }
    this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
    this.stepDownTimer.start();
}
其他节点如何处理Request Vote请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
 * Handles a RequestVote RPC sent by a candidate and decides whether to grant the vote.
 *
 * The vote is granted only if, at the end, the request term equals this node's current
 * term and votedId equals the candidate. This node records a vote for the candidate when
 * the candidate's last log id is not older than its own and it has not voted yet.
 *
 * Locking: writeLock is released while the last log id is read and re-acquired afterwards;
 * doUnlock tracks whether the finally block must release the lock.
 *
 * @param request the RequestVote request
 * @return an error response when this node is inactive or the server id cannot be parsed,
 *         otherwise a RequestVoteResponse carrying the current term and the granted flag
 */
@Override
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
// Reject the request if this node is not in an active state.
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
}
// PeerId that will hold the identity of the requesting candidate.
final PeerId candidateId = new PeerId();
// Parse the candidate's server id from the request; reject malformed ids.
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
request.getServerId());
}

// noinspection ConstantConditions
// Single-pass do/while(false): `break` jumps straight to building the response.
do {
// check term
// Only requests whose term is not older than our current term are considered.
if (request.getTerm() >= this.currTerm) {
LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
// increase current term, change state to follower
// Strictly higher term: step down and adopt the request's term.
if (request.getTerm() > this.currTerm) {
stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term RequestVoteRequest."));
}
} else {
// The request term is lower than our current term.
// ignore older term
LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
break;
}
// Release the lock while reading the last log id; doUnlock is false during that
// window so the finally block does not unlock a lock we do not hold.
doUnlock = false;
this.writeLock.unlock();

// Read this node's last log id.
// NOTE(review): the meaning of the `true` argument is not visible here - confirm.
final LogId lastLogId = this.logManager.getLastLogId(true);

doUnlock = true;
this.writeLock.lock();
// vote need ABA check after unlock&writeLock
if (request.getTerm() != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
break;
}
// Build a LogId from the request's last log index/term and compare it with our own
// last log id; the candidate's log is OK when it is not older than ours.
final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm()).compareTo(lastLogId) >= 0;
// Vote for the candidate only if its log is OK and we have not voted yet
// (votedId is null or empty).
if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
// Step down with the request's term; per the status message this restarts the
// election timer.
stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE, "Raft node votes for some candidate, step down to restart election_timer."));
// Remember who we voted for.
this.votedId = candidateId.copy();
// Store the vote in the meta storage.
this.metaStorage.setVotedFor(candidateId);
}
} while (false);
// Build the response to the RequestVote request.
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
// Granted only when the terms match and our recorded vote is this candidate.
.setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}

那么对于这个投票请求,其他节点是如何判断是否可以同意的呢?关键在于对 term 以及 LogId 的比较,这里来看 LogId 的比较

1
2
3
4
5
6
7
8
9
10
/**
 * Orders log ids by term first; ids with the same term are ordered by index.
 *
 * @param o the log id to compare with
 * @return a negative value, zero or a positive value when this id is less than,
 *         equal to or greater than {@code o}
 */
@Override
public int compareTo(final LogId o) {
    final int byTerm = Long.compare(getTerm(), o.getTerm());
    // The index only decides the order when both terms are equal.
    return byTerm != 0 ? byTerm : Long.compare(getIndex(), o.getIndex());
}

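这里比较的是日志的新旧程度:如果 request 中携带的最后一条日志的 term(lastLogTerm)大于节点自身最后一条日志的 term,则认为请求方的日志更新;如果 term 相同,则需要比较 request 携带的 LogId index 与节点自身最新的 LogId index。只有当请求方的日志不比自己旧(compareTo 的结果 >= 0),并且自己在当前任期内还没有投过票时,节点才会同意该 request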

electionTimer

handleElectionTimeout
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
 * Election-timer callback. When this node is a follower and its current leader is no
 * longer valid, it resets the known leader and starts a pre-vote round.
 *
 * Lock hand-off: preVote() is entered with the write lock held and releases it itself,
 * so the finally block only unlocks on the early-return paths and when an exception
 * occurs before the hand-off.
 */
private void handleElectionTimeout() {
    boolean releaseHere = true;
    this.writeLock.lock();
    try {
        // Nothing to do unless we are a follower whose leader is no longer valid.
        if (this.state != State.STATE_FOLLOWER || isCurrentLeaderValid()) {
            return;
        }
        // Forget the current leader before asking peers for a pre-vote.
        final PeerId noLeader = PeerId.emptyPeer();
        final Status lostLeader = new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.", this.leaderId);
        resetLeaderId(noLeader, lostLeader);
        // From here on preVote() owns the lock release.
        releaseHere = false;
        // Pre-vote first, so the term is not bumped before a quorum agrees.
        preVote();
    } finally {
        if (releaseHere) {
            this.writeLock.unlock();
        }
    }
}

/**
 * Runs a pre-vote round: asks every other peer whether it would vote for this node in
 * the next term (currTerm + 1) without actually incrementing the term. When the pre-vote
 * context reports it is granted, the real election is started via electSelf().
 *
 * Precondition: the caller already holds writeLock. The first try/finally releases it
 * without acquiring it. The lock is re-acquired for the second phase and either released
 * here or handed over to electSelf(), which releases it.
 */
private void preVote() {
    long oldTerm;
    try {
        LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm);
        // Do not pre-vote while the snapshot executor is installing a snapshot.
        if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
            // The message has two placeholders (node id and term); pass both so that the
            // term is actually logged instead of a literal "{}".
            LOG.warn("Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.", getNodeId(), this.currTerm);
            return;
        }
        // A node that is not part of the current configuration must not pre-vote.
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
            return;
        }
        // Remember the term so a change can be detected after the lock is re-acquired.
        oldTerm = this.currTerm;
    } finally {
        this.writeLock.unlock();
    }

    // Read the last log id while NOT holding the write lock.
    final LogId lastLogId = this.logManager.getLastLogId(true);

    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // pre_vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
            return;
        }
        // Initialize the pre-vote context with the current configuration and, when the
        // configuration is not stable, the old configuration as well.
        this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            // Build the pre-vote request together with its response callback.
            final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(true) // it's a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                // Key point: the request carries currTerm + 1, but currTerm itself is
                // not incremented here, so a failed pre-vote does not raise the term.
                .setTerm(this.currTerm + 1) // next term
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.preVote(peer.getEndpoint(), done.request, done);
        }
        // Count our own pre-vote; if the pre-vote context is already granted, start the
        // real election. electSelf() takes over releasing the lock.
        this.prevVoteCtx.grant(this.serverId);
        if (this.prevVoteCtx.isGranted()) {
            doUnlock = false;
            electSelf();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}