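How the Raft Protocol Handles Reads

Raft is a strongly consistent protocol. Suppose a write to data A is issued at time T1 and committed through Raft, which means it has been replicated to a majority of nodes. A read of A issued at a later time T2 is then guaranteed to see the updated value of A.

Reads could be implemented the traditional way, by sending them through the Raft log, but they would then interfere with write requests. Raft is essentially a replicated state machine driven by a distributed log, and every log entry is ultimately replayed single-threaded and serially inside the StateMachine. The log entries that correspond to reads have no effect on the final data, whether they are executed serially or in parallel. We would therefore like the Raft StateMachine to process, as far as possible, only log entries that involve writes. In other words, a data operation submitted to Raft should ideally be a write, not a read.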
If the StateMachine should ideally process only write entries, how should read requests be served? This is where Raft's read optimizations come in.
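Follower Read

Every Raft log entry is eventually replicated to every node, and each node applies the entries in order inside its StateMachine. While committing and applying entries, each node maintains two markers: lastCommittedId and lastAppliedId.

- The committed marker tells the Leader and a Follower how far the Follower's committed log position lags behind the Leader's.
- The applied marker tells them the gap between their applied log positions.

The most important variable for Follower read is appliedId. A Follower may answer a read locally only after its appliedId has reached the commit index that the Leader reports for that read. Only then does its state machine reflect every write committed before the read arrived.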
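The following minimal sketch makes the role of appliedId concrete. It is an illustration under stated assumptions, not JRaft code. The class FollowerReadCheck and its methods are hypothetical, and its fields stand in for the lastCommittedId and lastAppliedId markers described above. It assumes the Follower has already obtained a readIndex, meaning the Leader's commit index at the time of the read.

```java
// Hypothetical illustration of the two markers and the follower-read condition; not JRaft code.
class FollowerReadCheck {
    private long lastCommittedIndex; // highest log index this node knows is committed
    private long lastAppliedIndex;   // highest log index applied to the local state machine

    void onCommitted(long index) {
        lastCommittedIndex = Math.max(lastCommittedIndex, index);
    }

    void onApplied(long index) {
        // Entries are applied strictly after they are committed
        if (index > lastCommittedIndex) {
            throw new IllegalStateException("cannot apply uncommitted index " + index);
        }
        lastAppliedIndex = Math.max(lastAppliedIndex, index);
    }

    // Committed-position gap between the Leader and this Follower
    long commitGap(long leaderCommittedIndex) {
        return leaderCommittedIndex - lastCommittedIndex;
    }

    // Committed entries this node has not applied yet
    long applyLag() {
        return lastCommittedIndex - lastAppliedIndex;
    }

    // A read whose readIndex came from the Leader may run locally only after
    // every entry up to readIndex has been applied here
    boolean canServeRead(long readIndex) {
        return lastAppliedIndex >= readIndex;
    }
}
```

For example, a Follower that has committed up to 10 but applied only up to 7 must hold back a read whose readIndex is 10. It can answer that read once onApplied(10) has been called.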
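How JRaft Implements Linearizable Reads

Issuing a linearizable read with JRaft

```java
// Declared on the Node interface
void readIndex(final byte[] requestContext, final ReadIndexClosure done);
```

JRaft's linearizable read is that simple to use. Pass in the context of this read (requestContext) and a ReadIndexClosure callback, which is invoked once the read can safely proceed.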
How Linearizable Reads Are Handled Internally

When Node.readIndex is called, the node runs the following:
```java
@Override
public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    this.readOnlyService.addRequest(requestContext, done);
}
```
If the node is not shutting down, readOnlyService.addRequest wraps the read request in a ReadIndexEvent and publishes it through a Disruptor queue:
```java
@Override
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        // Fills a ring-buffer slot with this read request
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            // tryPublishEvent does not block; it returns false when the ring buffer is full
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    // Fail fast with EBUSY instead of blocking the caller
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}
```
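The publish loop in addRequest follows a simple pattern. It tries a non-blocking publish a bounded number of times, then fails fast. That pattern can be reproduced with plain JDK classes in the sketch below, which is a stand-in under stated assumptions:

- `java.util.concurrent.ArrayBlockingQueue` replaces the Disruptor ring buffer.
- `BoundedReadQueue` and `MAX_RETRIES` are hypothetical names.
- `Thread.onSpinWait()` (Java 9+) plays the role of `ThreadHelper.onSpinWait()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the publish loop in ReadOnlyServiceImpl.addRequest.
class BoundedReadQueue {
    static final int MAX_RETRIES = 3; // assumed value, not JRaft's constant

    private final BlockingQueue<String> queue;

    BoundedReadQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the request was queued, false if the caller should be told "busy". */
    boolean addRequest(String reqCtx) {
        int retryTimes = 0;
        while (true) {
            // offer() never blocks: it returns false immediately when the queue is full
            if (queue.offer(reqCtx)) {
                return true;
            }
            retryTimes++;
            if (retryTimes > MAX_RETRIES) {
                return false; // corresponds to failing the closure with EBUSY
            }
            Thread.onSpinWait(); // busy-wait hint before the next attempt
        }
    }

    int size() {
        return queue.size();
    }
}
```

Failing fast keeps a flood of read requests from blocking the calling thread. The caller sees a "busy" result and can retry or shed load.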
Once the ReadIndexEvent has been published successfully, it is consumed by the following handler:
```java
private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // Events buffered until they are processed as one batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
        ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
        throws Exception {
        // A shutdown event flushes whatever is buffered, then releases the waiting thread
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }

        this.events.add(newEvent);
        // Flush when the batch is full or no more events are currently available
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}
```
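The handler buffers incoming events and processes them in either of two cases. One is when the current event is the last one in the Disruptor batch (endOfBatch). The other is when the number of buffered events reaches the maximum batch size (raftOptions.getApplyBatch()).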
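Together with executeReadIndexEvents, this batching means that many read requests share a single ReadIndex round trip. The sketch below reproduces the accumulate-and-flush logic using only the JDK. `ReadBatcher`, the `Consumer` flush callback and the batch size are hypothetical. `endOfBatch` is passed in explicitly rather than coming from a Disruptor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of ReadIndexEventHandler's accumulate-and-flush behaviour.
class ReadBatcher {
    private final int applyBatch;                 // maximum events per flush
    private final Consumer<List<String>> flusher; // receives one merged batch
    private final List<String> events = new ArrayList<>();

    ReadBatcher(int applyBatch, Consumer<List<String>> flusher) {
        this.applyBatch = applyBatch;
        this.flusher = flusher;
    }

    void onEvent(String requestContext, boolean endOfBatch) {
        events.add(requestContext);
        // Flush when the batch is full or no further events are currently available
        if (events.size() >= applyBatch || endOfBatch) {
            // Hand over a copy, because the buffer is reused for the next batch
            flusher.accept(new ArrayList<>(events));
            events.clear();
        }
    }
}
```

Each flushed list corresponds to one merged ReadIndexRequest. Under load, the cost of confirming leadership is therefore paid once per batch rather than once per read.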
```java
private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // All buffered reads are merged into one ReadIndexRequest
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder()
        .setGroupId(this.node.getGroupId())
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        // Keep each caller's context and callback so they can be notified individually
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();

    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}
```
executeReadIndexEvents then calls Node.handleReadIndexRequest, which does the real work of serving the read:
```java
@Override
public void handleReadIndexRequest(final ReadIndexRequest request,
                                   final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}
```
handleReadIndexRequest decides how to serve the linearizable read according to the node's current state. First, here is what a Leader does:
```java
private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    final int quorum = getQuorum();
    // Single-node group: the local commit index is already authoritative
    if (quorum <= 1) {
        respBuilder.setSuccess(true)
            .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }

    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
    // Until the leader commits an entry in its own term, its commit index may be stale
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        closure.run(new Status(RaftError.EAGAIN,
            "ReadIndex request rejected because leader has not committed any log entry at its term, logIndex=%d, currTerm=%d.",
            lastCommittedIndex, this.currTerm));
        return;
    }
    // The current commit index becomes the read index
    respBuilder.setIndex(lastCommittedIndex);

    // Forwarded by a follower or learner: it must belong to the current configuration
    if (request.getPeerId() != null) {
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        if (!this.conf.contains(peer) && !this.conf.containsLearner(peer)) {
            closure.run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer, this.conf));
            return;
        }
    }

    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    // An expired lease cannot vouch for leadership, so fall back to the heartbeat mode
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }

    switch (readOnlyOpt) {
        case ReadOnlySafe:
            // Confirm leadership with one round of heartbeats to the other peers
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                respBuilder, quorum, peers.size());
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        case ReadOnlyLeaseBased:
            // Lease still valid: answer without contacting the followers
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
```
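The order of the checks in readLeader can be summarised as a small pure function. This is a simplified sketch, not JRaft's API. The `ReadDecision` enum, `LeaderReadPolicy` and the boolean parameters are hypothetical. The membership check for forwarded requests is left out.

```java
// Hypothetical summary of the decisions readLeader makes before answering.
enum ReadDecision { ANSWER_NOW, REJECT_EAGAIN, CONFIRM_BY_HEARTBEAT }

final class LeaderReadPolicy {
    private LeaderReadPolicy() {}

    static ReadDecision decide(int quorum,
                               long termOfLastCommitted,
                               long currentTerm,
                               boolean leaseBasedConfigured,
                               boolean leaseValid) {
        if (quorum <= 1) {
            return ReadDecision.ANSWER_NOW;           // single node: commit index is authoritative
        }
        if (termOfLastCommitted != currentTerm) {
            return ReadDecision.REJECT_EAGAIN;        // nothing committed in this term yet
        }
        if (leaseBasedConfigured && leaseValid) {
            return ReadDecision.ANSWER_NOW;           // ReadOnlyLeaseBased with a live lease
        }
        return ReadDecision.CONFIRM_BY_HEARTBEAT;     // ReadOnlySafe, or the lease has expired
    }
}
```

The key point of the design is the asymmetry between the two modes. Lease-based mode is only an optimization: whenever the lease cannot be trusted, the Leader drops back to the heartbeat path rather than answering blindly.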
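As the readLeader code shows, the Leader runs a series of checks to decide whether a ReadIndexRequest may proceed. The read index it returns is its commit index at the moment the request is handled. Before answering, it confirms that it is still the legitimate Leader, in one of two ways. It either sends a round of heartbeats (ReadOnlySafe) or relies on its Leader lease (ReadOnlyLeaseBased).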
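In ReadOnlySafe mode, the Leader counts heartbeat acknowledgements. It answers as soon as a quorum, counting itself, has confirmed its leadership. The counter below is a hypothetical sketch of that idea, not the source of ReadIndexHeartbeatResponseClosure. Its failure threshold is an assumption: `peersCount - quorum + 1` is the point at which a quorum can no longer be reached.

```java
// Hypothetical result of one ReadOnlySafe heartbeat round.
enum QuorumOutcome { PENDING, CONFIRMED, FAILED }

// Hypothetical quorum-ack counter; a simplification, not JRaft's ReadIndexHeartbeatResponseClosure.
class HeartbeatQuorum {
    private final int quorum;        // votes needed, leader included
    private final int failThreshold; // follower failures that make the quorum unreachable
    private int ackSuccess;
    private int ackFailures;
    private QuorumOutcome outcome = QuorumOutcome.PENDING;

    /** peersCount includes the leader itself. */
    HeartbeatQuorum(int quorum, int peersCount) {
        this.quorum = quorum;
        // With peersCount - 1 followers, once this many fail, quorum - 1 successes are unreachable
        this.failThreshold = peersCount - quorum + 1;
    }

    /** Call once per follower heartbeat response; responses after the decision are ignored. */
    synchronized QuorumOutcome onResponse(boolean ok) {
        if (outcome != QuorumOutcome.PENDING) {
            return outcome;
        }
        if (ok) {
            ackSuccess++;
        } else {
            ackFailures++;
        }
        if (ackSuccess + 1 >= quorum) {          // +1: the leader counts itself
            outcome = QuorumOutcome.CONFIRMED;
        } else if (ackFailures >= failThreshold) {
            outcome = QuorumOutcome.FAILED;
        }
        return outcome;
    }
}
```

In a five-node group (quorum 3), two successful follower heartbeats confirm leadership. Three failures make confirmation impossible.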
What happens, then, when the read request is issued on a Follower?
```java
private void readFollower(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> closure) {
    if (this.leaderId == null || this.leaderId.isEmpty()) {
        closure.run(new Status(RaftError.EPERM, "No leader at term %d.", this.currTerm));
        return;
    }
    final ReadIndexRequest newRequest = ReadIndexRequest.newBuilder()
        .mergeFrom(request)
        .setPeerId(this.leaderId.toString())
        .build();
    this.rpcService.readIndex(this.leaderId.getEndpoint(), newRequest, -1, closure);
}
```
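A Follower cannot produce a read index itself, so it forwards the request to the Leader with peerId set. The Leader's answer is delivered to the closure that was passed in. That closure is the ReadIndexResponseClosure created in executeReadIndexEvents, and it is where the read index is actually waited on. Here is that class: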
```java
class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexResponse> {

    final List<ReadIndexState> states;
    final ReadIndexRequest request;

    public ReadIndexResponseClosure(final List<ReadIndexState> states, final ReadIndexRequest request) {
        super();
        this.states = states;
        this.request = request;
    }

    @Override
    public void run(final Status status) {
        if (!status.isOk()) {
            notifyFail(status);
            return;
        }
        final ReadIndexResponse readIndexResponse = getResponse();
        if (!readIndexResponse.getSuccess()) {
            notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
            return;
        }
        // The read index returned by the Leader is shared by every read in this batch
        final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
            readIndexResponse.getIndex());
        for (final ReadIndexState state : this.states) {
            state.setIndex(readIndexResponse.getIndex());
        }

        boolean doUnlock = true;
        ReadOnlyServiceImpl.this.lock.lock();
        try {
            if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
                // Already applied locally: release the lock, then notify the callers
                ReadOnlyServiceImpl.this.lock.unlock();
                doUnlock = false;
                notifySuccess(readIndexStatus);
            } else {
                // Not applied yet: park the batch until the applied index reaches the read index
                ReadOnlyServiceImpl.this.pendingNotifyStatus
                    .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10))
                    .add(readIndexStatus);
            }
        } finally {
            if (doUnlock) {
                ReadOnlyServiceImpl.this.lock.unlock();
            }
        }
    }

    private void notifyFail(final Status status) {
        final long nowMs = Utils.monotonicMs();
        for (final ReadIndexState state : this.states) {
            ReadOnlyServiceImpl.this.nodeMetrics.recordLatency("read-index", nowMs - state.getStartTimeMs());
            final ReadIndexClosure done = state.getDone();
            if (done != null) {
                final Bytes reqCtx = state.getRequestContext();
                done.run(status, ReadIndexClosure.INVALID_LOG_INDEX, reqCtx != null ? reqCtx.get() : null);
            }
        }
    }
}
```
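ReadIndexResponseClosure does one of two things with a batch. If the local applied index has already reached the read index, it releases the reads immediately. Otherwise it parks them in pendingNotifyStatus until the state machine catches up.

The sketch below reproduces that wait-until-applied pattern with a `TreeMap` keyed by read index. `PendingReads` and its methods are hypothetical. The sketch assumes some hook calls `onApplied` whenever the applied index advances, since that is the only way parked reads can be released. It does not use JRaft's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of "run the read once appliedIndex >= readIndex"; not JRaft code.
class PendingReads {
    private final NavigableMap<Long, List<Runnable>> pending = new TreeMap<>();
    private long lastAppliedIndex;

    /** Runs the read now if its index is already applied, otherwise parks it. */
    void submit(long readIndex, Runnable read) {
        synchronized (this) {
            if (lastAppliedIndex < readIndex) {
                pending.computeIfAbsent(readIndex, k -> new ArrayList<>()).add(read);
                return;
            }
        }
        read.run(); // callbacks run outside the lock
    }

    /** Called whenever the state machine advances its applied index. */
    void onApplied(long appliedIndex) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            lastAppliedIndex = Math.max(lastAppliedIndex, appliedIndex);
            // All parked reads whose readIndex <= lastAppliedIndex
            NavigableMap<Long, List<Runnable>> done = pending.headMap(lastAppliedIndex, true);
            for (List<Runnable> reads : done.values()) {
                ready.addAll(reads);
            }
            done.clear(); // clearing the view removes these entries from pending
        }
        for (Runnable r : ready) {
            r.run();
        }
    }
}
```

Keying by read index keeps the release step cheap. One `headMap` call finds every read that the latest apply has unblocked, in index order.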
That is how JRaft implements linearizable reads. Next, here is the read-path design I built on JRaft for nacos-1.3.0-beta:
```java
CompletableFuture<Response> get(final GetRequest request) {
    final String group = request.getGroup();
    CompletableFuture<Response> future = new CompletableFuture<>();
    final RaftGroupTuple tuple = findTupleByGroup(group);
    if (Objects.isNull(tuple)) {
        future.completeExceptionally(new NoSuchRaftGroupException(group));
        return future;
    }
    final Node node = tuple.node;
    final LogProcessor processor = tuple.processor;
    try {
        // Linearizable read: the closure fires once the read index has been applied locally
        node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
            @Override
            public void run(Status status, long index, byte[] reqCtx) {
                if (status.isOk()) {
                    try {
                        // Safe to read directly from the local state machine
                        Response response = processor.onRequest(request);
                        future.complete(response);
                    } catch (Throwable t) {
                        MetricsMonitor.raftReadIndexFailed();
                        future.completeExceptionally(new ConsistencyException(
                            "The conformance protocol is temporarily unavailable for reading", t));
                    }
                    return;
                }
                MetricsMonitor.raftReadIndexFailed();
                Loggers.RAFT.error("ReadIndex has error : {}", status.getErrorMsg());
                future.completeExceptionally(new ConsistencyException(
                    "The conformance protocol is temporarily unavailable for reading, " + status.getErrorMsg()));
            }
        });
        return future;
    } catch (Throwable e) {
        // readIndex threw synchronously: fall back to the Leader read path
        MetricsMonitor.raftReadFromLeader();
        Loggers.RAFT.warn("Raft linear read failed, go to Leader read logic : {}", e.toString());
        readFromLeader(request, future);
        return future;
    }
}

public void readFromLeader(final GetRequest request, final CompletableFuture<Response> future) {
    commit(request.getGroup(), request, future)
        .whenComplete(new BiConsumer<Response, Throwable>() {
            @Override
            public void accept(Response response, Throwable throwable) {
                if (Objects.nonNull(throwable)) {
                    future.completeExceptionally(new ConsistencyException(
                        "The conformance protocol is temporarily unavailable for reading", throwable));
                    return;
                }
                if (response.getSuccess()) {
                    future.complete(response);
                } else {
                    future.completeExceptionally(new ConsistencyException(
                        "The conformance protocol is temporarily unavailable for reading, " + response.getErrMsg()));
                }
            }
        });
}
```
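In the Nacos code above, readFromLeader is used only when node.readIndex throws synchronously. A ReadIndex failure reported through the closure completes the future exceptionally, with no fallback. If asynchronous failures should also fall back to the Leader read path, one option is to compose the futures as sketched below.

This is a hypothetical variant, not Nacos code. `ReadWithFallback`, `linearRead` and `leaderRead` are placeholder names, and only Java 8 `CompletableFuture` methods are used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical variant: fall back to the Leader read path on any ReadIndex failure.
final class ReadWithFallback {
    private ReadWithFallback() {}

    static <T> CompletableFuture<T> read(Supplier<CompletableFuture<T>> linearRead,
                                         Supplier<CompletableFuture<T>> leaderRead) {
        CompletableFuture<T> first;
        try {
            first = linearRead.get();
        } catch (RuntimeException e) {
            // Synchronous failure, like the catch block in the Nacos get() method
            return leaderRead.get();
        }
        return first
            .<CompletableFuture<T>>handle((value, error) -> error == null
                ? CompletableFuture.completedFuture(value)
                : leaderRead.get())   // asynchronous failure: try the Leader path as well
            .thenCompose(f -> f);     // flatten the nested future
    }
}
```

Whether to fall back on asynchronous failures is a trade-off. During a leader change the Leader path may fail too, so the extra attempt mostly adds latency in exactly the cases where ReadIndex could not succeed.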