kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject [2/2] kudu git commit: consensus queue: Clarify return value of ResponseFromPeer()
Date Tue, 26 Jun 2018 21:08:00 GMT
consensus queue: Clarify return value of ResponseFromPeer()

This patch simply makes some minor semantic improvements to the
ConsensusQueue::ResponseFromPeer() API by returning the value of whether
to immediately send another request and improving the header file comment
documentation about those semantics.

Change-Id: Ibe73b5c69630898327ac7787e3dfcb0b761f323e
Reviewed-on: http://gerrit.cloudera.org:8080/10821
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/97210811
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/97210811
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/97210811

Branch: refs/heads/master
Commit: 97210811b6118a3a57712262acfb2c561762ebe2
Parents: a935f70
Author: Mike Percy <mpercy@apache.org>
Authored: Mon Jun 25 18:14:15 2018 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Jun 26 21:07:48 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc      |   5 +-
 src/kudu/consensus/consensus_queue-test.cc | 108 ++++++++++++------------
 src/kudu/consensus/consensus_queue.cc      |  33 +++++---
 src/kudu/consensus/consensus_queue.h       |  22 +++--
 4 files changed, 89 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/97210811/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index b4df599..c7cf2fe 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -365,8 +365,7 @@ void Peer::DoProcessResponse() {
   VLOG_WITH_PREFIX_UNLOCKED(2) << "Response from peer " << peer_pb().permanent_uuid()
<< ": "
       << SecureShortDebugString(response_);
 
-  bool more_pending;
-  queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_, &more_pending);
+  bool send_more_immediately = queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_);
 
   {
     std::unique_lock<simple_spinlock> lock(peer_lock_);
@@ -377,7 +376,7 @@ void Peer::DoProcessResponse() {
   // We're OK to read the state_ without a lock here -- if we get a race,
   // the worst thing that could happen is that we'll make one more request before
   // noticing a close.
-  if (more_pending) {
+  if (send_more_immediately) {
     SendNextRequest(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/97210811/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index c3cfd24..840816c 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -143,7 +143,7 @@ class ConsensusQueueTest : public KuduTest {
                                const OpId& last_received,
                                const OpId& last_received_current_leader,
                                int last_committed_idx,
-                               bool* more_pending) {
+                               bool* send_more_immediately) {
 
     queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
     response->set_responder_uuid(kPeerUuid);
@@ -160,7 +160,7 @@ class ConsensusQueueTest : public KuduTest {
     // that our last operation is actually 'last_received'.
     RefuseWithLogPropertyMismatch(response, last_received, last_received_current_leader);
     response->mutable_status()->set_last_committed_idx(last_committed_idx);
-    queue_->ResponseFromPeer(response->responder_uuid(), *response, more_pending);
+    *send_more_immediately = queue_->ResponseFromPeer(response->responder_uuid(), *response);
     request->Clear();
     response->mutable_status()->Clear();
   }
@@ -170,10 +170,10 @@ class ConsensusQueueTest : public KuduTest {
                                ConsensusResponsePB* response,
                                const OpId& last_received,
                                const OpId& last_received_current_leader,
-                               bool* more_pending) {
+                               bool* send_more_immediately) {
     return UpdatePeerWatermarkToOp(request, response, last_received,
                                    last_received_current_leader,
-                                   last_received.index(), more_pending);
+                                   last_received.index(), send_more_immediately);
   }
 
   void RefuseWithLogPropertyMismatch(ConsensusResponsePB* response,
@@ -245,15 +245,15 @@ TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
-  bool more_pending = false;
+  bool send_more_immediately = false;
 
   // Peer already has some messages, last one being 7.50
   OpId last_received = MakeOpId(7, 50);
   OpId last_received_current_leader = MinimumOpId();
 
   UpdatePeerWatermarkToOp(&request, &response, last_received,
-                          last_received_current_leader, &more_pending);
-  ASSERT_TRUE(more_pending);
+                          last_received_current_leader, &send_more_immediately);
+  ASSERT_TRUE(send_more_immediately);
 
   // Getting a new request should get all operations after 7.50
   vector<ReplicateRefPtr> refs;
@@ -263,8 +263,8 @@ TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
   ASSERT_EQ(50, request.ops_size());
 
   SetLastReceivedAndLastCommitted(&response, request.ops(49).id());
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_FALSE(more_pending) << "Queue still had requests pending";
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_FALSE(send_more_immediately) << "Queue still had requests pending";
 
   // if we ask for a new request, it should come back empty
   ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy));
@@ -307,10 +307,11 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
-  bool more_pending = false;
+  bool send_more_immediately = false;
 
-  UpdatePeerWatermarkToOp(&request, &response, MinimumOpId(), MinimumOpId(), &more_pending);
-  ASSERT_TRUE(more_pending);
+  UpdatePeerWatermarkToOp(&request, &response, MinimumOpId(), MinimumOpId(),
+                          &send_more_immediately);
+  ASSERT_TRUE(send_more_immediately);
 
   // Append the messages after the queue is tracked. Otherwise the ops might
   // get evicted from the cache immediately and the requests below would
@@ -328,8 +329,8 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
     last = request.ops(request.ops_size() -1).id();
     SetLastReceivedAndLastCommitted(&response, last);
     VLOG(1) << "Faking received up through " << last;
-    queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-    ASSERT_TRUE(more_pending);
+    send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+    ASSERT_TRUE(send_more_immediately);
   }
   vector<ReplicateRefPtr> refs;
   bool needs_tablet_copy;
@@ -338,8 +339,8 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
   ASSERT_EQ(1, request.ops_size());
   last = request.ops(request.ops_size() -1).id();
   SetLastReceivedAndLastCommitted(&response, last);
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_FALSE(more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_FALSE(send_more_immediately);
 
   // extract the ops from the request to avoid double free
   request.mutable_ops()->ExtractSubrange(0, request.ops_size(), nullptr);
@@ -364,10 +365,10 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
-  bool more_pending = false;
+  bool send_more_immediately = false;
 
-  UpdatePeerWatermarkToOp(&request, &response, first_msg, MinimumOpId(), &more_pending);
-  ASSERT_TRUE(more_pending);
+  UpdatePeerWatermarkToOp(&request, &response, first_msg, MinimumOpId(), &send_more_immediately);
+  ASSERT_TRUE(send_more_immediately);
 
   // Tracking a peer a new peer should have moved the all replicated watermark back.
   ASSERT_EQ(queue_->GetAllReplicatedIndex(), 0);
@@ -384,8 +385,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
   SetLastReceivedAndLastCommitted(&response, request.ops(49).id());
   response.set_responder_term(28);
 
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_TRUE(more_pending) << "Queue didn't have anymore requests pending";
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_TRUE(send_more_immediately) << "Queue didn't have anymore requests pending";
 
   ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 100);
   ASSERT_EQ(queue_->GetAllReplicatedIndex(), 100);
@@ -399,8 +400,8 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 
   SetLastReceivedAndLastCommitted(&response, expected);
   response.set_responder_term(expected.term());
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_FALSE(more_pending) << "Queue didn't have anymore requests pending";
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_FALSE(send_more_immediately) << "Queue didn't have anymore requests pending";
 
   WaitForLocalPeerToAckIndex(expected.index());
 
@@ -433,23 +434,23 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   ConsensusResponsePB response;
   response.set_responder_term(1);
 
-  bool more_pending;
+  bool send_more_immediately;
   OpId last_sent = MakeOpId(0, 5);
 
   // Ack the first five operations for peer-1.
   response.set_responder_uuid("peer-1");
   SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index());
 
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_TRUE(more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_TRUE(send_more_immediately);
 
   // Committed index should be the same
   ASSERT_EQ(queue_->GetCommittedIndex(), 0);
 
   // Ack the first five operations for peer-2.
   response.set_responder_uuid("peer-2");
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_TRUE(more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_TRUE(send_more_immediately);
 
   // A majority has now replicated up to 0.5: local, 'peer-1', and 'peer-2'.
   ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), 5);
@@ -464,10 +465,10 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   response.set_responder_uuid("peer-3");
   last_sent = MakeOpId(1, 10);
   SetLastReceivedAndLastCommitted(&response, last_sent, MinimumOpId().index());
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
 
   // peer-3 now has all operations, and the commit index hasn't advanced.
-  EXPECT_FALSE(more_pending);
+  EXPECT_FALSE(send_more_immediately);
 
   // Watermarks should remain the same as above: we still have not majority-replicated
   // anything in the current term, so committed index cannot advance.
@@ -477,8 +478,8 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
 
   // Ack the remaining operations for peer-4.
   response.set_responder_uuid("peer-4");
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  EXPECT_TRUE(more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  EXPECT_TRUE(send_more_immediately);
 
   // Now that a majority of peers have replicated an operation in the queue's
   // term the committed index should advance.
@@ -525,9 +526,8 @@ TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) {
                                   /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
                                   /*last_committed_idx=*/ kNoneCommittedIndex);
 
-  bool more_pending;
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_FALSE(more_pending);
+  bool send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_FALSE(send_more_immediately);
 
   // Committed index should be the same.
   ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());
