From: todd@apache.org
To: commits@kudu.apache.org
Date: Mon, 19 Dec 2016 09:11:43 -0000
Subject: [1/2] kudu git commit: KUDU-699. consensus: Peer::Close() should not block on outstanding requests

Repository: kudu
Updated Branches:
  refs/heads/master 2840a586d -> b9dadad6b


KUDU-699. consensus: Peer::Close() should not block on outstanding requests

Prior to this patch, Peer::Close() would wait on a semaphore which was
held by outbound RPC requests. This meant that, with the default 1-second
consensus RPC timeout, Close() would commonly block for a fairly long
time.

This blocking wasn't necessary for semantic correctness: even if the
response came back with some kind of successful result, the result would
be ignored because the leader was already in the process of stepping
down. Instead, the blocking was an implementation detail: the outstanding
RPC requests needed to keep the RPC request/response protobufs alive
until they finished, and those protobufs are owned by the Peer object.

In addition to being unnecessary, the blocking actually causes a couple
of serious issues:

1) In an overloaded cluster, we often see a lot of leader election churn
   and slow UpdateConsensus() calls. When UpdateConsensus() calls are
   slow, this would cause leader step-down to also be slow, because the
   PeerManager::Close() call takes a long time. This slow step-down
   process would hold an RPC thread, increasing the possibility of other
   RPCs getting rejected, retried, etc., contributing to the overall
   problems on the cluster. This is often visible in 'pstack' as 5-10
   threads stuck in 'PeerManager::Close()'.

2) KUDU-1788 describes an issue in which the short timeout we're
   currently using for consensus RPCs ends up resulting in those RPCs
   never succeeding, wasting a lot of network bandwidth with repeated
   retries. Part of the solution to that issue is likely to involve
   boosting the timeout. With a longer RPC timeout, the blocking behavior
   of Close() described above is even more problematic.
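For illustration only, here is a simplified stand-alone sketch of the
pre-patch situation (invented names such as BlockingPeer; not the actual
Kudu code): Close() must wait for an in-flight asynchronous callback,
because that callback still writes into buffers owned by the object.

----------------------------------------------------------------------
// Simplified stand-alone sketch of the old behavior (not the actual Kudu
// code). Close() blocks until the in-flight "RPC" callback clears the
// pending-request flag, because the callback uses members of this object.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

class BlockingPeer {
 public:
  void SendRequest() {
    {
      std::lock_guard<std::mutex> l(mu_);
      request_pending_ = true;               // analogous to acquiring 'sem_'
    }
    rpc_thread_ = std::thread([this]() {
      // Simulated slow RPC: takes as long as the consensus RPC timeout.
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::lock_guard<std::mutex> l(mu_);
      response_ = "ignored anyway";          // callback touches owned buffers
      request_pending_ = false;              // analogous to releasing 'sem_'
      done_.notify_all();
    });
  }

  void Close() {
    // Must wait for the callback so it never touches freed memory: this is
    // exactly the blocking that the patch removes.
    std::unique_lock<std::mutex> l(mu_);
    done_.wait(l, [this]() { return !request_pending_; });
  }

  ~BlockingPeer() {
    if (rpc_thread_.joinable()) rpc_thread_.join();
  }

 private:
  std::mutex mu_;
  std::condition_variable done_;
  bool request_pending_ = false;
  std::string request_, response_;           // stand-ins for the owned protobufs
  std::thread rpc_thread_;
};

int main() {
  BlockingPeer peer;
  peer.SendRequest();
  auto start = std::chrono::steady_clock::now();
  peer.Close();                              // blocks for roughly 1 second
  auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
      std::chrono::steady_clock::now() - start).count();
  std::cout << "Close() blocked for " << ms << " ms" << std::endl;
  return 0;
}
----------------------------------------------------------------------

Multiply that blocking by several peers and run it on an RPC handler
thread, and you get the slow step-downs described above.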
This patch fixes the issue as follows:

1) Peers now have shared ownership and inherit from
   std::enable_shared_from_this. When we make an RPC, we bind a shared_ptr
   to the Peer into the RPC's callback. This ensures that the Peer will
   not be destructed while an RPC is still in flight. (A simplified sketch
   of this ownership pattern appears below.)

2) We no longer need the 'sem_' variable to track whether an RPC is in
   flight. The purpose of this semaphore was two-fold: (a) to cause
   Close() to block, and (b) to prevent a second RPC from being sent while
   one was already in flight. The former purpose is no longer a goal. The
   latter is attained just as easily using a simple boolean. So, this
   patch removes the semaphore and instead uses a 'request_pending_'
   boolean.

3) While making these changes, I removed the 'state_' member and enum.
   The state was used to flag that Close() had been called, and to flag
   whether the first request had been sent yet. I replaced the state with
   two booleans, which I found simpler to reason about.

A new test is included which sets the consensus RPC timeout to be long,
stops a follower, and then asks the leader to step down. Prior to this
patch, the step-down would take as long as the consensus RPC timeout and
cause the test to fail. With this patch, the step-down occurs immediately.

Change-Id: I4e1bc80f536defad28f4d7b51fb95aa32dc9fca0
Reviewed-on: http://gerrit.cloudera.org:8080/5490
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a32ea341
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a32ea341
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a32ea341

Branch: refs/heads/master
Commit: a32ea34148249b92c4428322f5778665b6fb1357
Parents: 2840a58
Author: Todd Lipcon
Authored: Tue Dec 13 16:12:36 2016 +0700
Committer: Todd Lipcon
Committed: Mon Dec 19 09:08:50 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers-test.cc     |  15 +-
 src/kudu/consensus/consensus_peers.cc          | 185 +++++++++++--------
 src/kudu/consensus/consensus_peers.h           |  43 ++---
 src/kudu/consensus/peer_manager.cc             |  21 +--
 src/kudu/consensus/peer_manager.h              |   2 +-
 .../integration-tests/raft_consensus-itest.cc  |  34 ++++
 6 files changed, 175 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 626afeb..40ba0d2 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
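To make items (1) and (2) above concrete, here is a simplified
stand-alone sketch (invented names, not the code from this patch): the
Peer derives from std::enable_shared_from_this, each asynchronous
callback captures a shared_ptr so the object outlives the RPC, and a
plain request_pending_ boolean replaces the semaphore.

----------------------------------------------------------------------
// Simplified stand-alone sketch of the new ownership pattern (not the
// actual patch). The callback holds a shared_ptr to the Peer, so Close()
// can return immediately and a late response is simply disregarded.
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

class Peer : public std::enable_shared_from_this<Peer> {
 public:
  void SendNextRequest() {
    std::lock_guard<std::mutex> l(lock_);
    if (closed_ || request_pending_) return;   // at most one request in flight
    request_pending_ = true;
    // Capture a shared_ptr in the callback so the Peer (and the buffers it
    // owns) outlives the in-flight request, even after Close().
    std::shared_ptr<Peer> s_this = shared_from_this();
    std::thread([s_this]() {
      std::this_thread::sleep_for(std::chrono::seconds(1));  // simulated slow RPC
      s_this->ProcessResponse();
    }).detach();
  }

  void Close() {
    // No blocking: just mark the peer closed; any outstanding response
    // will be ignored when it arrives.
    std::lock_guard<std::mutex> l(lock_);
    closed_ = true;
  }

 private:
  void ProcessResponse() {
    std::lock_guard<std::mutex> l(lock_);
    request_pending_ = false;
    if (closed_) return;                       // result disregarded after Close()
    std::cout << "response handled" << std::endl;
  }

  std::mutex lock_;
  bool request_pending_ = false;               // replaces the old 'sem_'
  bool closed_ = false;                        // replaces the old 'state_' enum
};

int main() {
  auto peer = std::make_shared<Peer>();
  peer->SendNextRequest();
  peer->Close();                               // returns immediately
  peer.reset();                                // callback still keeps the Peer alive
  std::this_thread::sleep_for(std::chrono::seconds(2));  // let the "RPC" finish
  return 0;
}
----------------------------------------------------------------------

The real patch uses the consensus thread pool and RPC proxies rather than
a raw thread, but the lifetime guarantee is the same: the shared_ptr
captured in the callback keeps the Peer alive until the response handler
has run.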
+#include + #include #include "kudu/common/schema.h" @@ -38,6 +40,7 @@ namespace consensus { using log::Log; using log::LogOptions; +using std::shared_ptr; const char* kTabletId = "test-peers-tablet"; const char* kLeaderUuid = "peer-0"; @@ -80,7 +83,7 @@ class ConsensusPeersTest : public KuduTest { DelayablePeerProxy* NewRemotePeer( const string& peer_name, - gscoped_ptr* peer) { + shared_ptr* peer) { RaftPeerPB peer_pb; peer_pb.set_permanent_uuid(peer_name); auto proxy_ptr = new DelayablePeerProxy( @@ -145,7 +148,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) { kMinimumTerm, BuildRaftConfigPBForTests(3)); - gscoped_ptr remote_peer; + shared_ptr remote_peer; DelayablePeerProxy* proxy = NewRemotePeer(kFollowerUuid, &remote_peer); @@ -170,11 +173,11 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) { BuildRaftConfigPBForTests(3)); // Create a set of remote peers - gscoped_ptr remote_peer1; + shared_ptr remote_peer1; DelayablePeerProxy* remote_peer1_proxy = NewRemotePeer("peer-1", &remote_peer1); - gscoped_ptr remote_peer2; + shared_ptr remote_peer2; DelayablePeerProxy* remote_peer2_proxy = NewRemotePeer("peer-2", &remote_peer2); @@ -229,7 +232,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) { BuildRaftConfigPBForTests(3)); auto mock_proxy = new MockedPeerProxy(pool_.get()); - gscoped_ptr peer; + shared_ptr peer; ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, @@ -267,7 +270,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) { BuildRaftConfigPBForTests(3)); auto mock_proxy = new MockedPeerProxy(pool_.get()); - gscoped_ptr peer; + shared_ptr peer; ASSERT_OK(Peer::NewRemotePeer(FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/consensus_peers.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc index 92bb35a..5102dcd 100644 --- a/src/kudu/consensus/consensus_peers.cc +++ b/src/kudu/consensus/consensus_peers.cc @@ -40,6 +40,10 @@ #include "kudu/util/net/net_util.h" #include "kudu/util/threadpool.h" +// This file uses C++14 'generalized lambda capture' syntax, which is supported +// in C++11 mode both by clang and by GCC. Disable the accompanying warning. 
+#pragma clang diagnostic ignored "-Wc++14-extensions" + DEFINE_int32(consensus_rpc_timeout_ms, 1000, "Timeout used for all consensus internal RPC communications."); TAG_FLAG(consensus_rpc_timeout_ms, advanced); @@ -84,16 +88,16 @@ Status Peer::NewRemotePeer(const RaftPeerPB& peer_pb, PeerMessageQueue* queue, ThreadPool* thread_pool, gscoped_ptr proxy, - gscoped_ptr* peer) { - - gscoped_ptr new_peer(new Peer(peer_pb, - tablet_id, - leader_uuid, - std::move(proxy), - queue, - thread_pool)); + shared_ptr* peer) { + + shared_ptr new_peer(new Peer(peer_pb, + tablet_id, + leader_uuid, + std::move(proxy), + queue, + thread_pool)); RETURN_NOT_OK(new_peer->Init()); - peer->reset(new_peer.release()); + *peer = std::move(new_peer); return Status::OK(); } @@ -106,69 +110,66 @@ Peer::Peer(const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, proxy_(std::move(proxy)), queue_(queue), failed_attempts_(0), - sem_(1), heartbeater_( peer_pb.permanent_uuid(), MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms), boost::bind(&Peer::SignalRequest, this, true)), - thread_pool_(thread_pool), - state_(kPeerCreated) {} - + thread_pool_(thread_pool) { +} Status Peer::Init() { std::lock_guard lock(peer_lock_); queue_->TrackPeer(peer_pb_.permanent_uuid()); RETURN_NOT_OK(heartbeater_.Start()); - state_ = kPeerStarted; return Status::OK(); } Status Peer::SignalRequest(bool even_if_queue_empty) { - // If the peer is currently sending, return Status::OK(). - // If there are new requests in the queue we'll get them on ProcessResponse(). - if (!sem_.TryAcquire()) { - return Status::OK(); - } - { - std::lock_guard l(peer_lock_); + std::lock_guard l(peer_lock_); - if (PREDICT_FALSE(state_ == kPeerClosed)) { - sem_.Release(); - return Status::IllegalState("Peer was closed."); - } - - // For the first request sent by the peer, we send it even if the queue is empty, - // which it will always appear to be for the first request, since this is the - // negotiation round. - if (PREDICT_FALSE(state_ == kPeerStarted)) { - even_if_queue_empty = true; - state_ = kPeerRunning; - } - DCHECK_EQ(state_, kPeerRunning); - - // If our last request generated an error, and this is not a normal - // heartbeat request, then don't send the "per-RPC" request. Instead, - // we'll wait for the heartbeat. - // - // TODO: we could consider looking at the number of consecutive failed - // attempts, and instead of ignoring the signal, ask the heartbeater - // to "expedite" the next heartbeat in order to achieve something like - // exponential backoff after an error. As it is implemented today, any - // transient error will result in a latency blip as long as the heartbeat - // period. - if (failed_attempts_ > 0 && !even_if_queue_empty) { - sem_.Release(); - return Status::OK(); - } + if (PREDICT_FALSE(closed_)) { + return Status::IllegalState("Peer was closed."); } - - RETURN_NOT_OK(thread_pool_->SubmitClosure( - Bind(&Peer::SendNextRequest, Unretained(this), even_if_queue_empty))); + RETURN_NOT_OK(thread_pool_->SubmitFunc([=, s_this = shared_from_this()]() { + s_this->SendNextRequest(even_if_queue_empty); + })); return Status::OK(); } void Peer::SendNextRequest(bool even_if_queue_empty) { + std::unique_lock l(peer_lock_); + if (PREDICT_FALSE(closed_)) { + return; + } + + // Only allow one request at a time. + if (request_pending_) { + return; + } + + // For the first request sent by the peer, we send it even if the queue is empty, + // which it will always appear to be for the first request, since this is the + // negotiation round. 
+ if (!has_sent_first_request_) { + even_if_queue_empty = true; + has_sent_first_request_ = true; + } + + // If our last request generated an error, and this is not a normal + // heartbeat request, then don't send the "per-op" request. Instead, + // we'll wait for the heartbeat. + // + // TODO(todd): we could consider looking at the number of consecutive failed + // attempts, and instead of ignoring the signal, ask the heartbeater + // to "expedite" the next heartbeat in order to achieve something like + // exponential backoff after an error. As it is implemented today, any + // transient error will result in a latency blip as long as the heartbeat + // period. + if (failed_attempts_ > 0 && !even_if_queue_empty) { + return; + } + // The peer has no pending request nor is sending: send the request. bool needs_tablet_copy = false; int64_t commit_index_before = request_.has_committed_index() ? @@ -181,17 +182,25 @@ void Peer::SendNextRequest(bool even_if_queue_empty) { if (PREDICT_FALSE(!s.ok())) { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Could not obtain request from queue for peer: " << peer_pb_.permanent_uuid() << ". Status: " << s.ToString(); - sem_.Release(); return; } if (PREDICT_FALSE(needs_tablet_copy)) { - Status s = SendTabletCopyRequest(); + Status s = PrepareTabletCopyRequest(); if (!s.ok()) { LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to generate Tablet Copy request for peer: " << s.ToString(); - sem_.Release(); } + + controller_.Reset(); + request_pending_ = true; + l.unlock(); + // Capture a shared_ptr reference into the RPC callback so that we're guaranteed + // that this object outlives the RPC. + proxy_->StartTabletCopy(&tc_request_, &tc_response_, &controller_, + [s_this = shared_from_this()]() { + s_this->ProcessTabletCopyResponse(); + }); return; } @@ -203,7 +212,6 @@ void Peer::SendNextRequest(bool even_if_queue_empty) { // If the queue is empty, check if we were told to send a status-only // message, if not just return. if (PREDICT_FALSE(!req_has_ops && !even_if_queue_empty)) { - sem_.Release(); return; } @@ -220,15 +228,23 @@ void Peer::SendNextRequest(bool even_if_queue_empty) { << request_.ShortDebugString(); controller_.Reset(); + request_pending_ = true; + l.unlock(); + // Capture a shared_ptr reference into the RPC callback so that we're guaranteed + // that this object outlives the RPC. proxy_->UpdateAsync(&request_, &response_, &controller_, - boost::bind(&Peer::ProcessResponse, this)); + [s_this = shared_from_this()]() { + s_this->ProcessResponse(); + }); } void Peer::ProcessResponse() { // Note: This method runs on the reactor thread. - - DCHECK_EQ(0, sem_.GetValue()) - << "Got a response when nothing was pending"; + std::unique_lock lock(peer_lock_); + if (closed_) { + return; + } + CHECK(request_pending_); MAYBE_FAULT(FLAGS_fault_crash_after_leader_request_fraction); @@ -263,16 +279,17 @@ void Peer::ProcessResponse() { // the WAL) and SendNextRequest() may do the same thing. So we run the rest // of the response handling logic on our thread pool and not on the reactor // thread. 
- Status s = thread_pool_->SubmitClosure(Bind(&Peer::DoProcessResponse, Unretained(this))); + Status s = thread_pool_->SubmitFunc([s_this = shared_from_this()]() { + s_this->DoProcessResponse(); + }); if (PREDICT_FALSE(!s.ok())) { LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to process peer response: " << s.ToString() << ": " << response_.ShortDebugString(); - sem_.Release(); + request_pending_ = false; } } void Peer::DoProcessResponse() { - failed_attempts_ = 0; VLOG_WITH_PREFIX_UNLOCKED(2) << "Response from peer " << peer_pb().permanent_uuid() << ": " << response_.ShortDebugString(); @@ -280,30 +297,42 @@ void Peer::DoProcessResponse() { bool more_pending; queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_, &more_pending); + { + std::unique_lock lock(peer_lock_); + CHECK(request_pending_); + failed_attempts_ = 0; + request_pending_ = false; + } // We're OK to read the state_ without a lock here -- if we get a race, // the worst thing that could happen is that we'll make one more request before // noticing a close. - if (more_pending && ANNOTATE_UNPROTECTED_READ(state_) != kPeerClosed) { + if (more_pending) { SendNextRequest(true); - } else { - sem_.Release(); } } -Status Peer::SendTabletCopyRequest() { +Status Peer::PrepareTabletCopyRequest() { if (!FLAGS_enable_tablet_copy) { failed_attempts_++; return Status::NotSupported("Tablet Copy is disabled"); } RETURN_NOT_OK(queue_->GetTabletCopyRequestForPeer(peer_pb_.permanent_uuid(), &tc_request_)); - controller_.Reset(); - proxy_->StartTabletCopy(&tc_request_, &tc_response_, &controller_, - boost::bind(&Peer::ProcessTabletCopyResponse, this)); + return Status::OK(); } void Peer::ProcessTabletCopyResponse() { + // If the peer is already closed return. + { + std::unique_lock lock(peer_lock_); + if (closed_) { + return; + } + CHECK(request_pending_); + request_pending_ = false; + } + if (controller_.status().ok() && tc_response_.has_error()) { // ALREADY_INPROGRESS is expected, so we do not log this error. if (tc_response_.error().code() == @@ -314,7 +343,6 @@ void Peer::ProcessTabletCopyResponse() { << tc_response_.ShortDebugString(); } } - sem_.Release(); } void Peer::ProcessResponseError(const Status& status) { @@ -331,7 +359,7 @@ void Peer::ProcessResponseError(const Status& status) { << " Status: " << status.ToString() << "." << " Retrying in the next heartbeat period." << " Already tried " << failed_attempts_ << " times."; - sem_.Release(); + request_pending_ = false; } string Peer::LogPrefixUnlocked() const { @@ -346,23 +374,18 @@ void Peer::Close() { // If the peer is already closed return. { std::lock_guard lock(peer_lock_); - if (state_ == kPeerClosed) return; - DCHECK(state_ == kPeerRunning || state_ == kPeerStarted) << "Unexpected state: " << state_; - state_ = kPeerClosed; + if (closed_) return; + closed_ = true; } LOG_WITH_PREFIX_UNLOCKED(INFO) << "Closing peer: " << peer_pb_.permanent_uuid(); - // Acquire the semaphore to wait for any concurrent request to finish. - // They will see the state_ == kPeerClosed and not start any new requests, - // but we can't currently cancel the already-sent ones. (see KUDU-699) - std::lock_guard l(sem_); queue_->UntrackPeer(peer_pb_.permanent_uuid()); - // We don't own the ops (the queue does). - request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr); } Peer::~Peer() { Close(); + // We don't own the ops (the queue does). 
+ request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr); } http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/consensus_peers.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h index 82fc8f9..4653963 100644 --- a/src/kudu/consensus/consensus_peers.h +++ b/src/kudu/consensus/consensus_peers.h @@ -73,9 +73,9 @@ class VoteResponsePB; // object, and performed on a thread pool (since it may do IO). When a // response is received, the peer updates the PeerMessageQueue // using PeerMessageQueue::ResponseFromPeer(...) on the same threadpool. -class Peer { +class Peer : public std::enable_shared_from_this { public: - // Initializes a peer and get its status. + // Initializes a peer and start sending periodic heartbeats. Status Init(); // Signals that this peer has a new request to replicate/store. @@ -87,11 +87,16 @@ class Peer { const RaftPeerPB& peer_pb() const { return peer_pb_; } // Stop sending requests and periodic heartbeats. - // TODO(KUDU-699). This currently blocks until the most recent request - // has completed, which is problematic. + // + // This does not block waiting on any current outstanding requests to finish. + // However, when they do finish, the results will be disregarded, so this + // is safe to call at any point. + // + // This method must be called before the Peer's associated ThreadPool + // is destructed. Once this method returns, it is safe to destruct + // the ThreadPool. void Close(); - // Calls Close() automatically. ~Peer(); // Creates a new remote peer and makes the queue track it.' @@ -106,7 +111,7 @@ class Peer { PeerMessageQueue* queue, ThreadPool* thread_pool, gscoped_ptr proxy, - gscoped_ptr* peer); + std::shared_ptr* peer); private: Peer(const RaftPeerPB& peer_pb, std::string tablet_id, std::string leader_uuid, @@ -124,12 +129,12 @@ class Peer { // Run on 'thread_pool'. Does response handling that requires IO or may block. void DoProcessResponse(); - // Fetch the desired tablet copy request from the queue and send it - // to the peer. The callback goes to ProcessTabletCopyResponse(). + // Fetch the desired tablet copy request from the queue and set up + // tc_request_ appropriately. // // Returns a bad Status if tablet copy is disabled, or if the // request cannot be generated for some reason. - Status SendTabletCopyRequest(); + Status PrepareTabletCopyRequest(); // Handle RPC callback from initiating tablet copy. void ProcessTabletCopyResponse(); @@ -167,13 +172,6 @@ class Peer { rpc::RpcController controller_; - // Held if there is an outstanding request. - // This is used in order to ensure that we only have a single request - // oustanding at a time, and to wait for the outstanding requests - // at Close(). - Semaphore sem_; - - // Heartbeater for remote peer implementations. // This will send status only requests to the remote peers // whenever we go more than 'FLAGS_raft_heartbeat_interval_ms' @@ -183,17 +181,12 @@ class Peer { // Thread pool used to construct requests to this peer. ThreadPool* thread_pool_; - enum State { - kPeerCreated, - kPeerStarted, - kPeerRunning, - kPeerClosed - }; - // lock that protects Peer state changes, initialization, etc. - // Must not try to acquire sem_ while holding peer_lock_. mutable simple_spinlock peer_lock_; - State state_; + bool request_pending_ = false; + bool closed_ = false; + bool has_sent_first_request_ = false; + }; // A proxy to another peer. 
Usually a thin wrapper around an rpc proxy but can http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/peer_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc index 6d39ff5..1f28faa 100644 --- a/src/kudu/consensus/peer_manager.cc +++ b/src/kudu/consensus/peer_manager.cc @@ -51,16 +51,12 @@ PeerManager::~PeerManager() { } Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) { - unordered_set new_peers; - VLOG(1) << "Updating peers from new config: " << config.ShortDebugString(); std::lock_guard lock(lock_); // Create new peers for (const RaftPeerPB& peer_pb : config.peers()) { - new_peers.insert(peer_pb.permanent_uuid()); - Peer* peer = FindPtrOrNull(peers_, peer_pb.permanent_uuid()); - if (peer != nullptr) { + if (ContainsKey(peers_, peer_pb.permanent_uuid())) { continue; } if (peer_pb.permanent_uuid() == local_uuid_) { @@ -72,7 +68,7 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) { RETURN_NOT_OK_PREPEND(peer_proxy_factory_->NewProxy(peer_pb, &peer_proxy), "Could not obtain a remote proxy to the peer."); - gscoped_ptr remote_peer; + std::shared_ptr remote_peer; RETURN_NOT_OK(Peer::NewRemotePeer(peer_pb, tablet_id_, local_uuid_, @@ -80,7 +76,7 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) { thread_pool_, std::move(peer_proxy), &remote_peer)); - InsertOrDie(&peers_, peer_pb.permanent_uuid(), remote_peer.release()); + peers_.emplace(peer_pb.permanent_uuid(), std::move(remote_peer)); } return Status::OK(); @@ -88,14 +84,15 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) { void PeerManager::SignalRequest(bool force_if_queue_empty) { std::lock_guard lock(lock_); - auto iter = peers_.begin(); - for (; iter != peers_.end(); iter++) { + for (auto iter = peers_.begin(); iter != peers_.end();) { Status s = (*iter).second->SignalRequest(force_if_queue_empty); if (PREDICT_FALSE(!s.ok())) { LOG(WARNING) << GetLogPrefix() << "Peer was closed, removing from peers. 
Peer: " << (*iter).second->peer_pb().ShortDebugString(); - peers_.erase(iter); + peers_.erase(iter++); + } else { + ++iter; } } } @@ -103,10 +100,10 @@ void PeerManager::SignalRequest(bool force_if_queue_empty) { void PeerManager::Close() { { std::lock_guard lock(lock_); - for (const PeersMap::value_type& entry : peers_) { + for (const auto& entry : peers_) { entry.second->Close(); } - STLDeleteValues(&peers_); + peers_.clear(); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/consensus/peer_manager.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/peer_manager.h b/src/kudu/consensus/peer_manager.h index ebbe5f5..62babc7 100644 --- a/src/kudu/consensus/peer_manager.h +++ b/src/kudu/consensus/peer_manager.h @@ -71,7 +71,7 @@ class PeerManager { private: std::string GetLogPrefix() const; - typedef std::unordered_map PeersMap; + typedef std::unordered_map> PeersMap; const std::string tablet_id_; const std::string local_uuid_; PeerProxyFactory* peer_proxy_factory_; http://git-wip-us.apache.org/repos/asf/kudu/blob/a32ea341/src/kudu/integration-tests/raft_consensus-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc index 4d284e4..86639d3 100644 --- a/src/kudu/integration-tests/raft_consensus-itest.cc +++ b/src/kudu/integration-tests/raft_consensus-itest.cc @@ -1632,6 +1632,40 @@ TEST_F(RaftConsensusITest, TestLeaderStepDown) { << s.ToString(); } +// Test for KUDU-699: sets the consensus RPC timeout to be long, +// and freezes both followers before asking the leader to step down. +// Prior to fixing KUDU-699, the step-down process would block +// until the pending requests timed out. +TEST_F(RaftConsensusITest, TestStepDownWithSlowFollower) { + vector ts_flags = { + "--enable_leader_failure_detection=false", + // Bump up the RPC timeout, so that we can verify that the stepdown responds + // quickly even when an outbound request is hung. + "--consensus_rpc_timeout_ms=15000" + }; + vector master_flags = { + "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" + }; + BuildAndStart(ts_flags, master_flags); + + vector tservers; + AppendValuesFromMap(tablet_servers_, &tservers); + ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); + ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10))); + + // Stop both followers. + for (int i = 1; i < 3; i++) { + cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause(); + } + + // Sleep a little bit of time to make sure that the leader has outstanding heartbeats + // to the paused followers before requesting the stepdown. + SleepFor(MonoDelta::FromSeconds(1)); + + // Step down should respond quickly despite the hung requests. + ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(3))); +} + void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites( const TabletServerMap& tablet_servers, const string& leader_uuid) {