An Analysis of JRaft's Linearizable Read Mechanism

How the Raft Protocol Handles Read Operations

The Raft protocol is a strongly consistent protocol. Once a write to data item A at time T1 has been replicated to the other nodes through Raft, a read of A issued at a later time T2 is guaranteed to observe the modified value of A. If reads were implemented the traditional way, by going through the Raft log, they would interfere with write requests: Raft is essentially a distributed replicated-log state machine, and every log entry is ultimately replayed serially, on a single thread, inside the StateMachine. A read-only log entry, however, does not change the final state regardless of whether it is executed serially or concurrently. We therefore want the Raft StateMachine to process, as far as possible, only log entries that represent writes; in other words, when we submit an operation to Raft, that operation should ideally be a write, not a read.

If the StateMachine should ideally only apply write operations, how should read requests be handled? This is where Raft's optimizations for read operations come in.

Follower Reads

Because the Raft log is eventually replicated to every node, and every node applies each log entry in order inside its StateMachine, each node maintains two markers while committing and applying the log: the last committed index (lastCommittedIndex) and the last applied index (lastAppliedIndex). From the committed index, we can tell how far a Follower's committed log position lags behind the Leader's; from the applied index, how far the Follower's applied log position lags behind the Leader's. The single most important variable for implementing Follower reads is this applied index: a Follower asks the Leader for its current committed index (the read index) and serves the read locally only after its own applied index has caught up to that point, which guarantees the read observes every write committed before it started.
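The following minimal, self-contained sketch (illustrative only, not JRaft code) captures the rule a Follower applies once it has obtained the Leader's committed index:

final class FollowerReadSketch {
    // Advanced by the state machine as it applies log entries in order.
    private volatile long lastAppliedIndex;

    void onApply(final long index) {
        this.lastAppliedIndex = index;
    }

    // `leaderCommittedIndex` is the index the Leader returned for this read.
    // Every write committed before the read started has an index no greater
    // than it, so once the local state machine has applied up to that point,
    // serving the read from local state is linearizable.
    boolean canServeRead(final long leaderCommittedIndex) {
        return this.lastAppliedIndex >= leaderCommittedIndex;
    }
}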

How JRaft Implements Linearizable Reads

Issuing a Linearizable Read with JRaft

Node.readIndex(final byte[] requestContext, final ReadIndexClosure done);

As you can see, using JRaft's linearizable read capability is as simple as that: pass in requestContext, the context of this read operation, together with a ReadIndexClosure callback that is invoked once the read is allowed to proceed.
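For reference, a typical call looks like the sketch below. The node variable, the doLocalRead helper and the returned future are placeholders of mine; the readIndex signature and the ReadIndexClosure callback are the ones shown above, and the same pattern appears in the Nacos example at the end of this article.

CompletableFuture<byte[]> linearizableGet(final Node node, final byte[] key) {
    final CompletableFuture<byte[]> future = new CompletableFuture<>();
    node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
        @Override
        public void run(Status status, long index, byte[] reqCtx) {
            if (status.isOk()) {
                // Safe to read local state: the state machine has applied at least up to `index`.
                future.complete(doLocalRead(key)); // placeholder local read
            } else {
                // The read-index check failed (e.g. the leader changed); the caller decides
                // whether to retry or fall back to a read through the Raft log.
                future.completeExceptionally(new IllegalStateException(status.getErrorMsg()));
            }
        }
    });
    return future;
}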

How Linearizable Reads Are Handled Internally

When Node.readIndex is called:

@Override
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    this.readOnlyService.addRequest(requestContext, done);
}

If the current node has not been shut down, the read request is wrapped into an event and published through a Disruptor queue:

@Override
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        // Create an EventTranslator used to publish the ReadIndexEvent
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            // The Disruptor is backed by a ring buffer and uses a CAS to claim a slot;
            // if no slot is available, tryPublishEvent returns false, so we must check
            // whether the publish succeeded.
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                // If the retry limit is exceeded, report failure through the closure
                // and let the caller decide what to do next.
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                // Yield the CPU while spinning; Thread.onSpinWait (added in JDK 9) is more
                // efficient for this kind of short busy-wait than Thread.sleep.
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}

Assuming the ReadIndexEvent is published successfully, it then flows into the following handler:

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
        ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch) throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }

        // Upstream publishers may publish events in batches, so the handler
        // accumulates events here and processes them as a batch.
        this.events.add(newEvent);
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            // Execute the accumulated ReadIndexEvents
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}

When the current event is the last one of a published batch, or the number of buffered tasks has reached the maximum batch size, the accumulated ReadIndexEvents are processed:

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // Build the linearizable-read (ReadIndex) request
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    // Fill in the request body
    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();

    // Hand the ReadIndexRequest to the node for processing
    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

Next, Node.handleReadIndexRequest is called to actually start processing the read request:

@Override
public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                // The current node is the Leader, so take the Leader-read path
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                // The current node is a Follower, so take the Follower-read path (the interesting case)
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                // Leadership is being transferred, so the linearizable-read guarantee cannot be met right now
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

As the code shows, when a ReadIndexEvent is processed, the node decides how to handle the linearizable-read request based on its current state. Let us first look at how a Leader node handles it:

private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    final int quorum = getQuorum();
    // If the quorum size is less than or equal to 1, the cluster has only one member:
    // finish the read request immediately and return success.
    if (quorum <= 1) {
        // Only one peer, fast path.
        respBuilder.setSuccess(true) //
            .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }

    // Read the latest committed index
    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    // Look up the term of that committed index and compare it with the current term;
    // if they differ, this leader has not yet committed any log entry in its own term.
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
            .run(new Status(
                RaftError.EAGAIN,
                "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.",
                lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);

    // If the read request was forwarded by a Follower or Learner, verify that the
    // peer is part of the current Raft configuration.
    if (request.getPeerId() != null) {
        // request from follower or learner, check if the follower/learner is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        if (!this.conf.contains(peer) && !this.conf.containsLearner(peer)) {
            closure
                .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer, this.conf));
            return;
        }
    }

    // Check which linearizable-read mode is configured: heartbeat confirmation or leader lease.
    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }

    switch (readOnlyOpt) {
        // Safe mode: confirm leadership by sending a heartbeat to all members.
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            // This closure counts acks; once a quorum confirms this node is still the
            // Leader, the callback fires and the read is allowed to proceed.
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        // Lease mode: rely on the leader lease to quickly decide whether this node is still
        // the Leader. This is the fastest mode, but it also places the strongest requirement
        // on clocks: the clock drift between nodes must stay within the assumed bound.
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}

As you can see, when a ReadIndexRequest is processed by the Leader, a series of checks decides whether the read can be executed, and the Leader must confirm that it really is still the Leader, either through a round of heartbeats or through its lease.
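The mode is selected through RaftOptions. A minimal configuration sketch for switching to lease-based reads might look like the following; I am assuming the setter mirrors the getReadOnlyOptions() getter used above, so check it against your JRaft version:

// Sketch: choose the linearizable-read mode when building the node options.
// ReadOnlySafe (heartbeat confirmation) is the default and the safest choice;
// ReadOnlyLeaseBased is faster but relies on bounded clock drift between nodes.
RaftOptions raftOptions = new RaftOptions();
raftOptions.setReadOnlyOptions(ReadOnlyOption.ReadOnlyLeaseBased); // assumed setter

NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setRaftOptions(raftOptions);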

So how is a read request that originates on a Follower handled?

private void readFollower(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> closure) {
    if (this.leaderId == null || this.leaderId.isEmpty()) {
        closure.run(new Status(RaftError.EPERM, "No leader at term %d.", this.currTerm));
        return;
    }
    // send request to leader.
    final ReadIndexRequest newRequest = ReadIndexRequest.newBuilder() //
        .mergeFrom(request) //
        .setPeerId(this.leaderId.toString()) //
        .build();
    this.rpcService.readIndex(this.leaderId.getEndpoint(), newRequest, -1, closure);
}

The Follower simply forwards the request to the Leader; what happens when the Leader's response comes back is handled by the ReadIndexResponseClosure class:

class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexResponse> {

    final List<ReadIndexState> states;
    final ReadIndexRequest request;

    public ReadIndexResponseClosure(final List<ReadIndexState> states, final ReadIndexRequest request) {
        super();
        this.states = states;
        this.request = request;
    }

    /**
     * Invoked when the read request sent to the Leader returns.
     * Note that a read issued from a Follower must obtain the Leader's current
     * committed index and then wait until its own state machine has replayed the
     * log up to that point. How long that takes is unbounded and may be quite long,
     * so callers are advised to add a read timeout and use a Leader read as the
     * fallback, giving the read the best chance of succeeding.
     */
    @Override
    public void run(final Status status) {
        // If the read failed, notify the callbacks of all pending reads of the failure.
        if (!status.isOk()) {
            notifyFail(status);
            return;
        }
        final ReadIndexResponse readIndexResponse = getResponse();
        if (!readIndexResponse.getSuccess()) {
            notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
            return;
        }
        // Record the index returned by the Leader.
        final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
            readIndexResponse.getIndex());
        for (final ReadIndexState state : this.states) {
            // Records current commit log index.
            state.setIndex(readIndexResponse.getIndex());
        }

        boolean doUnlock = true;
        ReadOnlyServiceImpl.this.lock.lock();
        try {
            // Check whether this node's state machine has already applied up to the index returned by the Leader.
            if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
                // The state machine has reached (or passed) the Leader's index,
                // so notify all linearizable-read callbacks that they may proceed.
                ReadOnlyServiceImpl.this.lock.unlock();
                doUnlock = false;
                notifySuccess(readIndexStatus);
            } else {
                // The state machine has not reached that index yet, so park the
                // status in a pending queue until it catches up.
                ReadOnlyServiceImpl.this.pendingNotifyStatus
                    .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
                    .add(readIndexStatus);
            }
        } finally {
            if (doUnlock) {
                ReadOnlyServiceImpl.this.lock.unlock();
            }
        }
    }

    private void notifyFail(final Status status) {
        final long nowMs = Utils.monotonicMs();
        // Notify every waiting linearizable-read callback of the failure.
        for (final ReadIndexState state : this.states) {
            ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", nowMs - state.getStartTimeMs());
            final ReadIndexClosure done = state.getDone();
            if (done != null) {
                final Bytes reqCtx = state.getRequestContext();
                done.run(status, ReadIndexClosure.INVALID_LOG_INDEX, reqCtx != null ? reqCtx.get() : null);
            }
        }
    }
}
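
The code above leaves one question open: who wakes up the statuses parked in pendingNotifyStatus? ReadOnlyServiceImpl does this when the state machine reports apply progress. The following is a simplified sketch of that idea, not the actual JRaft source, with the field and helper names abridged:

// Sketch: called when the state machine has applied up to `appliedIndex`.
// Every pending status whose read index is now covered can be notified.
void onApplied(final long appliedIndex) {
    final List<ReadIndexStatus> ready = new ArrayList<>();
    lock.lock();
    try {
        // pendingNotifyStatus is keyed by the read index returned by the Leader.
        final Iterator<Map.Entry<Long, List<ReadIndexStatus>>> it = pendingNotifyStatus.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, List<ReadIndexStatus>> entry = it.next();
            if (entry.getKey() <= appliedIndex) {
                ready.addAll(entry.getValue());
                it.remove();
            }
        }
    } finally {
        lock.unlock();
    }
    // Fire the ReadIndexClosure of every read that is now allowed to proceed.
    for (final ReadIndexStatus status : ready) {
        notifySuccess(status);
    }
}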

That is the whole linearizable-read mechanism implemented by JRaft. Below is the JRaft-based read path I designed for nacos-1.3.0-beta.

CompletableFuture<Response> get(final GetRequest request) {
    final String group = request.getGroup();
    CompletableFuture<Response> future = new CompletableFuture<>();
    final RaftGroupTuple tuple = findTupleByGroup(group);
    if (Objects.isNull(tuple)) {
        future.completeExceptionally(new NoSuchRaftGroupException(group));
        return future;
    }
    final Node node = tuple.node;
    final LogProcessor processor = tuple.processor;
    try {
        // First attempt a linearizable read via readIndex.
        node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
            @Override
            public void run(Status status, long index, byte[] reqCtx) {
                if (status.isOk()) {
                    try {
                        // Safe to read local state now.
                        Response response = processor.onRequest(request);
                        future.complete(response);
                    }
                    catch (Throwable t) {
                        MetricsMonitor.raftReadIndexFailed();
                        future.completeExceptionally(new ConsistencyException(
                            "The conformance protocol is temporarily unavailable for reading", t));
                    }
                    return;
                }
                MetricsMonitor.raftReadIndexFailed();
                Loggers.RAFT.error("ReadIndex has error : {}", status.getErrorMsg());
                future.completeExceptionally(new ConsistencyException(
                    "The conformance protocol is temporarily unavailable for reading, " + status.getErrorMsg()));
            }
        });
        return future;
    }
    catch (Throwable e) {
        MetricsMonitor.raftReadFromLeader();
        Loggers.RAFT.warn("Raft linear read failed, go to Leader read logic : {}", e.toString());
        // run raft read: fall back to reading through the Raft log on the Leader.
        readFromLeader(request, future);
        return future;
    }
}

public void readFromLeader(final GetRequest request, final CompletableFuture<Response> future) {
    commit(request.getGroup(), request, future)
        .whenComplete(new BiConsumer<Response, Throwable>() {
            @Override
            public void accept(Response response, Throwable throwable) {
                if (Objects.nonNull(throwable)) {
                    future.completeExceptionally(new ConsistencyException(
                        "The conformance protocol is temporarily unavailable for reading", throwable));
                    return;
                }
                if (response.getSuccess()) {
                    future.complete(response);
                } else {
                    future.completeExceptionally(new ConsistencyException(
                        "The conformance protocol is temporarily unavailable for reading, " + response.getErrMsg()));
                }
            }
        });
}