@@ -535,8 +535,8 @@ TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) {
   // 4. Send an identical ack from the 2nd VOTER peer. This should cause the
   // operation to be committed.
   response.set_responder_uuid(kOtherVoterPeer);
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_TRUE(more_pending); // The committed index has increased.
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_TRUE(send_more_immediately); // The committed index has increased.
 
   // The committed index should include the full set of ops now.
   ASSERT_EQ(kNumMessages, queue_->GetCommittedIndex());
@@ -545,8 +545,8 @@ TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) {
                                   /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
                                   /*last_committed_idx=*/ kNumMessages);
 
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_FALSE(more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_FALSE(send_more_immediately);
 }
 
 // In this test we append a sequence of operations to a log
@@ -580,7 +580,7 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
-  bool more_pending = false;
+  bool send_more_immediately = false;
 
   // The peer will actually be behind the first operation in the queue.
   // In this case about 50 operations before.
@@ -594,10 +594,10 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
                                                   &response,
                                                   peers_last_op,
                                                   MinimumOpId(),
-                                                  &more_pending));
+                                                  &send_more_immediately));
 
   // The queue should reply that there are more messages for the peer.
-  ASSERT_TRUE(more_pending);
+  ASSERT_TRUE(send_more_immediately);
 
   // When we get another request for the peer the queue should load
   // the missing operations.
