kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/2] kudu git commit: KUDU-699. consensus: Peer::Close() should not block on outstanding requests
Date Mon, 19 Dec 2016 09:11:43 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 2840a586d -> b9dadad6b


KUDU-699. consensus: Peer::Close() should not block on outstanding requests

Prior to this patch, Peer::Close() would wait on a semaphore which was
held by outbound RPC requests. This meant that, in the case of the
default 1 second consensus RPC timeout, Close() would commonly block for
a fairly long time.

This blocking wasn't necessary for semantic correctness: even if the
response came back with some kind of successful result, the result would
be ignored because the leader was already in the process of stepping
down. Instead, the blocking was an implementation detail: the
outstanding RPC requests needed to keep alive the RPC request/response
protobufs until they finished, and those protobufs are owned by the Peer
object.

In addition to being unnecessary, the blocking actually causes a couple
serious issues:

1) in an overloaded cluster, we often see a lot of leader election churn
and slow UpdateConsensus() calls. When UpdateConsensus() calls are slow,
this would cause leader step-down to also be slow because of the
PeerManager::Close() call taking a long time. This slow step-down
process would hold an RPC thread, increasing the possibility of other
RPCs getting rejected, retried, etc, contributing to the overall
problems on the cluster. This is often visible in 'pstack' as 5-10
threads stuck in 'PeerManager::Close()'

2) KUDU-1788 describes an issue in which the short timeout we're
currently using for consensus RPCs ends up resulting in those RPCs never
succeeding, and wasting a lot of network bandwidth with repeated
retries. Part of the solution to this issue is likely to involve
boosting the timeout. With a longer RPC timeout, the blocking behavior
on Close() described above is even more problematic.

This patch fixes the issue as follows:

1) Peers now have shared ownership and inherit from
std::enable_shared_from_this. When we make an RPC, we bind a shared_ptr
to the Peer in the RPC's callback. This ensures that the Peer will not
be destructed while an RPC is still in-flight.

2) We no longer need to use the 'sem_' variable to track whether an RPC
is in flight. The purpose of this semaphore was two-fold: (a) to cause
Close() to block, and (b) to prevent a second RPC from being sent while
one was already in flight. The former purpose is no longer a goal. The
latter purpose is attained just as easily using a simple boolean. So,
this patch removes the semaphore and instead just uses a
'request_pending_' boolean.

3) While making these changes, I removed the 'state_' member and
enum. The state was used to flag that Close() had been called, and to
flag whether the first request had been sent yet. I replaced the state
with two booleans, which I found simpler to reason about.

A new test is included which sets the consensus RPC timeout to be long,
stops, a follower, and then asks the leader to step down. Prior to this
patch, the step-down would take as long as the consensus RPC timeout and
cause the test to fail. With this patch, the step-down occurs immediately.

Change-Id: I4e1bc80f536defad28f4d7b51fb95aa32dc9fca0
Reviewed-on: http://gerrit.cloudera.org:8080/5490
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a32ea341
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a32ea341
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a32ea341

Branch: refs/heads/master
Commit: a32ea34148249b92c4428322f5778665b6fb1357
Parents: 2840a58
Author: Todd Lipcon <todd@apache.org>
Authored: Tue Dec 13 16:12:36 2016 +0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Mon Dec 19 09:08:50 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers-test.cc      |  15 +-
 src/kudu/consensus/consensus_peers.cc           | 185 +++++++++++--------
 src/kudu/consensus/consensus_peers.h            |  43 ++---
 src/kudu/consensus/peer_manager.cc              |  21 +--
 src/kudu/consensus/peer_manager.h               |   2 +-
 .../integration-tests/raft_consensus-itest.cc   |  34 ++++
 6 files changed, 175 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 626afeb..40ba0d2 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
+
 #include <gtest/gtest.h>
 
 #include "kudu/common/schema.h"
@@ -38,6 +40,7 @@ namespace consensus {
 
 using log::Log;
 using log::LogOptions;
+using std::shared_ptr;
 
 const char* kTabletId = "test-peers-tablet";
 const char* kLeaderUuid = "peer-0";
@@ -80,7 +83,7 @@ class ConsensusPeersTest : public KuduTest {
 
   DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
       const string& peer_name,
-      gscoped_ptr<Peer>* peer) {
+      shared_ptr<Peer>* peer) {
     RaftPeerPB peer_pb;
     peer_pb.set_permanent_uuid(peer_name);
     auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
@@ -145,7 +148,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
                                 kMinimumTerm,
                                 BuildRaftConfigPBForTests(3));
 
-  gscoped_ptr<Peer> remote_peer;
+  shared_ptr<Peer> remote_peer;
   DelayablePeerProxy<NoOpTestPeerProxy>* proxy =
       NewRemotePeer(kFollowerUuid, &remote_peer);
 
@@ -170,11 +173,11 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
                                 BuildRaftConfigPBForTests(3));
 
   // Create a set of remote peers
