A Brief Source-Code Analysis of How a Node Submits a Task in JRaft

Task commit in JRaft

Apply Task
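Before stepping into NodeImpl, it helps to see how calling code typically hands a task to a node. The snippet below is a minimal usage sketch based on the common JRaft pattern (Task, Closure, Node#apply); the payload encoding and the closure body are illustrative assumptions, not code from this article.

// Minimal usage sketch (not from the article's source): submit a task to a JRaft node.
final Task task = new Task();
task.setData(ByteBuffer.wrap("some-command".getBytes(StandardCharsets.UTF_8)));
task.setDone(new Closure() {
    @Override
    public void run(final Status status) {
        // Invoked once the entry is committed (or the submission fails),
        // e.g. to answer the client that issued the command.
        if (!status.isOk()) {
            LOG.warn("Task failed: {}", status);
        }
    }
});
node.apply(task); // enters NodeImpl#apply, analyzed below

Once apply(task) is called, we land in NodeImpl#apply: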
@Override
public void apply(final Task task) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(task, "Null task");

    // Create a LogEntry to carry the task's data.
    final LogEntry entry = new LogEntry();
    entry.setData(task.getData());
    int retryTimes = 0;
    try {
        final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
            event.reset();
            event.done = task.getDone();
            event.entry = entry;
            event.expectedTerm = task.getExpectedTerm();
        };
        while (true) {
            // Publish the log entry asynchronously to the apply queue.
            if (this.applyQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_APPLY_RETRY_TIMES) {
                    Utils.runClosureInThread(task.getDone(), new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
                    LOG.warn("Node {} applyQueue is overload.", getNodeId());
                    this.metrics.recordTimes("apply-task-overload-times", 1);
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }

    } catch (final Exception e) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
    }
}

The submission is asynchronous: the task is published as a LogEntryAndClosure event via tryPublishEvent, and the corresponding event handler consumes it later.
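The applyQueue above is a Disruptor ring buffer. As a rough orientation, the sketch below shows how such a queue could be wired up with the plain LMAX Disruptor API (com.lmax.disruptor / com.lmax.disruptor.dsl) and connected to the LogEntryAndClosureHandler shown next; the real NodeImpl uses its own builder and options, so the buffer size, thread naming and wait strategy here are assumptions for illustration only.

// Hedged sketch: how an apply queue like this could be built with the LMAX Disruptor API.
Disruptor<LogEntryAndClosure> applyDisruptor = new Disruptor<>(
    LogEntryAndClosure::new,                   // event factory, pre-allocates ring slots
    16384,                                     // ring buffer size (power of two), illustrative
    r -> new Thread(r, "jraft-node-apply"),    // consumer thread factory, illustrative naming
    ProducerType.MULTI,                        // apply() may be called from many threads
    new BlockingWaitStrategy());
applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler()); // the handler shown below
RingBuffer<LogEntryAndClosure> applyQueue = applyDisruptor.start();
// Producers then publish with applyQueue.tryPublishEvent(translator), as in apply() above.

The handler that consumes these events looks like this: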

private class LogEntryAndClosureHandler implements EventHandler<LogEntryAndClosure> {
    // task list for batch
    // Buffer that accumulates tasks until a batch is flushed.
    private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch) throws Exception {
        // On a node shutdown event, try to apply whatever tasks are still buffered.
        if (event.shutdownLatch != null) {
            if (!this.tasks.isEmpty()) {
                executeApplyingTasks(this.tasks);
            }
            final int num = GLOBAL_NUM_NODES.decrementAndGet();
            LOG.info("The number of active nodes decrement to {}.", num);
            event.shutdownLatch.countDown();
            return;
        }

        this.tasks.add(event);
        // JRaft submits tasks in batches: LogEntry objects are buffered in a list and flushed
        // either when the batch size is reached or at the end of a ring-buffer batch.
        if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeApplyingTasks(this.tasks);
            this.tasks.clear();
        }
    }
}
Executing the tasks
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
    this.writeLock.lock();
    try {
        final int size = tasks.size();
        // Check the node state: only the Leader may apply tasks.
        if (this.state != State.STATE_LEADER) {
            final Status st = new Status();
            if (this.state != State.STATE_TRANSFERRING) {
                st.setError(RaftError.EPERM, "Is not leader.");
            } else {
                st.setError(RaftError.EBUSY, "Is transferring leadership.");
            }
            LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
            for (int i = 0; i < size; i++) {
                Utils.runClosureInThread(tasks.get(i).done, st);
            }
            return;
        }
        // Collect the LogEntry objects of this batch.
        final List<LogEntry> entries = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            // Take the next buffered task.
            final LogEntryAndClosure task = tasks.get(i);
            // Compare the task's expected term with the node's current term;
            // if they don't match, reject the task.
            if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
                LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(), task.expectedTerm, this.currTerm);
                if (task.done != null) {
                    final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d", task.expectedTerm, this.currTerm);
                    Utils.runClosureInThread(task.done, st);
                }
                continue;
            }
            // Register a ballot for this round of log replication.
            if (!this.ballotBox.appendPendingTask(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
                Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
                continue;
            }
            // set task entry info before adding to list.
            task.entry.getId().setTerm(this.currTerm);
            task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
            entries.add(task.entry);
        }
        this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
        // update conf.first
        this.conf = this.logManager.checkAndSetConfiguration(this.conf);
    } finally {
        this.writeLock.unlock();
    }
}

Note that when the LogEntries are pre-committed here, a new LeaderStableClosure(entries) object is created as the completion callback:

class LeaderStableClosure extends LogManager.StableClosure {

    public LeaderStableClosure(final List<LogEntry> entries) {
        super(entries);
    }

    @Override
    public void run(final Status status) {
        if (status.isOk()) {
            // Commit the entries (the BallotBox applies the majority-quorum rule internally).
            NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1, NodeImpl.this.serverId);
        } else {
            LOG.error("Node {} append [{}, {}] failed.", getNodeId(), this.firstLogIndex, this.firstLogIndex + this.nEntries - 1);
        }
    }
}

The batch of LogEntries is handed to LogManagerImpl, the LogManager implementation; let's step into its appendEntries method:

@Override
public void appendEntries(final List<LogEntry> entries, final StableClosure done) {
    Requires.requireNonNull(done, "done");
    if (this.hasError) {
        entries.clear();
        Utils.runClosureInThread(done, new Status(RaftError.EIO, "Corrupted LogStorage"));
        return;
    }
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // Check the entries for conflicts and resolve them.
        if (!entries.isEmpty() && !checkAndResolveConflict(entries, done)) {
            entries.clear();
            Utils.runClosureInThread(done, new Status(RaftError.EINTERNAL, "Fail to checkAndResolveConflict."));
            return;
        }
        for (int i = 0; i < entries.size(); i++) {
            final LogEntry entry = entries.get(i);
            // Set checksum after checkAndResolveConflict
            if (this.raftOptions.isEnableLogEntryChecksum()) {
                entry.setChecksum(entry.checksum());
            }
            // Configuration-change entries require extra handling.
            if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
                Configuration oldConf = new Configuration();
                if (entry.getOldPeers() != null) {
                    oldConf = new Configuration(entry.getOldPeers());
                }
                final ConfigurationEntry conf = new ConfigurationEntry(entry.getId(), new Configuration(entry.getPeers()), oldConf);
                this.configManager.add(conf);
            }
        }
        if (!entries.isEmpty()) {
            // Record the index of the first entry of this pending batch on the closure.
            done.setFirstLogIndex(entries.get(0).getId().getIndex());
            this.logsInMemory.addAll(entries);
        }
        // Attach the entries to the closure.
        done.setEntries(entries);

        int retryTimes = 0;
        final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
            event.reset();
            event.type = EventType.OTHER;
            event.done = done;
        };
        while (true) {
            // Try to publish a StableClosure event to the disk queue.
            if (tryOfferEvent(done, translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > APPEND_LOG_RETRY_TIMES) {
                    reportError(RaftError.EBUSY.getNumber(), "LogManager is busy, disk queue overload.");
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
        doUnlock = false;
        // This is the key step that notifies the Followers about the new LogEntries.
        if (!wakeupAllWaiter(this.writeLock)) {
            notifyLastLogIndexListeners();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

The handler for StableClosureEvent events is shown below.

private class StableClosureEventHandler implements EventHandler<StableClosureEvent> {
    LogId lastId = LogManagerImpl.this.diskId;
    List<StableClosure> storage = new ArrayList<>(256);
    AppendBatcher ab = new AppendBatcher(this.storage, 256, new ArrayList<>(),
        LogManagerImpl.this.diskId);

    @Override
    public void onEvent(final StableClosureEvent event, final long sequence, final boolean endOfBatch) throws Exception {
        if (event.type == EventType.SHUTDOWN) {
            this.lastId = this.ab.flush();
            setDiskId(this.lastId);
            LogManagerImpl.this.shutDownLatch.countDown();
            return;
        }
        final StableClosure done = event.done;

        // If the StableClosure carries log entries, buffer them in the AppendBatcher.
        // As seen in appendEntries above, the entries may be empty, in which case the else branch runs.
        if (done.getEntries() != null && !done.getEntries().isEmpty()) {
            this.ab.append(done);
        } else {
            // appendEntries published the event with type=OTHER, so none of the cases below match;
            // the batcher is force-flushed and the latest LogId is returned.
            this.lastId = this.ab.flush();
            boolean ret = true;
            switch (event.type) {
                case LAST_LOG_ID:
                    ((LastLogIdClosure) done).setLastLogId(this.lastId.copy());
                    break;
                // Related to snapshotting.
                case TRUNCATE_PREFIX:
                    long startMs = Utils.monotonicMs();
                    try {
                        final TruncatePrefixClosure tpc = (TruncatePrefixClosure) done;
                        LOG.debug("Truncating storage to firstIndexKept={}", tpc.firstIndexKept);
                        ret = LogManagerImpl.this.logStorage.truncatePrefix(tpc.firstIndexKept);
                    } finally {
                        LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-prefix", Utils.monotonicMs()
                            - startMs);
                    }
                    break;
                // Related to log-entry conflict checking and resolution.
                case TRUNCATE_SUFFIX:
                    startMs = Utils.monotonicMs();
                    try {
                        final TruncateSuffixClosure tsc = (TruncateSuffixClosure) done;
                        LOG.warn("Truncating storage to lastIndexKept={}", tsc.lastIndexKept);
                        ret = LogManagerImpl.this.logStorage.truncateSuffix(tsc.lastIndexKept);
                        if (ret) {
                            this.lastId.setIndex(tsc.lastIndexKept);
                            this.lastId.setTerm(tsc.lastTermKept);
                            Requires.requireTrue(this.lastId.getIndex() == 0 || this.lastId.getTerm() != 0);
                        }
                    } finally {
                        LogManagerImpl.this.nodeMetrics.recordLatency("truncate-log-suffix", Utils.monotonicMs()
                            - startMs);
                    }
                    break;
                case RESET:
                    final ResetClosure rc = (ResetClosure) done;
                    LOG.info("Reseting storage to nextLogIndex={}", rc.nextLogIndex);
                    ret = LogManagerImpl.this.logStorage.reset(rc.nextLogIndex);
                    break;
                default:
                    break;
            }

            if (!ret) {
                reportError(RaftError.EIO.getNumber(), "Failed operation in LogStorage");
            } else {
                done.run(Status.OK());
            }
        }
        if (endOfBatch) {
            this.lastId = this.ab.flush();
            setDiskId(this.lastId);
        }
    }
}

Waking up all the waiters is how the Leader gets data to the Followers. JRaft takes a wait-based approach: if there is currently no LogEntry that needs to be replicated, an onNewLogCallback is registered, and when new LogEntries arrive the callback is invoked asynchronously to replicate them to the Follower. The callback is registered by the Replicator, the object on the Leader side that manages replication to one particular Follower:

private void waitMoreEntries(final long nextWaitIndex) {
    try {
        LOG.debug("Node {} waits more entries", this.options.getNode().getNodeId());
        if (this.waitId >= 0) {
            return;
        }
        this.waitId = this.options.getLogManager().wait(nextWaitIndex - 1,
            (arg, errorCode) -> continueSending((ThreadId) arg, errorCode), this.id);
        this.statInfo.runningState = RunningState.IDLE;
    } finally {
        this.id.unlock();
    }
}
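The article does not show the registration side inside LogManagerImpl. Judging from the waitMap, WaitMeta and runOnNewLog names that appear in wakeupAllWaiter below, it plausibly behaves like the following sketch; the field names, the WaitMeta constructor and the callback interface usage here are assumptions for illustration, not the actual implementation.

// Illustrative sketch (assumed, not the actual JRaft code): registering a waiter.
// wait() parks a WaitMeta holding the callback and its argument in waitMap;
// wakeupAllWaiter() later drains the map and invokes each callback via runOnNewLog().
public long wait(final long expectedLastLogIndex, final NewLogCallback cb, final Object arg) {
    this.writeLock.lock();
    try {
        if (expectedLastLogIndex != this.lastLogIndex) {
            // Newer entries already exist: fire the callback right away instead of waiting.
            Utils.runInThread(() -> cb.onNewLog(arg, RaftError.SUCCESS.getNumber()));
            return 0;
        }
        final long waitId = this.nextWaitId++;
        this.waitMap.put(waitId, new WaitMeta(cb, arg, 0));
        return waitId;
    } finally {
        this.writeLock.unlock();
    }
}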

Back in the LogManager, the registered onNewLogCallback is triggered via wakeupAllWaiter:

private boolean wakeupAllWaiter(final Lock lock) {
    if (this.waitMap.isEmpty()) {
        lock.unlock();
        return false;
    }
    // Copy the list of WaitMeta instances, then clear the original map.
    final List<WaitMeta> wms = new ArrayList<>(this.waitMap.values());
    final int errCode = this.stopped ? RaftError.ESTOP.getNumber() : RaftError.SUCCESS.getNumber();
    this.waitMap.clear();
    lock.unlock();

    final int waiterCount = wms.size();
    for (int i = 0; i < waiterCount; i++) {
        // Take the next WaitMeta instance.
        final WaitMeta wm = wms.get(i);
        wm.errorCode = errCode;
        // Run the registered callback asynchronously.
        Utils.runInThread(() -> runOnNewLog(wm));
    }
    return true;
}

runOnNewLog(wm) invokes the callback stored in the WaitMeta, i.e. the lambda registered in waitMoreEntries, which delegates to Replicator.continueSending. Let's see what that callback does.

Replicating log entries to the Followers
this.waitId = this.options.getLogManager().wait(nextWaitIndex - 1,
    (arg, errorCode) -> continueSending((ThreadId) arg, errorCode), this.id);

static boolean continueSending(final ThreadId id, final int errCode) {
    if (id == null) {
        // It was destroyed already
        return true;
    }
    final Replicator r = (Replicator) id.lock();
    if (r == null) {
        return false;
    }
    r.waitId = -1;
    // After a block timeout, send empty (non-heartbeat) entries to probe the correct next index.
    if (errCode == RaftError.ETIMEDOUT.getNumber()) {
        // Send empty entries after block timeout to check the correct
        // _next_index otherwise the replicator is likely waits in executor.shutdown();
        // _wait_more_entries and no further logs would be replicated even if the
        // last_index of this followers is less than |next_index - 1|
        r.sendEmptyEntries(false);
    } else if (errCode != RaftError.ESTOP.getNumber()) {
        // id is unlock in _send_entries
        r.sendEntries();
    } else {
        LOG.warn("Replicator {} stops sending entries.", id);
        id.unlock();
    }
    return true;
}

private boolean sendEntries(final long nextSendingIndex) {
    ...
    // Send the AppendEntries request asynchronously over RPC.
    final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
        request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

            @Override
            public void run(final Status status) {
                RecycleUtil.recycle(recyclable);
                // When the response arrives, onRpcReturned is invoked to process the result.
                onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(), seq,
                    v, monotonicSendTimeMs);
            }

        });
    // Track this pending request's Future in the in-flight queue.
    addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(), seq, rpcFuture);
    ...
}

Next, let's look at what happens when the onRpcReturned callback is triggered.

static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request, final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
    if (id == null) {
        return;
    }
    // Start timing the processing of this response.
    final long startTimeMs = Utils.nowMs();
    Replicator r;
    if ((r = (Replicator) id.lock()) == null) {
        return;
    }

    // Drop responses that belong to an older replicator version.
    if (stateVersion != r.version) {
        LOG.debug(
            "Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.",
            r, stateVersion, r.version, request, response, status);
        id.unlock();
        return;
    }
    // A priority queue ordered by request seq (seq is globally monotonically increasing).
    final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
    holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
    // Too many RPC responses are being held: the number of unprocessed responses exceeds what this peer can handle.
    if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
        LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}", holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
        // Reset the replicator state.
        r.resetInflights();
        r.state = State.Probe;
        r.sendEmptyEntries(false);
        return;
    }

    boolean continueSendEntries = false;

    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,");
    }
    try {
        int processed = 0;
        // Process the queued responses in order.
        while (!holdingQueue.isEmpty()) {
            final RpcResponse queuedPipelinedResponse = holdingQueue.peek();

            // sequence mismatch, waiting for next response.
            // The responses arrived out of order.
            if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
                // If some responses were already processed, just break out of the loop here.
                if (processed > 0) {
                    if (isLogDebugEnabled) {
                        sb.append("has processed ").append(processed).append(" responses,");
                    }
                    break;
                } else {
                    // Do not processed any responses, UNLOCK id and return.
                    continueSendEntries = false;
                    id.unlock();
                    return;
                }
            }
            holdingQueue.remove();
            processed++;
            // Take the oldest in-flight request record (the in-flight data is essentially a queue).
            final Inflight inflight = r.pollInflight();
            if (inflight == null) {
                // The previous in-flight requests were cleared.
                if (isLogDebugEnabled) {
                    sb.append("ignore response because request not found:").append(queuedPipelinedResponse)
                        .append(",\n");
                }
                continue;
            }
            // Sequence mismatch: reset the replicator state and stop sending further LogEntries to this peer for now.
            if (inflight.seq != queuedPipelinedResponse.seq) {
                // reset state
                LOG.warn(
                    "Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
                    r, inflight.seq, queuedPipelinedResponse.seq);
                r.resetInflights();
                r.state = State.Probe;
                continueSendEntries = false;
                // Block the replicator for a while, depending on the error type.
                r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
                return;
            }
            try {
                // Dispatch by request type.
                switch (queuedPipelinedResponse.requestType) {
                    // For an AppendEntriesRequest, invoke the onAppendEntriesReturned callback.
                    case AppendEntries:
                        continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
                            (AppendEntriesRequest) queuedPipelinedResponse.request,
                            (AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
                        break;
                    // For an InstallSnapshotRequest, invoke the onInstallSnapshotReturned callback.
                    case Snapshot:
                        continueSendEntries = onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status,
                            (InstallSnapshotRequest) queuedPipelinedResponse.request,
                            (InstallSnapshotResponse) queuedPipelinedResponse.response);
                        break;
                }
            } finally {
                // Decide whether the next batch of log entries can be sent.
                if (continueSendEntries) {
                    // Success, increase the response sequence.
                    r.getAndIncrementRequiredNextSeq();
                } else {
                    // The id is already unlocked in onAppendEntriesReturned/onInstallSnapshotReturned, we SHOULD break out.
                    break;
                }
            }
        }
    } finally {
        if (isLogDebugEnabled) {
            sb.append(", after processed, continue to send entries: ").append(continueSendEntries);
            LOG.debug(sb.toString());
        }
        if (continueSendEntries) {
            // unlock in sendEntries.
            // Keep sending log entries.
            r.sendEntries();
        }
    }
}

As we can see, when everything goes well the onAppendEntriesReturned callback is invoked:

private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status, final AppendEntriesRequest request, final AppendEntriesResponse response, final long rpcSendTime, final long startTimeMs, final Replicator r) {
    if (inflight.startIndex != request.getPrevLogIndex() + 1) {
        LOG.warn(
            "Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, requset prevLogIndex={}, reset the replicator state and probe again.",
            r, inflight.startIndex, request.getPrevLogIndex());
        r.resetInflights();
        r.state = State.Probe;
        // unlock id in sendEmptyEntries
        r.sendEmptyEntries(false);
        return false;
    }
    // record metrics
    if (request.getEntriesCount() > 0) {
        r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
        r.nodeMetrics.recordSize("replicate-entries-count", request.getEntriesCount());
        r.nodeMetrics.recordSize("replicate-entries-bytes", request.getData() != null ? request.getData().size()
            : 0);
    }

    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Node "). //
            append(r.options.getGroupId()).append(":").append(r.options.getServerId()). //
            append(" received AppendEntriesResponse from "). //
            append(r.options.getPeerId()). //
            append(" prevLogIndex=").append(request.getPrevLogIndex()). //
            append(" prevLogTerm=").append(request.getPrevLogTerm()). //
            append(" count=").append(request.getEntriesCount());
    }
    if (!status.isOk()) {
        // If the follower crashes, any RPC to the follower fails immediately,
        // so we need to block the follower for a while instead of looping until
        // it comes back or be removed
        // dummy_id is unlock in block
        if (isLogDebugEnabled) {
            sb.append(" fail, sleep.");
            LOG.debug(sb.toString());
        }
        if (++r.consecutiveErrorTimes % 10 == 0) {
            LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
                r.consecutiveErrorTimes, status);
        }
        r.resetInflights();
        r.state = State.Probe;
        // unlock in in block
        r.block(startTimeMs, status.getCode());
        return false;
    }
    r.consecutiveErrorTimes = 0;
    if (!response.getSuccess()) {
        // The leadership has changed, which likely indicates a network partition occurred; follow the new Leader.
        if (response.getTerm() > r.options.getTerm()) {
            if (isLogDebugEnabled) {
                sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ")
                    .append(r.options.getTerm());
                LOG.debug(sb.toString());
            }
            // Get the NodeImpl object representing the current node.
            final NodeImpl node = r.options.getNode();
            r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
            r.destroy();
            // Raise the local term to the higher term.
            node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
                "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
            return false;
        }
        if (isLogDebugEnabled) {
            sb.append(" fail, find nextIndex remote lastLogIndex ").append(response.getLastLogIndex())
                .append(" local nextIndex ").append(r.nextIndex);
            LOG.debug(sb.toString());
        }
        if (rpcSendTime > r.lastRpcSendTimestamp) {
            r.lastRpcSendTimestamp = rpcSendTime;
        }
        // Fail, reset the state to try again from nextIndex.
        r.resetInflights();
        // prev_log_index and prev_log_term doesn't match
        if (response.getLastLogIndex() + 1 < r.nextIndex) {
            LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.getLastLogIndex());
            // The peer contains less logs than leader
            r.nextIndex = response.getLastLogIndex() + 1;
        } else {
            // The peer contains logs from old term which should be truncated,
            // decrease _last_log_at_peer by one to test the right index to keep
            if (r.nextIndex > 1) {
                LOG.debug("logIndex={} dismatch", r.nextIndex);
                r.nextIndex--;
            } else {
                LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen",
                    r.options.getPeerId());
            }
        }
        // dummy_id is unlock in _send_heartbeat
        r.sendEmptyEntries(false);
        return false;
    }
    if (isLogDebugEnabled) {
        sb.append(", success");
        LOG.debug(sb.toString());
    }
    // success
    if (response.getTerm() != r.options.getTerm()) {
        r.resetInflights();
        r.state = State.Probe;
        LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
        id.unlock();
        return false;
    }
    if (rpcSendTime > r.lastRpcSendTimestamp) {
        r.lastRpcSendTimestamp = rpcSendTime;
    }
    // Number of log entries replicated by this request.
    final int entriesSize = request.getEntriesCount();
    if (entriesSize > 0) {
        // Record this peer's acknowledgement in the ballot box.
        r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1,
                r.options.getPeerId());
        }
    } else {
        // The request is probe request, change the state into Replicate.
        r.state = State.Replicate;
    }
    r.nextIndex += entriesSize;
    r.hasSucceeded = true;
    r.notifyOnCaughtUp(0, false);
    // dummy_id is unlock in _send_entries
    if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
        r.sendTimeoutNow(false, false);
    }
    return true;
}

It then calls commitAt on NodeImpl's BallotBox member to account for the outcome of this replication round:

public boolean commitAt(long firstLogIndex, long lastLogIndex, PeerId peer) {
    // TODO use lock-free algorithm here?
    final long stamp = stampedLock.writeLock();
    long lastCommittedIndex = 0;
    try {
        if (pendingIndex == 0) {
            return false;
        }
        if (lastLogIndex < pendingIndex) {
            return true;
        }

        if (lastLogIndex >= pendingIndex + pendingMetaQueue.size()) {
            throw new ArrayIndexOutOfBoundsException();
        }

        // First log index to account for.
        final long startAt = Math.max(pendingIndex, firstLogIndex);
        // Position hint for the ballot lookup.
        Ballot.PosHint hint = new Ballot.PosHint();
        for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
            final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - pendingIndex));
            // Record this peer's vote.
            hint = bl.grant(peer, hint);
            // Once a majority has granted, advance the last committed index.
            if (bl.isGranted()) {
                lastCommittedIndex = logIndex;
            }
        }
        if (lastCommittedIndex == 0) {
            return true;
        }
        // When removing a peer off the raft group which contains even number of
        // peers, the quorum would decrease by 1, e.g. 3 of 4 changes to 2 of 3. In
        // this case, the log after removal may be committed before some previous
        // logs, since we use the new configuration to deal the quorum of the
        // removal request, we think it's safe to commit all the uncommitted
        // previous logs, which is not well proved right now
        pendingMetaQueue.removeRange(0, (int) (lastCommittedIndex - pendingIndex) + 1);
        LOG.debug("Committed log fromIndex={}, toIndex={}.", pendingIndex, lastCommittedIndex);
        pendingIndex = lastCommittedIndex + 1;
        // Update the last committed index.
        this.lastCommittedIndex = lastCommittedIndex;
    } finally {
        stampedLock.unlockWrite(stamp);
    }
    // Notify the waiter so the committed entries can be applied to the state machine.
    this.waiter.onCommitted(lastCommittedIndex);
    return true;
}
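The per-entry Ballot used above (bl.grant / bl.isGranted) is not shown in this article. The class below is a deliberately simplified illustration of the majority-grant idea it implements; the name, fields and constructor are made up for this sketch, and the real Ballot additionally handles joint (old/new) configurations and the PosHint lookup optimization.

// Simplified illustration of a per-log-entry ballot (not the real Ballot class).
// Each pending entry gets one ballot; commitAt() calls grant(peer) for every
// acknowledging peer, and isGranted() becomes true once a majority has responded.
final class SimpleBallot {
    private final Set<PeerId> peers;              // voting members of the current configuration
    private final Set<PeerId> granted = new HashSet<>();
    private final int quorum;                     // majority threshold: n / 2 + 1

    SimpleBallot(final Set<PeerId> peers) {
        this.peers = peers;
        this.quorum = peers.size() / 2 + 1;
    }

    void grant(final PeerId peer) {
        if (this.peers.contains(peer)) {
            this.granted.add(peer);               // idempotent: each peer counts once
        }
    }

    boolean isGranted() {
        return this.granted.size() >= this.quorum;
    }
}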