@@ -656,7 +656,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   ConsensusResponsePB response;
   vector<ReplicateRefPtr> refs;
   response.set_responder_uuid(kPeerUuid);
-  bool more_pending = false;
+  bool send_more_immediately = false;
 
   queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
 
@@ -682,11 +682,11 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
   StatusToPB(Status::IllegalState("LMP failed."), error->mutable_status());
 
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
   request.Clear();
 
   // The queue should reply that there are more operations pending.
-  ASSERT_TRUE(more_pending);
+  ASSERT_TRUE(send_more_immediately);
 
   // We're waiting for a two nodes. The all committed watermark should be
   // 0.0 since we haven't had a successful exchange with the 'remote' peer.
@@ -710,7 +710,8 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   // Now when we respond the watermarks should advance.
   response.mutable_status()->clear_error();
   SetLastReceivedAndLastCommitted(&response, MakeOpId(2, 21), 5);
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_TRUE(send_more_immediately);
 
   // Now the watermark should have advanced.
   ASSERT_EQ(queue_->GetAllReplicatedIndex(), 21);
@@ -797,7 +798,7 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   ConsensusResponsePB response;
   vector<ReplicateRefPtr> refs;
 
-  bool more_pending;
+  bool send_more_immediately;
   // We expect the majority replicated watermark to start at the committed index.
   int64_t expected_majority_replicated = kInitialCommittedIndex;
   // We expect the all replicated watermark to be reset when we track a new peer.
