Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F1704200CEC for ; Tue, 1 Aug 2017 02:16:43 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EFC94166353; Tue, 1 Aug 2017 00:16:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2052F16634E for ; Tue, 1 Aug 2017 02:16:41 +0200 (CEST) Received: (qmail 97278 invoked by uid 500); 1 Aug 2017 00:16:41 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 97264 invoked by uid 99); 1 Aug 2017 00:16:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Aug 2017 00:16:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2DFA8DFC28; Tue, 1 Aug 2017 00:16:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: adar@apache.org To: commits@kudu.apache.org Message-Id: <95c1a6cdd0164bf2a224624e1fc5124f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kudu git commit: consensus: switch RaftConsensus to shared_ptr Date: Tue, 1 Aug 2017 00:16:40 +0000 (UTC) archived-at: Tue, 01 Aug 2017 00:16:44 -0000 Repository: kudu Updated Branches: refs/heads/master a21a50f87 -> 1c0276a98 consensus: switch RaftConsensus to shared_ptr Using shared_ptr instead of scoped_refptr means we can create weak pointers to RaftConsensus, and I'd like to take advantage of that in future work. The change is largely mechanical. The interesting part is the corresponding switch from kudu::{Bind,Callback} to std::{bind,function}. To maintain equivalent ownership semantics, the following conversions to bound arguments are needed: - Unretained(this) -> this - this -> shared_from_this() I also took the liberty of converting several pass-by-cref functions to use move semantics. Change-Id: I7fcaf5b7e4c4ce19126972fa0a81764b7da34e48 Reviewed-on: http://gerrit.cloudera.org:8080/7531 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1c0276a9 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1c0276a9 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1c0276a9 Branch: refs/heads/master Commit: 1c0276a98cad2fbb91ac1b21a68961bd50265f9c Parents: a21a50f Author: Adar Dembo Authored: Thu Jul 27 15:12:42 2017 -0700 Committer: Adar Dembo Committed: Tue Aug 1 00:15:11 2017 +0000 ---------------------------------------------------------------------- src/kudu/consensus/consensus-test-util.h | 19 ++-- src/kudu/consensus/leader_election-test.cc | 21 ++-- src/kudu/consensus/leader_election.cc | 2 +- src/kudu/consensus/leader_election.h | 4 +- src/kudu/consensus/raft_consensus.cc | 105 +++++++++++-------- src/kudu/consensus/raft_consensus.h | 30 +++--- .../consensus/raft_consensus_quorum-test.cc | 48 ++++----- src/kudu/master/catalog_manager.cc | 2 +- src/kudu/master/sys_catalog.cc | 2 +- src/kudu/tablet/tablet_replica.cc | 13 ++- src/kudu/tablet/tablet_replica.h | 4 +- .../tablet/transactions/transaction_driver.cc | 7 +- src/kudu/tserver/tablet_copy_source_session.cc | 2 +- src/kudu/tserver/tablet_service.cc | 33 +++--- src/kudu/tserver/ts_tablet_manager.cc | 4 +- src/kudu/tserver/tserver-path-handlers.cc | 4 +- src/kudu/util/async_util.h | 38 +++++-- src/kudu/util/status_callback.h | 6 ++ 18 files changed, 207 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/consensus-test-util.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h index de3e4ea..b000dde 100644 --- a/src/kudu/consensus/consensus-test-util.h +++ b/src/kudu/consensus/consensus-test-util.h @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include #include @@ -369,25 +370,25 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory { std::shared_ptr messenger_; }; -typedef std::unordered_map > TestPeerMap; +typedef std::unordered_map> TestPeerMap; // Thread-safe manager for list of peers being used in tests. class TestPeerMapManager { public: explicit TestPeerMapManager(RaftConfigPB config) : config_(std::move(config)) {} - void AddPeer(const std::string& peer_uuid, const scoped_refptr& peer) { + void AddPeer(const std::string& peer_uuid, const std::shared_ptr& peer) { std::lock_guard lock(lock_); InsertOrDie(&peers_, peer_uuid, peer); } - Status GetPeerByIdx(int idx, scoped_refptr* peer_out) const { + Status GetPeerByIdx(int idx, std::shared_ptr* peer_out) const { CHECK_LT(idx, config_.peers_size()); return GetPeerByUuid(config_.peers(idx).permanent_uuid(), peer_out); } Status GetPeerByUuid(const std::string& peer_uuid, - scoped_refptr* peer_out) const { + std::shared_ptr* peer_out) const { std::lock_guard lock(lock_); if (!FindCopy(peers_, peer_uuid, peer_out)) { return Status::NotFound("Other consensus instance was destroyed"); @@ -493,7 +494,7 @@ class LocalTestPeerProxy : public TestPeerProxy { // Give the other peer a clean response object to write to. ConsensusResponsePB other_peer_resp; - scoped_refptr peer; + std::shared_ptr peer; Status s = peers_->GetPeerByUuid(peer_uuid_, &peer); if (s.ok()) { @@ -526,7 +527,7 @@ class LocalTestPeerProxy : public TestPeerProxy { VoteResponsePB other_peer_resp; other_peer_resp.CopyFrom(*response); - scoped_refptr peer; + std::shared_ptr peer; Status s = peers_->GetPeerByUuid(peer_uuid_, &peer); if (s.ok()) { @@ -662,8 +663,10 @@ class TestTransactionFactory : public ReplicaTransactionFactory { Status StartReplicaTransaction(const scoped_refptr& round) OVERRIDE { auto txn = new TestDriver(pool_.get(), log_, round); - txn->round_->SetConsensusReplicatedCallback(Bind(&TestDriver::ReplicationFinished, - Unretained(txn))); + txn->round_->SetConsensusReplicatedCallback(std::bind( + &TestDriver::ReplicationFinished, + txn, + std::placeholders::_1)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/leader_election-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc index 915344a..dd62384 100644 --- a/src/kudu/consensus/leader_election-test.cc +++ b/src/kudu/consensus/leader_election-test.cc @@ -17,6 +17,7 @@ #include "kudu/consensus/leader_election.h" +#include #include #include #include @@ -211,8 +212,9 @@ scoped_refptr LeaderElectionTest::SetUpElectionWithHighTermVoter scoped_refptr election( new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter), MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs), - Bind(&LeaderElectionTest::ElectionCallback, - Unretained(this)))); + std::bind(&LeaderElectionTest::ElectionCallback, + this, + std::placeholders::_1))); return election; } @@ -267,8 +269,9 @@ scoped_refptr LeaderElectionTest::SetUpElectionWithGrantDenyErro scoped_refptr election( new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter), MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs), - Bind(&LeaderElectionTest::ElectionCallback, - Unretained(this)))); + std::bind(&LeaderElectionTest::ElectionCallback, + this, + std::placeholders::_1))); return election; } @@ -293,8 +296,9 @@ TEST_F(LeaderElectionTest, TestPerfectElection) { scoped_refptr election( new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter), MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs), - Bind(&LeaderElectionTest::ElectionCallback, - Unretained(this)))); + std::bind(&LeaderElectionTest::ElectionCallback, + this, + std::placeholders::_1))); election->Run(); latch_.Wait(); @@ -421,8 +425,9 @@ TEST_F(LeaderElectionTest, TestFailToCreateProxy) { scoped_refptr election( new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter), MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs), - Bind(&LeaderElectionTest::ElectionCallback, - Unretained(this)))); + std::bind(&LeaderElectionTest::ElectionCallback, + this, + std::placeholders::_1))); election->Run(); latch_.Wait(); ASSERT_EQ(kElectionTerm, result_->vote_request.candidate_term()); http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/leader_election.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc index 2d371d1..b180345 100644 --- a/src/kudu/consensus/leader_election.cc +++ b/src/kudu/consensus/leader_election.cc @@ -259,7 +259,7 @@ void LeaderElection::CheckForDecision() { // Respond outside of the lock. if (to_respond) { // This is thread-safe since result_ is write-once. - decision_callback_.Run(*result_); + decision_callback_(*result_); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/leader_election.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h index 487e34c..a4b5ca5 100644 --- a/src/kudu/consensus/leader_election.h +++ b/src/kudu/consensus/leader_election.h @@ -18,6 +18,7 @@ #ifndef KUDU_CONSENSUS_LEADER_ELECTION_H #define KUDU_CONSENSUS_LEADER_ELECTION_H +#include #include #include #include @@ -25,7 +26,6 @@ #include "kudu/consensus/consensus.pb.h" #include "kudu/consensus/raft_consensus.h" -#include "kudu/gutil/callback.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" @@ -141,7 +141,7 @@ struct ElectionResult { // This class is thread-safe. class LeaderElection : public RefCountedThreadSafe { public: - typedef Callback ElectionDecisionCallback; + typedef std::function ElectionDecisionCallback; typedef std::unordered_map ProxyMap; // Set up a new leader election driver. http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 2216366..1ebd0ef 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -462,10 +462,13 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { *request.mutable_candidate_status()->mutable_last_received() = queue_->GetLastOpIdInLog(); - election.reset(new LeaderElection(active_config, - peer_proxy_factory_.get(), - request, std::move(counter), timeout, - Bind(&RaftConsensus::ElectionCallback, this, reason))); + election.reset(new LeaderElection( + active_config, + peer_proxy_factory_.get(), + request, std::move(counter), timeout, + std::bind(&RaftConsensus::ElectionCallback, + shared_from_this(), + reason, std::placeholders::_1))); } // Start the election outside the lock. @@ -505,8 +508,10 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) { scoped_refptr RaftConsensus::NewRound( gscoped_ptr replicate_msg, - const ConsensusReplicatedCallback& replicated_cb) { - return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb)); + ConsensusReplicatedCallback replicated_cb) { + return make_scoped_refptr(new ConsensusRound(this, + std::move(replicate_msg), + std::move(replicated_cb))); } void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& /*msg*/) { @@ -545,10 +550,12 @@ Status RaftConsensus::BecomeLeaderUnlocked() { scoped_refptr round( new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate)))); - round->SetConsensusReplicatedCallback(Bind(&RaftConsensus::NonTxRoundReplicationFinished, - Unretained(this), - Unretained(round.get()), - Bind(&DoNothingStatusCB))); + round->SetConsensusReplicatedCallback(std::bind( + &RaftConsensus::NonTxRoundReplicationFinished, + this, + round.get(), + &DoNothingStatusCB, + std::placeholders::_1)); RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round)); return Status::OK(); @@ -737,8 +744,11 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid, } // Run config change on thread pool after dropping lock. - WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(&RaftConsensus::TryRemoveFollowerTask, - this, uuid, committed_config, reason)), + WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask, + shared_from_this(), + uuid, + committed_config, + reason)), LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask"); } @@ -753,7 +763,7 @@ void RaftConsensus::TryRemoveFollowerTask(const string& uuid, LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower " << uuid << " from the Raft config. Reason: " << reason; boost::optional error_code; - WARN_NOT_OK(ChangeConfig(req, Bind(&DoNothingStatusCB), &error_code), + WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code), LogPrefixThreadSafe() + "Unable to remove follower " + uuid); } @@ -1498,7 +1508,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* } Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, - const StatusCallback& client_cb, + StdStatusCallback client_cb, boost::optional* error_code) { TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig", "peer", peer_uuid(), @@ -1591,11 +1601,13 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, return Status::NotSupported("Role change is not yet implemented."); } - RETURN_NOT_OK(ReplicateConfigChangeUnlocked(committed_config, new_config, - Bind(&RaftConsensus::MarkDirtyOnSuccess, - Unretained(this), - string("Config change replication complete"), - client_cb))); + RETURN_NOT_OK(ReplicateConfigChangeUnlocked( + committed_config, new_config, std::bind( + &RaftConsensus::MarkDirtyOnSuccess, + this, + string("Config change replication complete"), + std::move(client_cb), + std::placeholders::_1))); } peer_manager_->SignalRequest(); return Status::OK(); @@ -1800,13 +1812,17 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting consensus round: " << SecureShortDebugString(msg->get()->id()); scoped_refptr round(new ConsensusRound(this, msg)); - round->SetConsensusReplicatedCallback(Bind(&RaftConsensus::NonTxRoundReplicationFinished, - Unretained(this), - Unretained(round.get()), - Bind(&RaftConsensus::MarkDirtyOnSuccess, - Unretained(this), - string("Replicated consensus-only round"), - Bind(&DoNothingStatusCB)))); + StdStatusCallback client_cb = std::bind(&RaftConsensus::MarkDirtyOnSuccess, + this, + string("Replicated consensus-only round"), + &DoNothingStatusCB, + std::placeholders::_1); + round->SetConsensusReplicatedCallback(std::bind( + &RaftConsensus::NonTxRoundReplicationFinished, + this, + round.get(), + std::move(client_cb), + std::placeholders::_1)); return AddPendingOperationUnlocked(round); } @@ -1978,9 +1994,10 @@ void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) { MarkDirty(Substitute("New leader $0", uuid)); } -Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config, - const RaftConfigPB& new_config, - const StatusCallback& client_cb) { +Status RaftConsensus::ReplicateConfigChangeUnlocked( + const RaftConfigPB& old_config, + const RaftConfigPB& new_config, + StdStatusCallback client_cb) { DCHECK(lock_.is_locked()); auto cc_replicate = new ReplicateMsg(); cc_replicate->set_op_type(CHANGE_CONFIG_OP); @@ -1992,10 +2009,12 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf scoped_refptr round( new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(cc_replicate)))); - round->SetConsensusReplicatedCallback(Bind(&RaftConsensus::NonTxRoundReplicationFinished, - Unretained(this), - Unretained(round.get()), - client_cb)); + round->SetConsensusReplicatedCallback(std::bind( + &RaftConsensus::NonTxRoundReplicationFinished, + this, + round.get(), + std::move(client_cb), + std::placeholders::_1)); CHECK_OK(AppendNewRoundToQueueUnlocked(round)); return Status::OK(); @@ -2068,8 +2087,10 @@ void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult // The election callback runs on a reactor thread, so we need to defer to our // threadpool. If the threadpool is already shut down for some reason, it's OK -- // we're OK with the callback never running. - WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(&RaftConsensus::DoElectionCallback, - this, reason, result)), + WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::DoElectionCallback, + shared_from_this(), + reason, + result)), LogPrefixThreadSafe() + "Unable to run election callback"); } @@ -2220,16 +2241,16 @@ void RaftConsensus::MarkDirty(const std::string& reason) { } void RaftConsensus::MarkDirtyOnSuccess(const string& reason, - const StatusCallback& client_cb, + const StdStatusCallback& client_cb, const Status& status) { if (PREDICT_TRUE(status.ok())) { MarkDirty(reason); } - client_cb.Run(status); + client_cb(status); } void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round, - const StatusCallback& client_cb, + const StdStatusCallback& client_cb, const Status& status) { // NOTE: lock_ is held here because this is triggered by // PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex(). @@ -2249,7 +2270,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round, if (!status.ok()) { LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: " << status.ToString(); - client_cb.Run(status); + client_cb(status); return; } VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id " @@ -2262,7 +2283,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round, Bind(CrashIfNotOkStatusCB, "Enqueued commit operation failed to write to WAL"))); - client_cb.Run(status); + client_cb(status); } void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) { @@ -2650,8 +2671,8 @@ ConsensusRound::ConsensusRound(RaftConsensus* consensus, } void ConsensusRound::NotifyReplicationFinished(const Status& status) { - if (PREDICT_FALSE(replicated_cb_.is_null())) return; - replicated_cb_.Run(status); + if (PREDICT_FALSE(!replicated_cb_)) return; + replicated_cb_(status); } Status ConsensusRound::CheckBoundTerm(int64_t current_term) const { http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index 7a66045..0eb3652 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -82,9 +82,9 @@ struct ConsensusOptions { }; typedef int64_t ConsensusTerm; -typedef StatusCallback ConsensusReplicatedCallback; +typedef StdStatusCallback ConsensusReplicatedCallback; -class RaftConsensus : public RefCountedThreadSafe, +class RaftConsensus : public std::enable_shared_from_this, public PeerMessageQueueObserver { public: @@ -124,6 +124,7 @@ class RaftConsensus : public RefCountedThreadSafe, RaftPeerPB local_peer_pb, scoped_refptr cmeta_manager, ThreadPool* raft_pool); + ~RaftConsensus(); // Initializes the RaftConsensus object. This should be called before // publishing this object to any thread other than the thread that invoked @@ -166,7 +167,7 @@ class RaftConsensus : public RefCountedThreadSafe, // increase the reference count for the provided callbacks. scoped_refptr NewRound( gscoped_ptr replicate_msg, - const ConsensusReplicatedCallback& replicated_cb); + ConsensusReplicatedCallback replicated_cb); // Call StartElection(), log a warning if the call fails (usually due to // being shut down). @@ -247,7 +248,7 @@ class RaftConsensus : public RefCountedThreadSafe, // Implement a ChangeConfig() request. Status ChangeConfig(const ChangeConfigRequestPB& req, - const StatusCallback& client_cb, + StdStatusCallback client_cb, boost::optional* error_code); // Implement an UnsafeChangeConfig() request. @@ -373,9 +374,6 @@ class RaftConsensus : public RefCountedThreadSafe, using LockGuard = std::lock_guard; using UniqueLock = std::unique_lock; - // Private because this class is refcounted. - ~RaftConsensus(); - // Returns string description for State enum value. static const char* State_Name(State state); @@ -386,9 +384,10 @@ class RaftConsensus : public RefCountedThreadSafe, // Replicate (as leader) a pre-validated config change. This includes // updating the peers and setting the new_configuration as pending. // The old_configuration must be the currently-committed configuration. - Status ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config, - const RaftConfigPB& new_config, - const StatusCallback& client_cb); + Status ReplicateConfigChangeUnlocked( + const RaftConfigPB& old_config, + const RaftConfigPB& new_config, + StdStatusCallback client_cb); // Update the peers and queue to be consistent with a new active configuration. // Should only be called by the leader. @@ -569,7 +568,7 @@ class RaftConsensus : public RefCountedThreadSafe, // Calls MarkDirty() if 'status' == OK. Then, always calls 'client_cb' with // 'status' as its argument. void MarkDirtyOnSuccess(const std::string& reason, - const StatusCallback& client_cb, + const StdStatusCallback& client_cb, const Status& status); // Attempt to remove the follower with the specified 'uuid' from the config, @@ -602,7 +601,7 @@ class RaftConsensus : public RefCountedThreadSafe, // // NOTE: Must be called while holding 'lock_'. void NonTxRoundReplicationFinished(ConsensusRound* round, - const StatusCallback& client_cb, + const StdStatusCallback& client_cb, const Status& status); // As a leader, append a new ConsensusRound to the queue. @@ -827,7 +826,8 @@ class ConsensusRound : public RefCountedThreadSafe { public: // Ctor used for leader transactions. Leader transactions can and must specify the // callbacks prior to initiating the consensus round. - ConsensusRound(RaftConsensus* consensus, gscoped_ptr replicate_msg, + ConsensusRound(RaftConsensus* consensus, + gscoped_ptr replicate_msg, ConsensusReplicatedCallback replicated_cb); // Ctor used for follower/learner transactions. These transactions do not use the @@ -855,8 +855,8 @@ class ConsensusRound : public RefCountedThreadSafe { // permanently failed to replicate if 'status' is anything else. If 'status' // is OK() then the operation can be applied to the state machine, otherwise // the operation should be aborted. - void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) { - replicated_cb_ = replicated_cb; + void SetConsensusReplicatedCallback(ConsensusReplicatedCallback replicated_cb) { + replicated_cb_ = std::move(replicated_cb); } // If a continuation was set, notifies it that the round has been replicated. http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index f9d2533..491e1ad 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -167,7 +167,7 @@ class RaftConsensusQuorumTest : public KuduTest { RaftPeerPB local_peer_pb; RETURN_NOT_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb)); - scoped_refptr peer( + shared_ptr peer( new RaftConsensus(options_, config_.peers(i), cmeta_managers_[i], @@ -184,7 +184,7 @@ class RaftConsensusQuorumTest : public KuduTest { TestPeerMap all_peers = peers_->GetPeerMapCopy(); for (int i = 0; i < config_.peers_size(); i++) { - scoped_refptr peer; + shared_ptr peer; RETURN_NOT_OK(peers_->GetPeerByIdx(i, &peer)); gscoped_ptr proxy_factory(new LocalTestPeerProxyFactory(peers_.get())); @@ -218,16 +218,16 @@ class RaftConsensusQuorumTest : public KuduTest { // Automatically elect the last node in the list. const int kLeaderIdx = num - 1; - scoped_refptr leader; + shared_ptr leader; RETURN_NOT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader)); RETURN_NOT_OK(leader->EmulateElection()); return Status::OK(); } LocalTestPeerProxy* GetLeaderProxyToPeer(int peer_idx, int leader_idx) { - scoped_refptr follower; + shared_ptr follower; CHECK_OK(peers_->GetPeerByIdx(peer_idx, &follower)); - scoped_refptr leader; + shared_ptr leader; CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); for (LocalTestPeerProxy* proxy : down_cast( leader->peer_proxy_factory_.get())->GetProxies()) { @@ -246,12 +246,12 @@ class RaftConsensusQuorumTest : public KuduTest { msg->mutable_noop_request(); msg->set_timestamp(clock_->Now().ToUint64()); - scoped_refptr peer; + shared_ptr peer; CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer)); // Use a latch in place of a Transaction callback. gscoped_ptr sync(new Synchronizer()); - *round = peer->NewRound(std::move(msg), sync->AsStatusCallback()); + *round = peer->NewRound(std::move(msg), sync->AsStdStatusCallback()); InsertOrDie(&syncs_, round->get(), sync.release()); RETURN_NOT_OK_PREPEND(peer->Replicate(round->get()), Substitute("Unable to replicate to peer $0", peer_idx)); @@ -289,7 +289,7 @@ class RaftConsensusQuorumTest : public KuduTest { } void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) { - scoped_refptr peer; + shared_ptr peer; CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer)); while (true) { if (OpIdCompare(peer->queue_->GetLastOpIdInLog(), to_wait_for) >= 0) { @@ -307,7 +307,7 @@ class RaftConsensusQuorumTest : public KuduTest { MonoDelta timeout(MonoDelta::FromSeconds(10)); MonoTime start(MonoTime::Now()); - scoped_refptr peer; + shared_ptr peer; CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer)); int backoff_exp = 0; @@ -329,7 +329,7 @@ class RaftConsensusQuorumTest : public KuduTest { << "op " << to_wait_for << " on replica. Last committed op on replica: " << committed.index() << ". Dumping state and quitting."; vector lines; - scoped_refptr leader; + shared_ptr leader; CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); for (const string& line : lines) { LOG(ERROR) << line; @@ -385,7 +385,7 @@ class RaftConsensusQuorumTest : public KuduTest { } if (wait_mode == WAIT_FOR_ALL_REPLICAS) { - scoped_refptr leader; + shared_ptr leader; CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); TestPeerMap all_peers = peers_->GetPeerMapCopy(); @@ -439,7 +439,7 @@ class RaftConsensusQuorumTest : public KuduTest { vector leader_entries; ElementDeleter leader_entry_deleter(&leader_entries); GatherLogEntries(leader_idx, logs_[leader_idx], &leader_entries); - scoped_refptr leader; + shared_ptr leader; CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) { @@ -447,7 +447,7 @@ class RaftConsensusQuorumTest : public KuduTest { ElementDeleter replica_entry_deleter(&replica_entries); GatherLogEntries(replica_idx, logs_[replica_idx], &replica_entries); - scoped_refptr replica; + shared_ptr replica; CHECK_OK(peers_->GetPeerByIdx(replica_idx, &replica)); VerifyReplica(leader_entries, replica_entries, @@ -661,7 +661,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) { { // lock one of the replicas down by obtaining the state lock // and never letting it go. - scoped_refptr follower0; + shared_ptr follower0; CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); RaftConsensus::LockGuard l(follower0->lock_); @@ -704,11 +704,11 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) { { // lock two of the replicas down by obtaining the state locks // and never letting them go. - scoped_refptr follower0; + shared_ptr follower0; CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); RaftConsensus::LockGuard l_0(follower0->lock_); - scoped_refptr follower1; + shared_ptr follower1; CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); RaftConsensus::LockGuard l_1(follower1->lock_); @@ -811,14 +811,14 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) { ASSERT_OK(BuildConfig(3)); - scoped_refptr follower0; + shared_ptr follower0; CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); - scoped_refptr follower1; + shared_ptr follower1; CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); ASSERT_OK(StartPeers()); - scoped_refptr leader; + shared_ptr leader; CHECK_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader)); ASSERT_OK(leader->EmulateElection()); @@ -884,13 +884,13 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { // Now shutdown the current leader. LOG(INFO) << "Shutting down current leader with index " << (current_config_size - 1); - scoped_refptr current_leader; + shared_ptr current_leader; CHECK_OK(peers_->GetPeerByIdx(current_config_size - 1, ¤t_leader)); current_leader->Shutdown(); peers_->RemovePeer(current_leader->peer_uuid()); // ... and make the peer before it become leader. - scoped_refptr new_leader; + shared_ptr new_leader; CHECK_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader)); // This will force an election in which we expect to make the last @@ -951,10 +951,10 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) { ConsensusRequestPB req; ConsensusResponsePB resp; - scoped_refptr leader; + shared_ptr leader; CHECK_OK(peers_->GetPeerByIdx(2, &leader)); - scoped_refptr follower; + shared_ptr follower; CHECK_OK(peers_->GetPeerByIdx(0, &follower)); req.set_caller_uuid(leader->peer_uuid()); @@ -1018,7 +1018,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) { ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id)); const int kPeerIndex = 1; - scoped_refptr peer; + shared_ptr peer; CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer)); auto flush_count = [&]() { return peer->consensus_metadata_for_tests()->flush_count_for_tests(); http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/master/catalog_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 5ea25e8..5b7c0d5 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -1102,7 +1102,7 @@ bool CatalogManager::IsInitialized() const { } RaftPeerPB::Role CatalogManager::Role() const { - scoped_refptr consensus; + shared_ptr consensus; { std::lock_guard l(state_lock_); if (state_ == kRunning) { http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/master/sys_catalog.cc ---------------------------------------------------------------------- diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc index 5816484..7ca586b 100644 --- a/src/kudu/master/sys_catalog.cc +++ b/src/kudu/master/sys_catalog.cc @@ -274,7 +274,7 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options, void SysCatalogTable::SysCatalogStateChanged(const string& tablet_id, const string& reason) { CHECK_EQ(tablet_id, tablet_replica_->tablet_id()); - scoped_refptr consensus = tablet_replica_->shared_consensus(); + shared_ptr consensus = tablet_replica_->shared_consensus(); if (!consensus) { LOG_WITH_PREFIX(WARNING) << "Received notification of tablet state change " << "but tablet no longer running. Tablet ID: " http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tablet/tablet_replica.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc index 07679d1..2a87fa0 100644 --- a/src/kudu/tablet/tablet_replica.cc +++ b/src/kudu/tablet/tablet_replica.cc @@ -18,6 +18,7 @@ #include "kudu/tablet/tablet_replica.h" #include +#include #include #include #include @@ -157,8 +158,10 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info, TRACE("Creating consensus instance"); ConsensusOptions options; options.tablet_id = meta_->tablet_id(); - scoped_refptr consensus(new RaftConsensus(std::move(options), local_peer_pb_, - cmeta_manager_, raft_pool)); + shared_ptr consensus(std::make_shared(std::move(options), + local_peer_pb_, + cmeta_manager_, + raft_pool)); RETURN_NOT_OK(consensus->Init()); scoped_refptr metric_entity; @@ -545,9 +548,11 @@ Status TabletReplica::StartReplicaTransaction(const scoped_refptr driver; RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver)); - // Unretained is required to avoid a refcount cycle. + // A raw pointer is required to avoid a refcount cycle. state->consensus_round()->SetConsensusReplicatedCallback( - Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get()))); + std::bind(&TransactionDriver::ReplicationFinished, + driver.get(), + std::placeholders::_1)); RETURN_NOT_OK(driver->ExecuteAsync()); return Status::OK(); http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tablet/tablet_replica.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h index e813ba4..307ddd9 100644 --- a/src/kudu/tablet/tablet_replica.h +++ b/src/kudu/tablet/tablet_replica.h @@ -131,7 +131,7 @@ class TabletReplica : public RefCountedThreadSafe, return consensus_.get(); } - scoped_refptr shared_consensus() const { + std::shared_ptr shared_consensus() const { std::lock_guard lock(lock_); return consensus_; } @@ -304,7 +304,7 @@ class TabletReplica : public RefCountedThreadSafe, scoped_refptr log_; std::shared_ptr tablet_; std::shared_ptr messenger_; - scoped_refptr consensus_; + std::shared_ptr consensus_; simple_spinlock prepare_replicate_lock_; // Lock protecting state_, last_status_, as well as smart pointers to collaborating http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tablet/transactions/transaction_driver.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc index 0e73d1b..ebee430 100644 --- a/src/kudu/tablet/transactions/transaction_driver.cc +++ b/src/kudu/tablet/transactions/transaction_driver.cc @@ -17,6 +17,7 @@ #include "kudu/tablet/transactions/transaction_driver.h" +#include #include #include "kudu/consensus/time_manager.h" @@ -133,10 +134,12 @@ Status TransactionDriver::Init(gscoped_ptr transaction, gscoped_ptr replicate_msg; transaction_->NewReplicateMsg(&replicate_msg); if (consensus_) { // sometimes NULL in tests - // Unretained is required to avoid a refcount cycle. + // A raw pointer is required to avoid a refcount cycle. mutable_state()->set_consensus_round( consensus_->NewRound(std::move(replicate_msg), - Bind(&TransactionDriver::ReplicationFinished, Unretained(this)))); + std::bind(&TransactionDriver::ReplicationFinished, + this, + std::placeholders::_1))); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/tablet_copy_source_session.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc index 85faa49..f470647 100644 --- a/src/kudu/tserver/tablet_copy_source_session.cc +++ b/src/kudu/tserver/tablet_copy_source_session.cc @@ -120,7 +120,7 @@ Status TabletCopySourceSession::Init() { // We do this after snapshotting the log to avoid a scenario where the latest // entry in the log has a term higher than the term stored in the consensus // metadata, which will results in a CHECK failure on RaftConsensus init. - scoped_refptr consensus = tablet_replica_->shared_consensus(); + shared_ptr consensus = tablet_replica_->shared_consensus(); if (!consensus) { tablet::TabletStatePB tablet_state = tablet_replica_->state(); return Status::IllegalState(Substitute( http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 68fa88b..c430982 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -18,12 +18,14 @@ #include "kudu/tserver/tablet_service.h" #include -#include +#include #include #include #include #include +#include + #include "kudu/common/iterator.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" @@ -214,7 +216,7 @@ template bool GetConsensusOrRespond(const scoped_refptr& replica, RespClass* resp, rpc::RpcContext* context, - scoped_refptr* consensus) { + shared_ptr* consensus) { *consensus = replica->shared_consensus(); if (!*consensus) { Status s = Status::ServiceUnavailable("Raft Consensus unavailable. Tablet not running"); @@ -255,8 +257,15 @@ void HandleResponse(const ReqType* req, RespType* resp, } template -static StatusCallback BindHandleResponse(const ReqType* req, RespType* resp, RpcContext* context) { - return Bind(&HandleResponse, req, resp, context); +static StdStatusCallback BindHandleResponse( + const ReqType* req, + RespType* resp, + RpcContext* context) { + return std::bind(&HandleResponse, + req, + resp, + context, + std::placeholders::_1); } } // namespace @@ -856,7 +865,7 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req, replica->permanent_uuid(); // Submit the update directly to the TabletReplica's RaftConsensus instance. - scoped_refptr consensus; + shared_ptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->Update(req, resp); if (PREDICT_FALSE(!s.ok())) { @@ -886,7 +895,7 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req, } // Submit the vote request directly to the consensus instance. - scoped_refptr consensus; + shared_ptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->RequestVote(req, resp); if (PREDICT_FALSE(!s.ok())) { @@ -911,7 +920,7 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req, return; } - scoped_refptr consensus; + shared_ptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; boost::optional error_code; Status s = consensus->ChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code); @@ -935,7 +944,7 @@ void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* return; } - scoped_refptr consensus; + shared_ptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; TabletServerErrorPB::Code error_code; Status s = consensus->UnsafeChangeConfig(*req, &error_code); @@ -966,7 +975,7 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r return; } - scoped_refptr consensus; + shared_ptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->StartElection( consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, @@ -992,7 +1001,7 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req, return; } - scoped_refptr consensus; + shared_ptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; Status s = consensus->StepDown(resp); if (PREDICT_FALSE(!s.ok())) { @@ -1022,7 +1031,7 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re TabletServerErrorPB::TABLET_NOT_RUNNING, context); return; } - scoped_refptr consensus; + shared_ptr consensus; if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return; if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) { HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"), @@ -1057,7 +1066,7 @@ void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateR continue; } - scoped_refptr consensus(replica->shared_consensus()); + shared_ptr consensus(replica->shared_consensus()); if (!consensus) { continue; } http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/ts_tablet_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc index 079ad89..3444968 100644 --- a/src/kudu/tserver/ts_tablet_manager.cc +++ b/src/kudu/tserver/ts_tablet_manager.cc @@ -620,7 +620,7 @@ Status TSTabletManager::DeleteTablet( // restarting the tablet if the local replica committed a higher config // change op during that time, or potentially something else more invasive. if (cas_config_opid_index_less_or_equal && !tablet_deleted) { - scoped_refptr consensus = replica->shared_consensus(); + shared_ptr consensus = replica->shared_consensus(); if (!consensus) { *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING; return Status::IllegalState("Raft Consensus not available. Tablet shutting down"); @@ -953,7 +953,7 @@ void TSTabletManager::CreateReportedTabletPB(const string& tablet_id, reported_tablet->set_schema_version(replica->tablet_metadata()->schema_version()); // We cannot get consensus state information unless the TabletReplica is running. - scoped_refptr consensus = replica->shared_consensus(); + shared_ptr consensus = replica->shared_consensus(); if (consensus) { *reported_tablet->mutable_consensus_state() = consensus->ConsensusState(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/tserver-path-handlers.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc index 26220dd..476832d 100644 --- a/src/kudu/tserver/tserver-path-handlers.cc +++ b/src/kudu/tserver/tserver-path-handlers.cc @@ -251,7 +251,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& re .PartitionDebugString(replica->tablet_metadata()->partition(), replica->tablet_metadata()->schema()); - scoped_refptr consensus = replica->shared_consensus(); + shared_ptr consensus = replica->shared_consensus(); (*output) << Substitute( // Table name, tablet id, partition "$0$1$2" @@ -456,7 +456,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq string id; scoped_refptr replica; if (!LoadTablet(tserver_, req, &id, &replica, output)) return; - scoped_refptr consensus = replica->shared_consensus(); + shared_ptr consensus = replica->shared_consensus(); if (!consensus) { *output << "Tablet " << EscapeForHtmlToString(id) << " not running"; return; http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/util/async_util.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h index 1e2830c..727b7f7 100644 --- a/src/kudu/util/async_util.h +++ b/src/kudu/util/async_util.h @@ -19,6 +19,8 @@ #ifndef KUDU_UTIL_ASYNC_UTIL_H #define KUDU_UTIL_ASYNC_UTIL_H +#include + #include "kudu/gutil/bind.h" #include "kudu/gutil/macros.h" #include "kudu/util/countdown_latch.h" @@ -35,12 +37,14 @@ namespace kudu { class Synchronizer { public: Synchronizer() - : l(1) { + : l_(1) { } + void StatusCB(const Status& status) { - s = status; - l.CountDown(); + s_ = status; + l_.CountDown(); } + StatusCallback AsStatusCallback() { // Synchronizers are often declared on the stack, so it doesn't make // sense for a callback to take a reference to its synchronizer. @@ -49,23 +53,37 @@ class Synchronizer { // its synchronizer. return Bind(&Synchronizer::StatusCB, Unretained(this)); } + + StdStatusCallback AsStdStatusCallback() { + // Synchronizers are often declared on the stack, so it doesn't make + // sense for a callback to take a reference to its synchronizer. + // + // Note: this means the returned callback _must_ go out of scope before + // its synchronizer. + return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); + } + Status Wait() { - l.Wait(); - return s; + l_.Wait(); + return s_; } + Status WaitFor(const MonoDelta& delta) { - if (PREDICT_FALSE(!l.WaitFor(delta))) { + if (PREDICT_FALSE(!l_.WaitFor(delta))) { return Status::TimedOut("Timed out while waiting for the callback to be called."); } - return s; + return s_; } + void Reset() { - l.Reset(1); + l_.Reset(1); } + private: + Status s_; + CountDownLatch l_; + DISALLOW_COPY_AND_ASSIGN(Synchronizer); - Status s; - CountDownLatch l; }; } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/util/status_callback.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/status_callback.h b/src/kudu/util/status_callback.h index 3a36b83..70bbb97 100644 --- a/src/kudu/util/status_callback.h +++ b/src/kudu/util/status_callback.h @@ -17,6 +17,7 @@ #ifndef KUDU_UTIL_STATUS_CALLBACK_H #define KUDU_UTIL_STATUS_CALLBACK_H +#include #include #include "kudu/gutil/callback_forward.h" @@ -29,6 +30,11 @@ class Status; // produce asynchronous results and may fail. typedef Callback StatusCallback; +// Like StatusCallback but uses the STL function objects. +// +// TODO(adar): should eventually replace all StatusCallback usage with this. +typedef std::function StdStatusCallback; + // To be used when a function signature requires a StatusCallback but none // is needed. extern void DoNothingStatusCB(const Status& status);