-  gscoped_ptr<Peer> remote_peer1;
+  shared_ptr<Peer> remote_peer1;
   DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer1_proxy =
       NewRemotePeer("peer-1", &remote_peer1);
 
-  gscoped_ptr<Peer> remote_peer2;
+  shared_ptr<Peer> remote_peer2;
   DelayablePeerProxy<NoOpTestPeerProxy>* remote_peer2_proxy =
       NewRemotePeer("peer-2", &remote_peer2);
 
@@ -229,7 +232,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress)
{
                                 BuildRaftConfigPBForTests(3));
 
   auto mock_proxy = new MockedPeerProxy(pool_.get());
-  gscoped_ptr<Peer> peer;
+  shared_ptr<Peer> peer;
   ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid),
                                 kTabletId,
                                 kLeaderUuid,
@@ -267,7 +270,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
                                 BuildRaftConfigPBForTests(3));
 
   auto mock_proxy = new MockedPeerProxy(pool_.get());
-  gscoped_ptr<Peer> peer;
+  shared_ptr<Peer> peer;
   ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid),
                                 kTabletId,
                                 kLeaderUuid,

http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 92bb35a..5102dcd 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -40,6 +40,10 @@
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/threadpool.h"
 
+// This file uses C++14 'generalized lambda capture' syntax, which is supported
+// in C++11 mode both by clang and by GCC. Disable the accompanying warning.
+#pragma clang diagnostic ignored "-Wc++14-extensions"
+
 DEFINE_int32(consensus_rpc_timeout_ms, 1000,
              "Timeout used for all consensus internal RPC communications.");
 TAG_FLAG(consensus_rpc_timeout_ms, advanced);
@@ -84,16 +88,16 @@ Status Peer::NewRemotePeer(const RaftPeerPB& peer_pb,
                            PeerMessageQueue* queue,
                            ThreadPool* thread_pool,
                            gscoped_ptr<PeerProxy> proxy,
-                           gscoped_ptr<Peer>* peer) {
-
-  gscoped_ptr<Peer> new_peer(new Peer(peer_pb,
-                                      tablet_id,
-                                      leader_uuid,
-                                      std::move(proxy),
-                                      queue,
-                                      thread_pool));
+                           shared_ptr<Peer>* peer) {
+
+  shared_ptr<Peer> new_peer(new Peer(peer_pb,
+                                     tablet_id,
+                                     leader_uuid,
+                                     std::move(proxy),
+                                     queue,
+                                     thread_pool));
   RETURN_NOT_OK(new_peer->Init());
-  peer->reset(new_peer.release());
+  *peer = std::move(new_peer);
   return Status::OK();
 }
 
@@ -106,69 +110,66 @@ Peer::Peer(const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid,
       proxy_(std::move(proxy)),
       queue_(queue),
       failed_attempts_(0),
-      sem_(1),
       heartbeater_(
           peer_pb.permanent_uuid(),
           MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms),
           boost::bind(&Peer::SignalRequest, this, true)),