@@ -806,8 +807,9 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
   ASSERT_EQ(queue_->GetAllReplicatedIndex(), expected_all_replicated);
 
-  UpdatePeerWatermarkToOp(&request, &response, MakeOpId(75, 49), MinimumOpId(), 31,
&more_pending);
-  ASSERT_TRUE(more_pending);
+  UpdatePeerWatermarkToOp(&request, &response, MakeOpId(75, 49), MinimumOpId(), 31,
+                          &send_more_immediately);
+  ASSERT_TRUE(send_more_immediately);
 
   for (int i = 31; i <= 53; i++) {
     if (i <= 45) {
@@ -835,8 +837,8 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   // When the peer acks that it received an operation that is not in our current
   // term, it gets ignored in terms of watermark advancement.
   SetLastReceivedAndLastCommitted(&response, MakeOpId(75, 49), *last_op, 31);
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
-  ASSERT_TRUE(more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_TRUE(send_more_immediately);
 
   // We've sent (and received and ack) up to 72.40 from the remote peer
   expected_majority_replicated = expected_all_replicated = 40;
@@ -854,7 +856,7 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   last_op = &request.ops(request.ops_size() - 1).id();
 
   SetLastReceivedAndLastCommitted(&response, MakeOpId(75, 49), *last_op, 31);
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
 
   // We've now sent (and received an ack) up to 73.39
   expected_majority_replicated = expected_all_replicated = 49;
@@ -874,7 +876,7 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
   expected_majority_replicated = expected_all_replicated = 53;
 
   SetLastReceivedAndLastCommitted(&response, MakeOpId(76, 53), 31);
-  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response);
 
   ASSERT_EQ(queue_->GetMajorityReplicatedIndexForTests(), expected_majority_replicated);
   ASSERT_EQ(queue_->GetAllReplicatedIndex(), expected_all_replicated);

http://git-wip-us.apache.org/repos/asf/kudu/blob/97210811/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 343a591..0c9a753 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -350,8 +350,7 @@ void PeerMessageQueue::LocalPeerAppendFinished(const OpId& id,
     std::lock_guard<simple_spinlock> lock(queue_lock_);
     fake_response.mutable_status()->set_last_committed_idx(queue_state_.committed_index);
   }
-  bool junk;
-  ResponseFromPeer(local_peer_pb_.permanent_uuid(), fake_response, &junk);
+  ResponseFromPeer(local_peer_pb_.permanent_uuid(), fake_response);
 
   callback.Run(status);
 }
@@ -1025,13 +1024,13 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer&
pre
   }
 }
 
-void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
-                                        const ConsensusResponsePB& response,
-                                        bool* more_pending) {
+bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
+                                        const ConsensusResponsePB& response) {
   DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
       << response.InitializationErrorString() << ". Response: " << SecureShortDebugString(response);
   CHECK(!response.has_error());
 
+  bool send_more_immediately = false;
   boost::optional<int64_t> updated_commit_index;
   Mode mode_copy;
   {
@@ -1041,8 +1040,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
     if (PREDICT_FALSE(queue_state_.state != kQueueOpen || peer == nullptr)) {
       LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Queue is closed or peer was untracked,
disregarding "
           "peer response. Response: " << SecureShortDebugString(response);
-      *more_pending = false;
-      return;
+      return send_more_immediately;
     }
 
     // Sanity checks.
@@ -1066,7 +1064,10 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
     const TrackedPeer prev_peer_state = *peer;
 
     // Update the peer's last exchange status based on the response.
-    UpdateExchangeStatus(peer, prev_peer_state, response, more_pending);
+    // In this case, if there is a log matching property (LMP) mismatch, we
+    // want to immediately send another request as we attempt to sync the log
+    // offset between the local leader and the remote peer.
+    UpdateExchangeStatus(peer, prev_peer_state, response, &send_more_immediately);
 
     // If the reported last-received op for the replica is in our local log,
     // then resume sending entries from that point onward. Otherwise, resume
@@ -1107,8 +1108,10 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
     }
 
     if (peer->last_exchange_status != PeerStatus::OK) {
-      // In this case, *more_pending has has already been set to false by UpdateExchangeStatus().
-      return;
+      // In this case, 'send_more_immediately' has already been set by
+      // UpdateExchangeStatus() to true in the case of an LMP mismatch, false
+      // otherwise.
+      return send_more_immediately;
     }
 
     if (response.has_responder_term()) {
@@ -1187,10 +1190,10 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       }
     }
 
