JRaft: how a newly added node catches up with the data on the existing nodes

JRaft

JRaft is an implementation of the Raft protocol open-sourced by Ant Financial.


How a new node catches up with the existing nodes

Explanation from the official wiki

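When a raft node restarts, the state of its in-memory state machine is lost. During startup, every log entry in the log storage is replayed to rebuild the whole state machine instance. This leads to three problems:

  • If tasks are submitted frequently, as in a message-middleware scenario, the rebuild takes a long time and startup is slow.
  • If there are many log entries, the node has to store all of them, which consumes storage and is not sustainable.
  • When a node is added, the new node has to fetch every log entry from the leader and replay it into its state machine, which puts a considerable load on both the leader and the network bandwidth.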

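The snapshot mechanism is introduced to solve these three problems. A snapshot is an "image" of the state machine's latest state, saved separately. Once the snapshot has been saved successfully, the log entries before that point can be deleted, which reduces log storage. At startup, the node loads the latest snapshot directly and then replays only the entries after it; if snapshots are taken at a reasonable interval, the replay is short and startup is faster. Finally, a newly added node can first copy the latest snapshot from the leader and install it into its local state machine, then copy only the entries that follow, so it quickly catches up with the progress of the whole raft group.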
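To make the three points above concrete, here is a toy sketch in Java, the language JRaft is written in. It is not JRaft code: `SnapshotIdea`, `Node`, `takeSnapshot`, `restart` and `catchUpFrom` are hypothetical names, and the "state machine" is just a map of counters. It shows that after a snapshot the log prefix can be dropped, that a restart replays only the entries after the snapshot, and that a new node needs only the snapshot plus the tail of the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of snapshot + log compaction. All names are hypothetical, not JRaft APIs.
class SnapshotIdea {
    record Entry(long index, String key, int delta) {}

    static final class Node {
        final Map<String, Integer> state = new HashMap<>(); // in-memory state machine
        final List<Entry> log = new ArrayList<>();          // entries not covered by the snapshot
        Map<String, Integer> snapshot = new HashMap<>();    // saved "image" of the state
        long snapshotIndex = 0;                             // last index included in the snapshot
        long lastApplied = 0;

        void append(String key, int delta) {
            Entry e = new Entry(snapshotIndex + log.size() + 1, key, delta);
            log.add(e);
            apply(e);
        }

        void apply(Entry e) {
            state.merge(e.key(), e.delta(), Integer::sum);
            lastApplied = e.index();
        }

        // Save the current state and delete the log prefix it already covers.
        void takeSnapshot() {
            snapshot = new HashMap<>(state);
            snapshotIndex = lastApplied;
            log.removeIf(e -> e.index() <= snapshotIndex);
        }

        // Restart: in-memory state is lost, so load the snapshot and replay only the remaining log.
        void restart() {
            state.clear();
            state.putAll(snapshot);
            lastApplied = snapshotIndex;
            for (Entry e : log) {
                apply(e);
            }
        }

        // A new node copies the snapshot plus the entries after it, not the whole history.
        static Node catchUpFrom(Node leader) {
            Node follower = new Node();
            follower.snapshot = new HashMap<>(leader.snapshot);
            follower.snapshotIndex = leader.snapshotIndex;
            follower.log.addAll(leader.log);
            follower.restart();
            return follower;
        }
    }
}
```

For example, appending `a+1` and `b+2`, taking a snapshot, then appending `a+4` leaves a single entry in the log; after `restart()` the map is `{a=5, b=2}`, and `catchUpFrom(leader)` rebuilds the same state from the snapshot and that one entry.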

Code walkthrough

Leader: sending the snapshot

Implementation (Replicator.sendEntries)

private boolean sendEntries(final long nextSendingIndex) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // Check whether the leader's entry at index {nextSendingIndex - 1} has already been compacted into a snapshot
    if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
        // unlock id in installSnapshot
        // Send an InstallSnapshot request instead
        installSnapshot();
        return false;
    }

    ByteBufferCollector dataBuf = null;
    final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();
    final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
    try {
        // Pack the leader's log entries to send to the follower
        for (int i = 0; i < maxEntriesSize; i++) {
            final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
            if (!prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
                break;
            }
            rb.addEntries(emb.build());
        }
        if (rb.getEntriesCount() == 0) {
            // The entry at nextSendingIndex has been compacted (a gap in the log): send a snapshot directly
            if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
                installSnapshot();
                return false;
            }
            // _id is unlock in _wait_more
            waitMoreEntries(nextSendingIndex);
            return false;
        }
        if (byteBufList.getCapacity() > 0) {
            dataBuf = ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
            for (final ByteBuffer b : byteBufList) {
                dataBuf.put(b);
            }
            final ByteBuffer buf = dataBuf.getBuffer();
            buf.flip();
            rb.setData(ZeroByteStringHelper.wrap(buf));
        }
    } finally {
        // Return the buffer list to the object pool
        RecycleUtil.recycle(byteBufList);
    }

    // Build the AppendEntries RPC request
    final AppendEntriesRequest request = rb.build();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Node {} send AppendEntriesRequest to {} term {} lastCommittedIndex {} prevLogIndex {} prevLogTerm {} logIndex {} count {}", this.options.getNode().getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex(), request.getPrevLogIndex(), request.getPrevLogTerm(), nextSendingIndex, request.getEntriesCount());
    }
    // Record the replicator's running state
    this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
    // Record the range of log indexes carried by this request
    this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1;
    this.statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();

    final Recyclable recyclable = dataBuf;
    final int v = this.version;
    final long monotonicSendTimeMs = Utils.monotonicMs();
    final int seq = getAndIncrementReqSeq();
    // Send the RPC request to the follower
    final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
        request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

            @Override
            public void run(final Status status) {
                RecycleUtil.recycle(recyclable);
                onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(), seq, v, monotonicSendTimeMs);
            }

        });
    addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(), seq, rpcFuture);
    return true;
}
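The branch structure of `sendEntries` can be reduced to a small decision function. The sketch below is an illustration under stated assumptions, not JRaft code: `SendDecision`, `decide` and the `termOf` callback are hypothetical. `termOf` follows the convention visible in `fillCommonFields`, where a term of 0 means the entry is not available locally; it is also assumed to return the snapshot's term for the snapshot's last included index.

```java
import java.util.function.LongUnaryOperator;

// Simplified model of the branches in sendEntries(); not JRaft code.
class SendDecision {
    enum Action { INSTALL_SNAPSHOT, APPEND_ENTRIES, WAIT_MORE_ENTRIES }

    // termOf returns 0 for an index whose term is unknown (compacted or not yet written);
    // firstLogIndex/lastLogIndex bound the leader's retained log.
    static Action decide(long nextSendingIndex, LongUnaryOperator termOf,
                         long firstLogIndex, long lastLogIndex) {
        long prevLogIndex = nextSendingIndex - 1;
        // Same condition under which fillCommonFields(..., false) returns false
        if (prevLogIndex != 0 && termOf.applyAsLong(prevLogIndex) == 0) {
            return Action.INSTALL_SNAPSHOT;
        }
        boolean entryAvailable = nextSendingIndex >= firstLogIndex && nextSendingIndex <= lastLogIndex;
        if (!entryAvailable) {
            // Nothing could be packed: the entry was compacted, or it does not exist yet
            return nextSendingIndex < firstLogIndex ? Action.INSTALL_SNAPSHOT : Action.WAIT_MORE_ENTRIES;
        }
        return Action.APPEND_ENTRIES;
    }
}
```

With a leader snapshot ending at index 100 (term 2) and retained entries 101..150, `decide(51, ...)` yields `INSTALL_SNAPSHOT`, `decide(101, ...)` yields `APPEND_ENTRIES`, and `decide(151, ...)` yields `WAIT_MORE_ENTRIES`.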

Filling in the common request fields (fillCommonFields)

private boolean fillCommonFields(final AppendEntriesRequest.Builder rb, long prevLogIndex, final boolean isHeartbeat) {
    final long prevLogTerm = this.options.getLogManager().getTerm(prevLogIndex);
    // prevLogTerm == 0 with prevLogIndex != 0 means the entry at prevLogIndex has been compacted by a snapshot
    if (prevLogTerm == 0 && prevLogIndex != 0) {
        if (!isHeartbeat) {
            Requires.requireTrue(prevLogIndex < this.options.getLogManager().getFirstLogIndex());
            LOG.debug("logIndex={} was compacted", prevLogIndex);
            return false;
        } else {
            // The log at prev_log_index has been compacted, which indicates
            // we is or is going to install snapshot to the follower. So we let
            // both prev_log_index and prev_log_term be 0 in the heartbeat
            // request so that follower would do nothing besides updating its
            // leader timestamp.
            prevLogIndex = 0;
        }
    }
    // Fill in the request fields
    rb.setTerm(this.options.getTerm());
    rb.setGroupId(this.options.getGroupId());
    rb.setServerId(this.options.getServerId().toString());
    rb.setPeerId(this.options.getPeerId().toString());
    rb.setPrevLogIndex(prevLogIndex);
    rb.setPrevLogTerm(prevLogTerm);
    rb.setCommittedIndex(this.options.getBallotBox().getLastCommittedIndex());
    return true;
}
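The heartbeat special case in `fillCommonFields` is easy to miss: a regular AppendEntries request is abandoned when the previous entry has been compacted, while a heartbeat is still sent with both `prevLogIndex` and `prevLogTerm` set to 0. A hypothetical sketch of just that logic, with an invented `Header` record standing in for the protobuf builder:

```java
import java.util.Optional;
import java.util.function.LongUnaryOperator;

// Hypothetical stand-in for the fields fillCommonFields() sets on AppendEntriesRequest.
class CommonFields {
    record Header(long term, long prevLogIndex, long prevLogTerm, long committedIndex) {}

    static Optional<Header> fill(long term, long prevLogIndex, boolean isHeartbeat,
                                 LongUnaryOperator termOf, long committedIndex) {
        long prevLogTerm = termOf.applyAsLong(prevLogIndex);
        if (prevLogTerm == 0 && prevLogIndex != 0) {
            if (!isHeartbeat) {
                return Optional.empty(); // caller must fall back to installSnapshot()
            }
            // Heartbeat: send prevLogIndex = prevLogTerm = 0 so the follower
            // only refreshes its leader timestamp
            prevLogIndex = 0;
        }
        return Optional.of(new Header(term, prevLogIndex, prevLogTerm, committedIndex));
    }
}
```

If entries below index 10 have been compacted, `fill(2, 5, false, ...)` is empty, while `fill(2, 5, true, ...)` still produces a heartbeat header with both previous-log fields set to 0.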

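If fillCommonFields succeeds, the leader goes on to send the AppendEntries RPC; if it returns false, sendEntries falls back to installSnapshot().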

Follower: receiving the snapshot

Key NodeImpl method: handleInstallSnapshot (called from InstallSnapshotRequestProcessor)

@Override
public Message handleInstallSnapshot(final InstallSnapshotRequest request, final RpcRequestClosure done) {
    // Without a snapshot executor, snapshots are not supported: return an EINVAL error response
    if (this.snapshotExecutor == null) {
        return RpcResponseFactory.newResponse(RaftError.EINVAL, "Not supported snapshot");
    }
    final PeerId serverId = new PeerId();
    // Parse the serverId carried by the request into a PeerId
    if (!serverId.parse(request.getServerId())) {
        LOG.warn("Node {} ignore InstallSnapshotRequest from {} bad server id.", getNodeId(), request.getServerId());
        return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s", request.getServerId());
    }

    this.writeLock.lock();
    try {
        // Check the current node's state
        if (!this.state.isActive()) {
            LOG.warn("Node {} ignore InstallSnapshotRequest as it is not in active state {}.", getNodeId(), this.state);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s:%s is not in active state, state %s.", this.groupId, this.serverId, this.state.name());
        }

        // Compare the term carried by the request with the current term to reject stale requests
        if (request.getTerm() < this.currTerm) {
            LOG.warn("Node {} ignore stale InstallSnapshotRequest from {}, term={}, currTerm={}.", getNodeId(), request.getPeerId(), request.getTerm(), this.currTerm);
            return InstallSnapshotResponse.newBuilder() //
                .setTerm(this.currTerm) //
                .setSuccess(false) //
                .build();
        }

        // Related checks: step down if needed, set the leader,
        // interrupt any previous snapshot download, ...
        checkStepDown(request.getTerm(), serverId);

        if (!serverId.equals(this.leaderId)) {
            LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.", serverId, this.currTerm, this.leaderId);
            // Increase the term by 1 and make both leaders step down to minimize the
            // loss of split brain
            stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT, "More than one leader in the same term."));
            return InstallSnapshotResponse.newBuilder() //
                .setTerm(request.getTerm() + 1) //
                .setSuccess(false) //
                .build();
        }

    } finally {
        this.writeLock.unlock();
    }
    final long startMs = Utils.monotonicMs();
    try {
        if (LOG.isInfoEnabled()) {
            LOG.info(
                "Node {} received InstallSnapshotRequest from {}, lastIncludedLogIndex={}, lastIncludedLogTerm={}, lastLogId={}.",
                getNodeId(), request.getServerId(), request.getMeta().getLastIncludedIndex(), request.getMeta()
                    .getLastIncludedTerm(), this.logManager.getLastLogId(false));
        }
        // Hand the request to the snapshot executor, which downloads and installs the snapshot
        this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
        return null;
    } finally {
        this.metrics.recordLatency("install-snapshot", Utils.monotonicMs() - startMs);
    }
}
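The follower-side guard in `handleInstallSnapshot` boils down to two rejections and one acceptance. The sketch below models it with hypothetical names (`InstallSnapshotGuard`, `check`, `Decision`). The `checkStepDown` part is only a rough approximation (follow a higher term, then record the sender as leader), not its full behavior.

```java
// Simplified model of the term and leader checks in handleInstallSnapshot(); names are hypothetical.
class InstallSnapshotGuard {
    enum Outcome { REJECT_STALE_TERM, REJECT_LEADER_CONFLICT, ACCEPT }

    record Decision(Outcome outcome, long responseTerm) {}

    long currTerm;
    String leaderId; // null while no leader is known

    InstallSnapshotGuard(long currTerm, String leaderId) {
        this.currTerm = currTerm;
        this.leaderId = leaderId;
    }

    Decision check(long requestTerm, String serverId) {
        if (requestTerm < currTerm) {
            return new Decision(Outcome.REJECT_STALE_TERM, currTerm); // request from a stale leader
        }
        // Rough stand-in for checkStepDown(): follow a higher term, then accept the sender as leader
        if (requestTerm > currTerm) {
            currTerm = requestTerm;
            leaderId = null;
        }
        if (leaderId == null) {
            leaderId = serverId;
        }
        if (!serverId.equals(leaderId)) {
            // Two leaders claim the same term: bump the term so both step down
            currTerm = requestTerm + 1;
            leaderId = null;
            return new Decision(Outcome.REJECT_LEADER_CONFLICT, currTerm);
        }
        return new Decision(Outcome.ACCEPT, currTerm);
    }
}
```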

Downloading the snapshot files

@Override
public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response, final RpcRequestClosure done) {
    final SnapshotMeta meta = request.getMeta();
    // Create a task object for downloading the snapshot
    final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done);
    //DON'T access request, response, and done after this point
    //as the retry snapshot will replace this one.
    // Register the download task; on success this also sets this.curCopier (the snapshot copier)
    if (!registerDownloadingSnapshot(ds)) {
        LOG.warn("Fail to register downloading snapshot");
        // This RPC will be responded by the previous session
        return;
    }
    Requires.requireNonNull(this.curCopier, "curCopier");
    try {
        // Block until the copy job finishes
        this.curCopier.join();
    } catch (final InterruptedException e) {
        // Restore the interrupt flag. An interrupted curCopier means a newer snapshot is being
        // received and the download of the old one has been stopped
        Thread.currentThread().interrupt();
        LOG.warn("Install snapshot copy job was canceled.");
        return;
    }
    // Load the downloaded snapshot
    loadDownloadingSnapshot(ds, meta);
}
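The comments in `installSnapshot` say that a newer session can take over from the current one and that the waiting thread is then interrupted. The sketch below illustrates that idea with the JDK's `CompletableFuture` and `AtomicReference`. The replacement rule (reject a snapshot older than the one in progress, otherwise replace the current session and cancel its copy) is an assumption for illustration only; `registerDownloadingSnapshot` in JRaft has its own, more detailed rules.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of "a newer snapshot replaces the download in progress"; the rule here is an assumption.
class DownloadRegistry {
    // A download session for a snapshot that ends at lastIncludedIndex (hypothetical type)
    record Download(long lastIncludedIndex, CompletableFuture<Void> copier) {}

    private final AtomicReference<Download> current = new AtomicReference<>();

    // Reject a snapshot older than the one in progress; otherwise replace it and cancel its copy job.
    boolean register(Download ds) {
        while (true) {
            Download old = current.get();
            if (old != null && old.lastIncludedIndex() > ds.lastIncludedIndex()) {
                return false;
            }
            if (current.compareAndSet(old, ds)) {
                if (old != null) {
                    old.copier().cancel(true);
                }
                return true;
            }
        }
    }

    Download current() {
        return current.get();
    }

    // Like curCopier.join(): block until the copy finishes; a cancelled copy means "give up".
    static boolean awaitCopy(Download ds) {
        try {
            ds.copier().join();
            return true;
        } catch (CancellationException | CompletionException e) {
            return false;
        }
    }
}
```

The compare-and-set loop keeps registration lock-free; JRaft itself guards this state with a lock, so treat the concurrency style here as a design choice of the sketch.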

Loading the downloaded snapshot

void loadDownloadingSnapshot(final DownloadingSnapshot ds, final SnapshotMeta meta) {
    SnapshotReader reader;
    this.lock.lock();
    try {
        // If ds is no longer the current download task, a newer snapshot is being received
        if (ds != this.downloadingSnapshot.get()) {
            //It is interrupted and response by other request,just return
            return;
        }
        Requires.requireNonNull(this.curCopier, "curCopier");
        reader = this.curCopier.getReader();
        if (!this.curCopier.isOk()) {
            if (this.curCopier.getCode() == RaftError.EIO.getNumber()) {
                reportError(this.curCopier.getCode(), this.curCopier.getErrorMsg());
            }
            Utils.closeQuietly(reader);
            ds.done.run(this.curCopier);
            Utils.closeQuietly(this.curCopier);
            this.curCopier = null;
            this.downloadingSnapshot.set(null);
            this.runningJobs.countDown();
            return;
        }
        Utils.closeQuietly(this.curCopier);
        this.curCopier = null;
        if (reader == null || !reader.isOk()) {
            Utils.closeQuietly(reader);
            this.downloadingSnapshot.set(null);
            ds.done.sendResponse(RpcResponseFactory.newResponse(RaftError.EINTERNAL, "Fail to copy snapshot from %s", ds.request.getUri()));
            this.runningJobs.countDown();
            return;
        }
        this.loadingSnapshot = true;
        this.loadingSnapshotMeta = meta;
    } finally {
        this.lock.unlock();
    }
    // The snapshot was downloaded successfully; hand it to the state machine for installation
    final InstallSnapshotDone installSnapshotDone = new InstallSnapshotDone(reader);
    // Submit a snapshot-load event to the state machine caller
    if (!this.fsmCaller.onSnapshotLoad(installSnapshotDone)) {
        LOG.warn("Fail to call fsm onSnapshotLoad");
        installSnapshotDone.run(new Status(RaftError.EHOSTDOWN, "This raft node is down"));
    }
}
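The order of the checks in `loadDownloadingSnapshot` decides who answers the leader's RPC and with what. A compact restatement, reduced to booleans; the enum and method names are hypothetical:

```java
// Order of the checks in loadDownloadingSnapshot(), reduced to booleans; names are hypothetical.
class LoadChecks {
    enum Result { SUPERSEDED, COPY_FAILED, BAD_READER, START_LOADING }

    static Result decide(Object ds, Object currentDownload, boolean copierOk, boolean readerOk) {
        if (ds != currentDownload) {
            return Result.SUPERSEDED;    // a newer request owns the session and will respond
        }
        if (!copierOk) {
            return Result.COPY_FAILED;   // respond with the copier's error status
        }
        if (!readerOk) {
            return Result.BAD_READER;    // respond EINTERNAL "Fail to copy snapshot"
        }
        return Result.START_LOADING;     // set loadingSnapshot, then fsmCaller.onSnapshotLoad(...)
    }
}
```

Note the identity comparison (`!=`): in the original code, as here, "is this still my session" is decided by reference equality with the registered `DownloadingSnapshot`, not by comparing snapshot metadata.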

Publishing and consuming the snapshot-load event

@Override
public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
    // Publish the snapshot-load event to a Disruptor queue so it is processed asynchronously
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.SNAPSHOT_LOAD;
        task.done = done;
    });
}

// Event handler that consumes apply tasks
private class ApplyTaskHandler implements EventHandler<ApplyTask> {
    // max committed index in current batch, reset to -1 every batch
    private long maxCommittedIndex = -1;

    @Override
    public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
        // Handle the task and carry the batch's max committed index forward
        this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
    }
}
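The point of `enqueueTask` is that snapshot loads and committed-log applies are consumed by a single thread, in submission order. The sketch below swaps the Disruptor for the JDK's `ArrayBlockingQueue` purely to illustrate that single-consumer ordering; `ApplyQueueSketch` and its members are hypothetical, and the Disruptor's batching (`endOfBatch`) is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Single-consumer apply queue; ArrayBlockingQueue stands in for the Disruptor (an assumption).
class ApplyQueueSketch {
    enum TaskType { COMMITTED, SNAPSHOT_LOAD, SHUTDOWN }

    record ApplyTask(TaskType type, long committedIndex) {}

    private final BlockingQueue<ApplyTask> queue = new ArrayBlockingQueue<>(1024);
    final List<String> handled = new ArrayList<>(); // written only by the consumer thread

    // Like enqueueTask(): report failure instead of blocking when the queue is full
    boolean enqueue(ApplyTask task) {
        return queue.offer(task);
    }

    // One consumer thread handles every task, strictly in submission order
    Thread startConsumer() {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    ApplyTask task = queue.take();
                    if (task.type() == TaskType.SHUTDOWN) {
                        return;
                    }
                    handled.add(task.type() + "@" + task.committedIndex());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        return consumer;
    }
}
```

Because only one thread touches the state machine, a snapshot load can never interleave with applying a committed entry, which is what the FSM code relies on.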

Handling the snapshot-load task in the state machine

private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
    CountDownLatch shutdown = null;
    // Check the task type
    if (task.type == TaskType.COMMITTED) {
        if (task.committedIndex > maxCommittedIndex) {
            maxCommittedIndex = task.committedIndex;
        }
    } else {
        // Any other task type, e.g. a snapshot load
        if (maxCommittedIndex >= 0) {
            this.currTask = TaskType.COMMITTED;
            // First apply the entries committed earlier in this batch, so ordering is preserved
            doCommitted(maxCommittedIndex);
            // Reset the max committed index so these entries are not applied twice
            maxCommittedIndex = -1L; // reset maxCommittedIndex
        }
        ...
            case SNAPSHOT_LOAD:
                this.currTask = TaskType.SNAPSHOT_LOAD;
                // Check the state machine's status first
                if (passByStatus(task.done)) {
                    // Perform the snapshot load
                    doSnapshotLoad((LoadSnapshotClosure) task.done);
                }
                break;
            ...
    }
    try {
        // As above: at the end of a batch, apply the entries committed so far
        if (endOfBatch && maxCommittedIndex >= 0) {
            this.currTask = TaskType.COMMITTED;
            doCommitted(maxCommittedIndex);
            maxCommittedIndex = -1L; // reset maxCommittedIndex
        }
        this.currTask = TaskType.IDLE;
        return maxCommittedIndex;
    } finally {
        if (shutdown != null) {
            shutdown.countDown();
        }
    }
}
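The `maxCommittedIndex` bookkeeping in `runApplyTask` merges many COMMITTED events into one `doCommitted` call, but flushes them before any other task (such as a snapshot load) so that ordering is preserved. A self-contained sketch of that batching rule, with hypothetical names and the real work replaced by a list of recorded actions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the maxCommittedIndex batching in runApplyTask().
class CommitBatching {
    enum TaskType { COMMITTED, SNAPSHOT_LOAD }

    record Task(TaskType type, long committedIndex) {}

    final List<String> actions = new ArrayList<>(); // stands in for doCommitted / doSnapshotLoad

    private long handle(Task task, long maxCommittedIndex, boolean endOfBatch) {
        if (task.type() == TaskType.COMMITTED) {
            maxCommittedIndex = Math.max(maxCommittedIndex, task.committedIndex());
        } else {
            if (maxCommittedIndex >= 0) {
                actions.add("commit " + maxCommittedIndex); // flush pending commits first
                maxCommittedIndex = -1;
            }
            actions.add("load snapshot");
        }
        if (endOfBatch && maxCommittedIndex >= 0) {
            actions.add("commit " + maxCommittedIndex);
            maxCommittedIndex = -1;
        }
        return maxCommittedIndex;
    }

    void runBatch(List<Task> batch) {
        long max = -1;
        for (int i = 0; i < batch.size(); i++) {
            max = handle(batch.get(i), max, i == batch.size() - 1);
        }
    }
}
```

For a batch of COMMITTED(3), COMMITTED(7), COMMITTED(5), SNAPSHOT_LOAD, COMMITTED(9), the recorded actions are `commit 7`, `load snapshot`, `commit 9`: three commit events collapse into one call, and nothing committed before the snapshot load is applied after it.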

Performing the snapshot load

private void doSnapshotLoad(final LoadSnapshotClosure done) {
    Requires.requireNonNull(done, "LoadSnapshotClosure is null");
    final SnapshotReader reader = done.start();
    if (reader == null) {
        done.run(new Status(RaftError.EINVAL, "open SnapshotReader failed"));
        return;
    }
    // Read the snapshot metadata
    final RaftOutter.SnapshotMeta meta = reader.load();
    if (meta == null) {
        done.run(new Status(RaftError.EINVAL, "SnapshotReader load meta failed"));
        if (reader.getRaftError() == RaftError.EIO) {
            final RaftException err = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT, RaftError.EIO, "Fail to load snapshot meta");
            setError(err);
        }
        return;
    }
    // LogId of the last entry applied to the state machine
    final LogId lastAppliedId = new LogId(this.lastAppliedIndex.get(), this.lastAppliedTerm);
    // LogId of the last entry included in the snapshot
    final LogId snapshotId = new LogId(meta.getLastIncludedIndex(), meta.getLastIncludedTerm());
    // If the state machine has already applied beyond the snapshot, the snapshot is stale and is not used
    if (lastAppliedId.compareTo(snapshotId) > 0) {
        done.run(new Status(RaftError.ESTALE, "Loading a stale snapshot last_applied_index=%d last_applied_term=%d snapshot_index=%d snapshot_term=%d", lastAppliedId.getIndex(), lastAppliedId.getTerm(), snapshotId.getIndex(), snapshotId.getTerm()));
        return;
    }
    // Let the user-implemented state machine load the snapshot
    if (!this.fsm.onSnapshotLoad(reader)) {
        done.run(new Status(-1, "StateMachine onSnapshotLoad failed"));
        final RaftException e = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE, RaftError.ESTATEMACHINE, "StateMachine onSnapshotLoad failed");
        setError(e);
        return;
    }
    if (meta.getOldPeersCount() == 0) {
        // Joint stage is not supposed to be noticeable by end users.
        final Configuration conf = new Configuration();
        for (int i = 0, size = meta.getPeersCount(); i < size; i++) {
            final PeerId peer = new PeerId();
            Requires.requireTrue(peer.parse(meta.getPeers(i)), "Parse peer failed");
            conf.addPeer(peer);
        }
        this.fsm.onConfigurationCommitted(conf);
    }
    // Move lastAppliedIndex to the end of the snapshot, so later replay only applies entries after it
    this.lastAppliedIndex.set(meta.getLastIncludedIndex());
    // Update the term of the last applied entry
    this.lastAppliedTerm = meta.getLastIncludedTerm();
    done.run(Status.OK());
}
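The staleness test in `doSnapshotLoad` relies on comparing two `LogId`s. In this sketch ids are ordered by term first and then by index; treat that ordering as an assumption to check against `LogId.compareTo` in the JRaft version you use. `StaleSnapshotCheck` and `AppliedPosition` are hypothetical names.

```java
import java.util.Comparator;

// Hypothetical sketch of the stale-snapshot check in doSnapshotLoad().
class StaleSnapshotCheck {
    record LogId(long index, long term) {}

    // Assumed ordering: term first, then index
    static final Comparator<LogId> ORDER =
        Comparator.comparingLong(LogId::term).thenComparingLong(LogId::index);

    static boolean isStale(LogId lastApplied, LogId snapshot) {
        return ORDER.compare(lastApplied, snapshot) > 0;
    }

    static final class AppliedPosition {
        LogId lastApplied = new LogId(0, 0);

        // Returns false (ESTALE in the original) for a stale snapshot; otherwise jumps forward to it.
        boolean loadSnapshot(LogId snapshot) {
            if (isStale(lastApplied, snapshot)) {
                return false;
            }
            // A real state machine would read the snapshot files here (onSnapshotLoad)
            lastApplied = snapshot;
            return true;
        }
    }
}
```

A snapshot equal to the last applied id is not stale, so reloading the same snapshot is allowed; only a snapshot strictly behind the applied position is refused.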

After the snapshot load completes

private void onSnapshotLoadDone(final Status st) {
    DownloadingSnapshot m;
    boolean doUnlock = true;
    this.lock.lock();
    try {
        Requires.requireTrue(this.loadingSnapshot, "Not loading snapshot");
        m = this.downloadingSnapshot.get();
        if (st.isOk()) {
            // Record the index of the latest snapshot
            this.lastSnapshotIndex = this.loadingSnapshotMeta.getLastIncludedIndex();
            // Record the term of the latest snapshot
            this.lastSnapshotTerm = this.loadingSnapshotMeta.getLastIncludedTerm();
            doUnlock = false;
            this.lock.unlock();
            // Pass the snapshot metadata to the logManager
            this.logManager.setSnapshot(this.loadingSnapshotMeta); //should be out of lock
            doUnlock = true;
            this.lock.lock();
        }
        final StringBuilder sb = new StringBuilder();
        if (this.node != null) {
            sb.append("Node ").append(this.node.getNodeId()).append(" ");
        }
        sb.append("onSnapshotLoadDone, ").append(this.loadingSnapshotMeta);
        LOG.info(sb.toString());
        doUnlock = false;
        this.lock.unlock();
        if (this.node != null) {
            // Update the node's configuration after installing the snapshot
            this.node.updateConfigurationAfterInstallingSnapshot();
        }
        doUnlock = true;
        this.lock.lock();
        // The snapshot load has finished
        this.loadingSnapshot = false;
        // Clear the current download task
        this.downloadingSnapshot.set(null);

    } finally {
        if (doUnlock) {
            this.lock.unlock();
        }
    }
    if (m != null) {
        // Respond RPC
        if (!st.isOk()) {
            m.done.run(st);
        } else {
            m.responseBuilder.setSuccess(true);
            m.done.sendResponse(m.responseBuilder.build());
        }
    }
    this.runningJobs.countDown();
}
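Once `onSnapshotLoadDone` sends a successful response, the leader has to resume normal replication. A minimal sketch, assuming that on success the replicator continues with `nextIndex = lastIncludedIndex + 1`, and that a higher term in the response makes the leader step down (the standard Raft rule). The class and method here are hypothetical, not JRaft's `Replicator`.

```java
// Hypothetical leader-side bookkeeping after an InstallSnapshot response; not JRaft's Replicator.
class ReplicatorProgress {
    long nextIndex;   // next log index to send to this follower
    boolean stepDown; // set when the follower reports a higher term

    ReplicatorProgress(long nextIndex) {
        this.nextIndex = nextIndex;
    }

    boolean onInstallSnapshotReturned(boolean success, long responseTerm, long leaderTerm,
                                      long lastIncludedIndex) {
        if (responseTerm > leaderTerm) {
            stepDown = true;               // standard Raft rule: a newer term exists
            return false;
        }
        if (!success) {
            return false;                  // keep nextIndex; the snapshot will be retried
        }
        nextIndex = lastIncludedIndex + 1; // resume AppendEntries right after the snapshot
        return true;
    }
}
```

For a follower that needed index 51 while the leader's snapshot ends at 100, a successful response moves `nextIndex` to 101, the first entry the leader still keeps in its log.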

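Log replay (after a snapshot is installed, the committed entries that follow it, i.e. those between lastAppliedIndex and committedIndex, are applied to the state machine)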

private void doCommitted(final long committedIndex) {
    if (!this.error.getStatus().isOk()) {
        return;
    }
    // Index of the last entry applied to the state machine
    final long lastAppliedIndex = this.lastAppliedIndex.get();
    // We can tolerate the disorder of committed_index
    // Nothing new to apply
    if (lastAppliedIndex >= committedIndex) {
        return;
    }
    final long startMs = Utils.monotonicMs();
    try {
        final List<Closure> closures = new ArrayList<>();
        final List<TaskClosure> taskClosures = new ArrayList<>();
        final long firstClosureIndex = this.closureQueue.popClosureUntil(committedIndex, closures, taskClosures);
        // Calls TaskClosure#onCommitted if necessary
        onTaskCommitted(taskClosures);

        Requires.requireTrue(firstClosureIndex >= 0, "Invalid firstClosureIndex");
        // Create a LogEntry iterator over the entries in (lastAppliedIndex, committedIndex]
        final IteratorImpl iterImpl = new IteratorImpl(this.fsm, this.logManager, closures, firstClosureIndex, lastAppliedIndex, committedIndex, this.applyingIndex);
        // Loop while the iterator is in a good state
        while (iterImpl.isGood()) {
            final LogEntry logEntry = iterImpl.entry();
            if (logEntry.getType() != EnumOutter.EntryType.ENTRY_TYPE_DATA) {
                if (logEntry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                    if (logEntry.getOldPeers() != null && !logEntry.getOldPeers().isEmpty()) {
                        // Joint stage is not supposed to be noticeable by end users.
                        this.fsm.onConfigurationCommitted(new Configuration(iterImpl.entry().getPeers()));
                    }
                }
                if (iterImpl.done() != null) {
                    // For other entries, we have nothing to do besides flush the
                    // pending tasks and run this closure to notify the caller that the
                    // entries before this one were successfully committed and applied.
                    iterImpl.done().run(Status.OK());
                }
                iterImpl.next();
                continue;
            }

            // User data: hand the entries to the user's state machine
            // Apply data task to user state machine
            doApplyTasks(iterImpl);
        }

        if (iterImpl.hasError()) {
            setError(iterImpl.getError());
            iterImpl.runTheRestClosureWithError();
        }
        final long lastIndex = iterImpl.getIndex() - 1;
        final long lastTerm = this.logManager.getTerm(lastIndex);
        final LogId lastAppliedId = new LogId(lastIndex, lastTerm);
        // Update the last applied index
        this.lastAppliedIndex.set(committedIndex);
        // Update the last applied term
        this.lastAppliedTerm = lastTerm;
        this.logManager.setAppliedId(lastAppliedId);
        notifyLastAppliedIndexUpdated(committedIndex);
    } finally {
        this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
    }
}
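Stripped of closures, metrics and error handling, `doCommitted` walks the entries in `(lastAppliedIndex, committedIndex]`, hands data entries to the user state machine, and then advances the applied position. A hypothetical sketch with an in-memory map standing in for the `LogManager`. The entry types and the `fsmCalls` log are invented for illustration, and configuration entries are simply recorded rather than filtered the way the original does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified doCommitted(): names and types are hypothetical.
class CommittedApplier {
    enum EntryType { DATA, CONFIGURATION, NO_OP }

    record LogEntry(long index, long term, EntryType type, String payload) {}

    long lastAppliedIndex;
    long lastAppliedTerm;
    final List<String> fsmCalls = new ArrayList<>(); // what the user state machine would see

    CommittedApplier(long lastAppliedIndex, long lastAppliedTerm) {
        this.lastAppliedIndex = lastAppliedIndex;
        this.lastAppliedTerm = lastAppliedTerm;
    }

    void doCommitted(long committedIndex, Map<Long, LogEntry> log) {
        if (lastAppliedIndex >= committedIndex) {
            return; // committed indexes may arrive out of order; nothing new to apply
        }
        for (long i = lastAppliedIndex + 1; i <= committedIndex; i++) {
            LogEntry e = log.get(i); // assumed present: committed entries are in local storage
            switch (e.type()) {
                case DATA -> fsmCalls.add("apply " + e.payload());
                case CONFIGURATION -> fsmCalls.add("conf " + e.payload());
                case NO_OP -> { } // nothing for the state machine
            }
            lastAppliedTerm = e.term();
        }
        lastAppliedIndex = committedIndex;
    }
}
```

Starting from a position of (100, term 3), which is where `doSnapshotLoad` leaves things after installing a snapshot ending at index 100, `doCommitted(103, log)` applies exactly entries 101..103; a later, smaller committed index such as 102 is ignored.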