-      thread_pool_(thread_pool),
-      state_(kPeerCreated) {}
-
+      thread_pool_(thread_pool) {
+}
 
 Status Peer::Init() {
   std::lock_guard<simple_spinlock> lock(peer_lock_);
   queue_->TrackPeer(peer_pb_.permanent_uuid());
   RETURN_NOT_OK(heartbeater_.Start());
-  state_ = kPeerStarted;
   return Status::OK();
 }
 
 Status Peer::SignalRequest(bool even_if_queue_empty) {
-  // If the peer is currently sending, return Status::OK().
-  // If there are new requests in the queue we'll get them on ProcessResponse().
-  if (!sem_.TryAcquire()) {
-    return Status::OK();
-  }
-  {
-    std::lock_guard<simple_spinlock> l(peer_lock_);
+  std::lock_guard<simple_spinlock> l(peer_lock_);
 
-    if (PREDICT_FALSE(state_ == kPeerClosed)) {
-      sem_.Release();
-      return Status::IllegalState("Peer was closed.");
-    }
-
-    // For the first request sent by the peer, we send it even if the queue is empty,
-    // which it will always appear to be for the first request, since this is the
-    // negotiation round.
-    if (PREDICT_FALSE(state_ == kPeerStarted)) {
-      even_if_queue_empty = true;
-      state_ = kPeerRunning;
-    }
-    DCHECK_EQ(state_, kPeerRunning);
-
-    // If our last request generated an error, and this is not a normal
-    // heartbeat request, then don't send the "per-RPC" request. Instead,
-    // we'll wait for the heartbeat.
-    //
-    // TODO: we could consider looking at the number of consecutive failed
-    // attempts, and instead of ignoring the signal, ask the heartbeater
-    // to "expedite" the next heartbeat in order to achieve something like
-    // exponential backoff after an error. As it is implemented today, any
-    // transient error will result in a latency blip as long as the heartbeat
-    // period.
-    if (failed_attempts_ > 0 && !even_if_queue_empty) {
-      sem_.Release();
-      return Status::OK();
-    }
+  if (PREDICT_FALSE(closed_)) {
+    return Status::IllegalState("Peer was closed.");
   }
 
-
-  RETURN_NOT_OK(thread_pool_->SubmitClosure(
-                  Bind(&Peer::SendNextRequest, Unretained(this), even_if_queue_empty)));
+  RETURN_NOT_OK(thread_pool_->SubmitFunc([=, s_this = shared_from_this()]() {
+        s_this->SendNextRequest(even_if_queue_empty);
+      }));
   return Status::OK();
 }
 
 void Peer::SendNextRequest(bool even_if_queue_empty) {
+  std::unique_lock<simple_spinlock> l(peer_lock_);
+  if (PREDICT_FALSE(closed_)) {
+    return;
+  }
+
+  // Only allow one request at a time.
+  if (request_pending_) {
+    return;
+  }
+
+  // For the first request sent by the peer, we send it even if the queue is empty,
+  // which it will always appear to be for the first request, since this is the
+  // negotiation round.
+  if (!has_sent_first_request_) {
+    even_if_queue_empty = true;
+    has_sent_first_request_ = true;
+  }
+
+  // If our last request generated an error, and this is not a normal
+  // heartbeat request, then don't send the "per-op" request. Instead,
+  // we'll wait for the heartbeat.
+  //
+  // TODO(todd): we could consider looking at the number of consecutive failed
+  // attempts, and instead of ignoring the signal, ask the heartbeater
+  // to "expedite" the next heartbeat in order to achieve something like
+  // exponential backoff after an error. As it is implemented today, any
+  // transient error will result in a latency blip as long as the heartbeat
+  // period.
+  if (failed_attempts_ > 0 && !even_if_queue_empty) {
+    return;
+  }
+
   // The peer has no pending request nor is sending: send the request.
   bool needs_tablet_copy = false;
   int64_t commit_index_before = request_.has_committed_index() ?
@@ -181,17 +182,25 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Could not obtain request from queue for peer:
"
         << peer_pb_.permanent_uuid() << ". Status: " << s.ToString();
-    sem_.Release();
     return;
   }
 
   if (PREDICT_FALSE(needs_tablet_copy)) {
-    Status s = SendTabletCopyRequest();
+    Status s = PrepareTabletCopyRequest();
     if (!s.ok()) {
       LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to generate Tablet Copy request
for peer: "
                                         << s.ToString();
-      sem_.Release();
     }
+
+    controller_.Reset();
+    request_pending_ = true;
+    l.unlock();
+    // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
+    // that this object outlives the RPC.
+    proxy_->StartTabletCopy(&tc_request_, &tc_response_, &controller_,
+                            [s_this = shared_from_this()]() {
+                              s_this->ProcessTabletCopyResponse();
+                            });
     return;
   }
 
@@ -203,7 +212,6 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
   // If the queue is empty, check if we were told to send a status-only
   // message, if not just return.
   if (PREDICT_FALSE(!req_has_ops && !even_if_queue_empty)) {
-    sem_.Release();
     return;
   }
 
@@ -220,15 +228,23 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
       << request_.ShortDebugString();
   controller_.Reset();
 
+  request_pending_ = true;
+  l.unlock();
+  // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
+  // that this object outlives the RPC.
   proxy_->UpdateAsync(&request_, &response_, &controller_,
-                      boost::bind(&Peer::ProcessResponse, this));
+                      [s_this = shared_from_this()]() {
+                        s_this->ProcessResponse();
+                      });
 }
 
 void Peer::ProcessResponse() {
   // Note: This method runs on the reactor thread.
-
-  DCHECK_EQ(0, sem_.GetValue())
-    << "Got a response when nothing was pending";
+  std::unique_lock<simple_spinlock> lock(peer_lock_);
+  if (closed_) {
+    return;
+  }
+  CHECK(request_pending_);
 
   MAYBE_FAULT(FLAGS_fault_crash_after_leader_request_fraction);
 
@@ -263,16 +279,17 @@ void Peer::ProcessResponse() {
   // the WAL) and SendNextRequest() may do the same thing. So we run the rest
   // of the response handling logic on our thread pool and not on the reactor
   // thread.
-  Status s = thread_pool_->SubmitClosure(Bind(&Peer::DoProcessResponse, Unretained(this)));
+  Status s = thread_pool_->SubmitFunc([s_this = shared_from_this()]() {
+      s_this->DoProcessResponse();
+    });
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to process peer response: " <<
s.ToString()
         << ": " << response_.ShortDebugString();
-    sem_.Release();
+    request_pending_ = false;
   }
 }
 
 void Peer::DoProcessResponse() {
-  failed_attempts_ = 0;
 
   VLOG_WITH_PREFIX_UNLOCKED(2) << "Response from peer " << peer_pb().permanent_uuid()
<< ": "
       << response_.ShortDebugString();
@@ -280,30 +297,42 @@ void Peer::DoProcessResponse() {
   bool more_pending;
   queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_, &more_pending);
 
+  {
+    std::unique_lock<simple_spinlock> lock(peer_lock_);
+    CHECK(request_pending_);
+    failed_attempts_ = 0;
+    request_pending_ = false;
+  }
   // We're OK to read the state_ without a lock here -- if we get a race,
   // the worst thing that could happen is that we'll make one more request before
   // noticing a close.
-  if (more_pending && ANNOTATE_UNPROTECTED_READ(state_) != kPeerClosed) {
+  if (more_pending) {
     SendNextRequest(true);
-  } else {
-    sem_.Release();
   }
 }
 
-Status Peer::SendTabletCopyRequest() {
+Status Peer::PrepareTabletCopyRequest() {
   if (!FLAGS_enable_tablet_copy) {
     failed_attempts_++;
     return Status::NotSupported("Tablet Copy is disabled");
   }
 
   RETURN_NOT_OK(queue_->GetTabletCopyRequestForPeer(peer_pb_.permanent_uuid(), &tc_request_));
-  controller_.Reset();
-  proxy_->StartTabletCopy(&tc_request_, &tc_response_, &controller_,
-                          boost::bind(&Peer::ProcessTabletCopyResponse, this));
+
   return Status::OK();
 }
 
 void Peer::ProcessTabletCopyResponse() {
+  // If the peer is already closed return.
+  {
+    std::unique_lock<simple_spinlock> lock(peer_lock_);
+    if (closed_) {
+      return;
+    }
+    CHECK(request_pending_);
+    request_pending_ = false;
+  }
+
   if (controller_.status().ok() && tc_response_.has_error()) {
     // ALREADY_INPROGRESS is expected, so we do not log this error.
     if (tc_response_.error().code() ==
@@ -314,7 +343,6 @@ void Peer::ProcessTabletCopyResponse() {
                                         << tc_response_.ShortDebugString();
     }
   }
-  sem_.Release();
 }
 
 void Peer::ProcessResponseError(const Status& status) {
@@ -331,7 +359,7 @@ void Peer::ProcessResponseError(const Status& status) {
       << " Status: " << status.ToString() << "."
       << " Retrying in the next heartbeat period."
       << " Already tried " << failed_attempts_ << " times.";
-  sem_.Release();
+  request_pending_ = false;
 }
 
 string Peer::LogPrefixUnlocked() const {
@@ -346,23 +374,18 @@ void Peer::Close() {
   // If the peer is already closed return.
   {
     std::lock_guard<simple_spinlock> lock(peer_lock_);
-    if (state_ == kPeerClosed) return;
-    DCHECK(state_ == kPeerRunning || state_ == kPeerStarted) << "Unexpected state:
" << state_;
-    state_ = kPeerClosed;
+    if (closed_) return;
+    closed_ = true;
   }
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Closing peer: " << peer_pb_.permanent_uuid();
 
-  // Acquire the semaphore to wait for any concurrent request to finish.
-  // They will see the state_ == kPeerClosed and not start any new requests,
-  // but we can't currently cancel the already-sent ones. (see KUDU-699)
-  std::lock_guard<Semaphore> l(sem_);
   queue_->UntrackPeer(peer_pb_.permanent_uuid());
-  // We don't own the ops (the queue does).
-  request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
 }
 
 Peer::~Peer() {
   Close();
+  // We don't own the ops (the queue does).
+  request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
 }
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/consensus_peers.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 82fc8f9..4653963 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -73,9 +73,9 @@ class VoteResponsePB;
 // object, and performed on a thread pool (since it may do IO). When a
 // response is received, the peer updates the PeerMessageQueue
 // using PeerMessageQueue::ResponseFromPeer(...) on the same threadpool.
-class Peer {
+class Peer : public std::enable_shared_from_this<Peer> {
  public:
-  // Initializes a peer and get its status.
+  // Initializes a peer and start sending periodic heartbeats.
   Status Init();
 
   // Signals that this peer has a new request to replicate/store.
@@ -87,11 +87,16 @@ class Peer {
   const RaftPeerPB& peer_pb() const { return peer_pb_; }
 
   // Stop sending requests and periodic heartbeats.
-  // TODO(KUDU-699). This currently blocks until the most recent request
-  // has completed, which is problematic.
+  //
+  // This does not block waiting on any current outstanding requests to finish.
+  // However, when they do finish, the results will be disregarded, so this
+  // is safe to call at any point.
+  //
+  // This method must be called before the Peer's associated ThreadPool
+  // is destructed. Once this method returns, it is safe to destruct
+  // the ThreadPool.
   void Close();
 
-  // Calls Close() automatically.
   ~Peer();
 
   // Creates a new remote peer and makes the queue track it.'
@@ -106,7 +111,7 @@ class Peer {
                               PeerMessageQueue* queue,
                               ThreadPool* thread_pool,
                               gscoped_ptr<PeerProxy> proxy,
-                              gscoped_ptr<Peer>* peer);
+                              std::shared_ptr<Peer>* peer);
 
  private:
   Peer(const RaftPeerPB& peer_pb, std::string tablet_id, std::string leader_uuid,
@@ -124,12 +129,12 @@ class Peer {
   // Run on 'thread_pool'. Does response handling that requires IO or may block.
   void DoProcessResponse();
 
-  // Fetch the desired tablet copy request from the queue and send it
-  // to the peer. The callback goes to ProcessTabletCopyResponse().
+  // Fetch the desired tablet copy request from the queue and set up
+  // tc_request_ appropriately.
   //
   // Returns a bad Status if tablet copy is disabled, or if the
   // request cannot be generated for some reason.
-  Status SendTabletCopyRequest();
+  Status PrepareTabletCopyRequest();
 
   // Handle RPC callback from initiating tablet copy.
   void ProcessTabletCopyResponse();
@@ -167,13 +172,6 @@ class Peer {
 
   rpc::RpcController controller_;
 
-  // Held if there is an outstanding request.
-  // This is used in order to ensure that we only have a single request
-  // oustanding at a time, and to wait for the outstanding requests
-  // at Close().
-  Semaphore sem_;
-
-
   // Heartbeater for remote peer implementations.
   // This will send status only requests to the remote peers
   // whenever we go more than 'FLAGS_raft_heartbeat_interval_ms'
@@ -183,17 +181,12 @@ class Peer {
   // Thread pool used to construct requests to this peer.
   ThreadPool* thread_pool_;
 
-  enum State {
-    kPeerCreated,
-    kPeerStarted,
-    kPeerRunning,
-    kPeerClosed
-  };
-
   // lock that protects Peer state changes, initialization, etc.
-  // Must not try to acquire sem_ while holding peer_lock_.
   mutable simple_spinlock peer_lock_;
-  State state_;
+  bool request_pending_ = false;
+  bool closed_ = false;
+  bool has_sent_first_request_ = false;
+
 };
 
 // A proxy to another peer. Usually a thin wrapper around an rpc proxy but can

http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/peer_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index 6d39ff5..1f28faa 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -51,16 +51,12 @@ PeerManager::~PeerManager() {
 }
 
 Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
-  unordered_set<string> new_peers;
-
   VLOG(1) << "Updating peers from new config: " << config.ShortDebugString();
 
   std::lock_guard<simple_spinlock> lock(lock_);
   // Create new peers
   for (const RaftPeerPB& peer_pb : config.peers()) {
-    new_peers.insert(peer_pb.permanent_uuid());
-    Peer* peer = FindPtrOrNull(peers_, peer_pb.permanent_uuid());
-    if (peer != nullptr) {
+    if (ContainsKey(peers_, peer_pb.permanent_uuid())) {
       continue;
     }
     if (peer_pb.permanent_uuid() == local_uuid_) {
@@ -72,7 +68,7 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
     RETURN_NOT_OK_PREPEND(peer_proxy_factory_->NewProxy(peer_pb, &peer_proxy),
                           "Could not obtain a remote proxy to the peer.");
 
-    gscoped_ptr<Peer> remote_peer;
+    std::shared_ptr<Peer> remote_peer;
     RETURN_NOT_OK(Peer::NewRemotePeer(peer_pb,
                                       tablet_id_,
                                       local_uuid_,
@@ -80,7 +76,7 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
                                       thread_pool_,
                                       std::move(peer_proxy),
                                       &remote_peer));
-    InsertOrDie(&peers_, peer_pb.permanent_uuid(), remote_peer.release());
+    peers_.emplace(peer_pb.permanent_uuid(), std::move(remote_peer));
   }
 
   return Status::OK();
@@ -88,14 +84,15 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
 
 void PeerManager::SignalRequest(bool force_if_queue_empty) {
   std::lock_guard<simple_spinlock> lock(lock_);
-  auto iter = peers_.begin();
-  for (; iter != peers_.end(); iter++) {
+  for (auto iter = peers_.begin(); iter != peers_.end();) {
     Status s = (*iter).second->SignalRequest(force_if_queue_empty);
     if (PREDICT_FALSE(!s.ok())) {
       LOG(WARNING) << GetLogPrefix()
                    << "Peer was closed, removing from peers. Peer: "
                    << (*iter).second->peer_pb().ShortDebugString();
-      peers_.erase(iter);
+      peers_.erase(iter++);
+    } else {
+      ++iter;
     }
   }
 }
@@ -103,10 +100,10 @@ void PeerManager::SignalRequest(bool force_if_queue_empty) {
 void PeerManager::Close() {
   {
     std::lock_guard<simple_spinlock> lock(lock_);
-    for (const PeersMap::value_type& entry : peers_) {
+    for (const auto& entry : peers_) {
       entry.second->Close();
     }
-    STLDeleteValues(&peers_);
+    peers_.clear();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/peer_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/peer_manager.h b/src/kudu/consensus/peer_manager.h
index ebbe5f5..62babc7 100644
--- a/src/kudu/consensus/peer_manager.h
+++ b/src/kudu/consensus/peer_manager.h
@@ -71,7 +71,7 @@ class PeerManager {
  private:
   std::string GetLogPrefix() const;
 
-  typedef std::unordered_map<std::string, Peer*> PeersMap;
+  typedef std::unordered_map<std::string, std::shared_ptr<Peer>> PeersMap;
   const std::string tablet_id_;
   const std::string local_uuid_;
   PeerProxyFactory* peer_proxy_factory_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 4d284e4..86639d3 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -1632,6 +1632,40 @@ TEST_F(RaftConsensusITest, TestLeaderStepDown) {
                                   << s.ToString();
 }
 
+// Test for KUDU-699: sets the consensus RPC timeout to be long,
+// and freezes both followers before asking the leader to step down.
+// Prior to fixing KUDU-699, the step-down process would block
+// until the pending requests timed out.
+TEST_F(RaftConsensusITest, TestStepDownWithSlowFollower) {
+  vector<string> ts_flags = {
+    "--enable_leader_failure_detection=false",
+    // Bump up the RPC timeout, so that we can verify that the stepdown responds
+    // quickly even when an outbound request is hung.
+    "--consensus_rpc_timeout_ms=15000"
+  };
+  vector<string> master_flags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+  };
+  BuildAndStart(ts_flags, master_flags);
+
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+
+  // Stop both followers.
+  for (int i = 1; i < 3; i++) {
+    cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause();
+  }
+
+  // Sleep a little bit of time to make sure that the leader has outstanding heartbeats
+  // to the paused followers before requesting the stepdown.
+  SleepFor(MonoDelta::FromSeconds(1));
+
+  // Step down should respond quickly despite the hung requests.
+  ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(3)));
+}
+
 void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
     const TabletServerMap& tablet_servers, const string& leader_uuid) {
 


Mime
View raw message