-    // If our log has the next request for the peer or if the peer's committed index is
-    // lower than our own, set 'more_pending' to true.
-    *more_pending = log_cache_.HasOpBeenWritten(peer->next_index) ||
-        (peer->last_known_committed_index < queue_state_.committed_index);
+    // If the peer's committed index is lower than our own, or if our log has
+    // the next request for the peer, set 'send_more_immediately' to true.
+    send_more_immediately = peer->last_known_committed_index < queue_state_.committed_index
||
+                            log_cache_.HasOpBeenWritten(peer->next_index);
 
     log_cache_.EvictThroughOp(queue_state_.all_replicated_index);
 
@@ -1200,6 +1203,8 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
   if (mode_copy == LEADER && updated_commit_index != boost::none) {
     NotifyObserversOfCommitIndexChange(*updated_commit_index);
   }
+
+  return send_more_immediately;
 }
 
 PeerMessageQueue::TrackedPeer PeerMessageQueue::GetTrackedPeerForTests(const string&
uuid) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/97210811/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 27e52e1..844fefd 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -287,11 +287,13 @@ class PeerMessageQueue {
                         PeerStatus ps,
                         const Status& status);
 
-  // Updates the request queue with the latest response of a peer, returns
-  // whether this peer has more requests pending.
-  void ResponseFromPeer(const std::string& peer_uuid,
-                        const ConsensusResponsePB& response,
-                        bool* more_pending);
+  // Updates the request queue with the latest response from a request to a
+  // consensus peer.
+  // Returns true iff there are more requests pending in the queue for this
+  // peer and another request should be sent immediately, with no intervening
+  // delay.
+  bool ResponseFromPeer(const std::string& peer_uuid,
+                        const ConsensusResponsePB& response);
 
   // Called by the consensus implementation to update the queue's watermarks
   // based on information provided by the leader. This is used for metrics and
@@ -303,9 +305,9 @@ class PeerMessageQueue {
   // This should not be called by a leader.
   void UpdateLastIndexAppendedToLeader(int64_t last_idx_appended_to_leader);
 
-  // Closes the queue, peers are still allowed to call UntrackPeer() and
-  // ResponseFromPeer() but no additional peers can be tracked or messages
-  // queued.
+  // Closes the queue. Once the queue is closed, peers are still allowed to
+  // call UntrackPeer() and ResponseFromPeer(), however no additional peers may
+  // be tracked and no additional messages may be enqueued.
   void Close();
 
   int64_t GetQueuedOperationsSizeBytesForTests() const;
@@ -445,7 +447,9 @@ class PeerMessageQueue {
   void UpdatePeerHealthUnlocked(TrackedPeer* peer);
 
   // Update the peer's last exchange status, and other fields, based on the
-  // response.
+  // response. Sets 'lmp_mismatch' to true if the given response indicates
+  // there was a log-matching property mismatch on the remote, otherwise sets
+  // it to false.
   void UpdateExchangeStatus(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
                             const ConsensusResponsePB& response, bool* lmp_mismatch);
 


Mime
View raw message