1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request, final Message response, final int seq, final int stateVersion, final long rpcSendTime) { if (id == null) { return; } final long startTimeMs = Utils.nowMs(); Replicator r; if ((r = (Replicator) id.lock()) == null) { return; }
if (stateVersion != r.version) { LOG.debug( "Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.", r, stateVersion, r.version, request, response, status); id.unlock(); return; } final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses; holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime)); if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) { LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}", holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs()); r.resetInflights(); r.state = State.Probe; r.sendEmptyEntries(false); return; }
boolean continueSendEntries = false;
final boolean isLogDebugEnabled = LOG.isDebugEnabled(); StringBuilder sb = null; if (isLogDebugEnabled) { sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,"); } try { int processed = 0; while (!holdingQueue.isEmpty()) { final RpcResponse queuedPipelinedResponse = holdingQueue.peek();
if (queuedPipelinedResponse.seq != r.requiredNextSeq) { if (processed > 0) { if (isLogDebugEnabled) { sb.append("has processed ").append(processed).append(" responses,"); } break; } else { continueSendEntries = false; id.unlock(); return; } } holdingQueue.remove(); processed++; final Inflight inflight = r.pollInflight(); if (inflight == null) { if (isLogDebugEnabled) { sb.append("ignore response because request not found:").append(queuedPipelinedResponse) .append(",\n"); } continue; } if (inflight.seq != queuedPipelinedResponse.seq) { LOG.warn( "Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.", r, inflight.seq, queuedPipelinedResponse.seq); r.resetInflights(); r.state = State.Probe; continueSendEntries = false; r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber()); return; } try { switch (queuedPipelinedResponse.requestType) { case AppendEntries: continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status, (AppendEntriesRequest) queuedPipelinedResponse.request, (AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r); break; case Snapshot: continueSendEntries = onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status, (InstallSnapshotRequest) queuedPipelinedResponse.request, (InstallSnapshotResponse) queuedPipelinedResponse.response); break; } } finally { if (continueSendEntries) { r.getAndIncrementRequiredNextSeq(); } else { break; } } } } finally { if (isLogDebugEnabled) { sb.append(", after processed, continue to send entries: ").append(continueSendEntries); LOG.debug(sb.toString()); } if (continueSendEntries) { r.sendEntries(); } } }
|