kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: consensus: switch RaftConsensus to shared_ptr
Date Tue, 01 Aug 2017 00:16:40 GMT
Repository: kudu
Updated Branches:
  refs/heads/master a21a50f87 -> 1c0276a98


consensus: switch RaftConsensus to shared_ptr

Using shared_ptr instead of scoped_refptr means we can create weak pointers
to RaftConsensus, and I'd like to take advantage of that in future work.

The change is largely mechanical. The interesting part is the corresponding
switch from kudu::{Bind,Callback} to std::{bind,function}. To maintain
equivalent ownership semantics, the following conversions to bound arguments
are needed:
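- Unretained(this) -> this
    (a raw pointer; as before, the callback does not keep the object alive)
- this -> shared_from_this()
    (kudu::Bind took a reference on a refcounted 'this'; with std::bind the
    shared_ptr must be bound explicitly to keep the object alive)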

I also took the liberty of converting several pass-by-cref functions to use
move semantics.

Change-Id: I7fcaf5b7e4c4ce19126972fa0a81764b7da34e48
Reviewed-on: http://gerrit.cloudera.org:8080/7531
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1c0276a9
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1c0276a9
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1c0276a9

Branch: refs/heads/master
Commit: 1c0276a98cad2fbb91ac1b21a68961bd50265f9c
Parents: a21a50f
Author: Adar Dembo <adar@cloudera.com>
Authored: Thu Jul 27 15:12:42 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Tue Aug 1 00:15:11 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  19 ++--
 src/kudu/consensus/leader_election-test.cc      |  21 ++--
 src/kudu/consensus/leader_election.cc           |   2 +-
 src/kudu/consensus/leader_election.h            |   4 +-
 src/kudu/consensus/raft_consensus.cc            | 105 +++++++++++--------
 src/kudu/consensus/raft_consensus.h             |  30 +++---
 .../consensus/raft_consensus_quorum-test.cc     |  48 ++++-----
 src/kudu/master/catalog_manager.cc              |   2 +-
 src/kudu/master/sys_catalog.cc                  |   2 +-
 src/kudu/tablet/tablet_replica.cc               |  13 ++-
 src/kudu/tablet/tablet_replica.h                |   4 +-
 .../tablet/transactions/transaction_driver.cc   |   7 +-
 src/kudu/tserver/tablet_copy_source_session.cc  |   2 +-
 src/kudu/tserver/tablet_service.cc              |  33 +++---
 src/kudu/tserver/ts_tablet_manager.cc           |   4 +-
 src/kudu/tserver/tserver-path-handlers.cc       |   4 +-
 src/kudu/util/async_util.h                      |  38 +++++--
 src/kudu/util/status_callback.h                 |   6 ++
 18 files changed, 207 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index de3e4ea..b000dde 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <functional>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -369,25 +370,25 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory {
   std::shared_ptr<rpc::Messenger> messenger_;
 };
 
-typedef std::unordered_map<std::string, scoped_refptr<RaftConsensus> > TestPeerMap;
+typedef std::unordered_map<std::string, std::shared_ptr<RaftConsensus>> TestPeerMap;
 
 // Thread-safe manager for list of peers being used in tests.
 class TestPeerMapManager {
  public:
   explicit TestPeerMapManager(RaftConfigPB config) : config_(std::move(config)) {}
 
-  void AddPeer(const std::string& peer_uuid, const scoped_refptr<RaftConsensus>& peer) {
+  void AddPeer(const std::string& peer_uuid, const std::shared_ptr<RaftConsensus>& peer) {
     std::lock_guard<simple_spinlock> lock(lock_);
     InsertOrDie(&peers_, peer_uuid, peer);
   }
 
-  Status GetPeerByIdx(int idx, scoped_refptr<RaftConsensus>* peer_out) const {
+  Status GetPeerByIdx(int idx, std::shared_ptr<RaftConsensus>* peer_out) const {
     CHECK_LT(idx, config_.peers_size());
     return GetPeerByUuid(config_.peers(idx).permanent_uuid(), peer_out);
   }
 
   Status GetPeerByUuid(const std::string& peer_uuid,
-                       scoped_refptr<RaftConsensus>* peer_out) const {
+                       std::shared_ptr<RaftConsensus>* peer_out) const {
     std::lock_guard<simple_spinlock> lock(lock_);
     if (!FindCopy(peers_, peer_uuid, peer_out)) {
       return Status::NotFound("Other consensus instance was destroyed");
@@ -493,7 +494,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
 
     // Give the other peer a clean response object to write to.
     ConsensusResponsePB other_peer_resp;
-    scoped_refptr<RaftConsensus> peer;
+    std::shared_ptr<RaftConsensus> peer;
     Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
 
     if (s.ok()) {
@@ -526,7 +527,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
     VoteResponsePB other_peer_resp;
     other_peer_resp.CopyFrom(*response);
 
-    scoped_refptr<RaftConsensus> peer;
+    std::shared_ptr<RaftConsensus> peer;
     Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
 
     if (s.ok()) {
@@ -662,8 +663,10 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
 
   Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) OVERRIDE {
     auto txn = new TestDriver(pool_.get(), log_, round);
-    txn->round_->SetConsensusReplicatedCallback(Bind(&TestDriver::ReplicationFinished,
-                                                     Unretained(txn)));
+    txn->round_->SetConsensusReplicatedCallback(std::bind(
+        &TestDriver::ReplicationFinished,
+        txn,
+        std::placeholders::_1));
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/leader_election-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc
index 915344a..dd62384 100644
--- a/src/kudu/consensus/leader_election-test.cc
+++ b/src/kudu/consensus/leader_election-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/consensus/leader_election.h"
 
+#include <functional>
 #include <memory>
 #include <string>
 #include <vector>
@@ -211,8 +212,9 @@ scoped_refptr<LeaderElection> LeaderElectionTest::SetUpElectionWithHighTermVoter
   scoped_refptr<LeaderElection> election(
       new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter),
                          MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs),
-                         Bind(&LeaderElectionTest::ElectionCallback,
-                              Unretained(this))));
+                         std::bind(&LeaderElectionTest::ElectionCallback,
+                                   this,
+                                   std::placeholders::_1)));
   return election;
 }
 
@@ -267,8 +269,9 @@ scoped_refptr<LeaderElection> LeaderElectionTest::SetUpElectionWithGrantDenyErro
   scoped_refptr<LeaderElection> election(
       new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter),
                          MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs),
-                         Bind(&LeaderElectionTest::ElectionCallback,
-                              Unretained(this))));
+                         std::bind(&LeaderElectionTest::ElectionCallback,
+                                   this,
+                                   std::placeholders::_1)));
   return election;
 }
 
@@ -293,8 +296,9 @@ TEST_F(LeaderElectionTest, TestPerfectElection) {
     scoped_refptr<LeaderElection> election(
         new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter),
                            MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs),
-                           Bind(&LeaderElectionTest::ElectionCallback,
-                                Unretained(this))));
+                           std::bind(&LeaderElectionTest::ElectionCallback,
+                                     this,
+                                     std::placeholders::_1)));
     election->Run();
     latch_.Wait();
 
@@ -421,8 +425,9 @@ TEST_F(LeaderElectionTest, TestFailToCreateProxy) {
   scoped_refptr<LeaderElection> election(
       new LeaderElection(config_, proxy_factory_.get(), request, std::move(counter),
                          MonoDelta::FromSeconds(kLeaderElectionTimeoutSecs),
-                         Bind(&LeaderElectionTest::ElectionCallback,
-                              Unretained(this))));
+                         std::bind(&LeaderElectionTest::ElectionCallback,
+                                   this,
+                                   std::placeholders::_1)));
   election->Run();
   latch_.Wait();
   ASSERT_EQ(kElectionTerm, result_->vote_request.candidate_term());

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/leader_election.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc
index 2d371d1..b180345 100644
--- a/src/kudu/consensus/leader_election.cc
+++ b/src/kudu/consensus/leader_election.cc
@@ -259,7 +259,7 @@ void LeaderElection::CheckForDecision() {
   // Respond outside of the lock.
   if (to_respond) {
     // This is thread-safe since result_ is write-once.
-    decision_callback_.Run(*result_);
+    decision_callback_(*result_);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/leader_election.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h
index 487e34c..a4b5ca5 100644
--- a/src/kudu/consensus/leader_election.h
+++ b/src/kudu/consensus/leader_election.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_CONSENSUS_LEADER_ELECTION_H
 #define KUDU_CONSENSUS_LEADER_ELECTION_H
 
+#include <functional>
 #include <map>
 #include <string>
 #include <unordered_map>
@@ -25,7 +26,6 @@
 
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/raft_consensus.h"
-#include "kudu/gutil/callback.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -141,7 +141,7 @@ struct ElectionResult {
 // This class is thread-safe.
 class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
  public:
-  typedef Callback<void(const ElectionResult&)> ElectionDecisionCallback;
+  typedef std::function<void(const ElectionResult&)> ElectionDecisionCallback;
   typedef std::unordered_map<std::string, PeerProxy*> ProxyMap;
 
   // Set up a new leader election driver.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2216366..1ebd0ef 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -462,10 +462,13 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
     *request.mutable_candidate_status()->mutable_last_received() =
         queue_->GetLastOpIdInLog();
 
-    election.reset(new LeaderElection(active_config,
-                                      peer_proxy_factory_.get(),
-                                      request, std::move(counter), timeout,
-                                      Bind(&RaftConsensus::ElectionCallback, this, reason)));
+    election.reset(new LeaderElection(
+        active_config,
+        peer_proxy_factory_.get(),
+        request, std::move(counter), timeout,
+        std::bind(&RaftConsensus::ElectionCallback,
+                  shared_from_this(),
+                  reason, std::placeholders::_1)));
   }
 
   // Start the election outside the lock.
@@ -505,8 +508,10 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
 
 scoped_refptr<ConsensusRound> RaftConsensus::NewRound(
     gscoped_ptr<ReplicateMsg> replicate_msg,
-    const ConsensusReplicatedCallback& replicated_cb) {
-  return make_scoped_refptr(new ConsensusRound(this, std::move(replicate_msg), replicated_cb));
+    ConsensusReplicatedCallback replicated_cb) {
+  return make_scoped_refptr(new ConsensusRound(this,
+                                               std::move(replicate_msg),
+                                               std::move(replicated_cb)));
 }
 
 void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& /*msg*/) {
@@ -545,10 +550,12 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
 
   scoped_refptr<ConsensusRound> round(
       new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate))));
-  round->SetConsensusReplicatedCallback(Bind(&RaftConsensus::NonTxRoundReplicationFinished,
-                                             Unretained(this),
-                                             Unretained(round.get()),
-                                             Bind(&DoNothingStatusCB)));
+  round->SetConsensusReplicatedCallback(std::bind(
+      &RaftConsensus::NonTxRoundReplicationFinished,
+      this,
+      round.get(),
+      &DoNothingStatusCB,
+      std::placeholders::_1));
   RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
 
   return Status::OK();
@@ -737,8 +744,11 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
   }
 
   // Run config change on thread pool after dropping lock.
-  WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(&RaftConsensus::TryRemoveFollowerTask,
-                                                   this, uuid, committed_config, reason)),
+  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryRemoveFollowerTask,
+                                                     shared_from_this(),
+                                                     uuid,
+                                                     committed_config,
+                                                     reason)),
               LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask");
 }
 
@@ -753,7 +763,7 @@ void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
   LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower "
             << uuid << " from the Raft config. Reason: " << reason;
   boost::optional<TabletServerErrorPB::Code> error_code;
-  WARN_NOT_OK(ChangeConfig(req, Bind(&DoNothingStatusCB), &error_code),
+  WARN_NOT_OK(ChangeConfig(req, &DoNothingStatusCB, &error_code),
               LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
 }
 
@@ -1498,7 +1508,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
 }
 
 Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
-                                   const StatusCallback& client_cb,
+                                   StdStatusCallback client_cb,
                                    boost::optional<TabletServerErrorPB::Code>* error_code) {
   TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
                "peer", peer_uuid(),
@@ -1591,11 +1601,13 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
         return Status::NotSupported("Role change is not yet implemented.");
     }
 
-    RETURN_NOT_OK(ReplicateConfigChangeUnlocked(committed_config, new_config,
-                                                Bind(&RaftConsensus::MarkDirtyOnSuccess,
-                                                     Unretained(this),
-                                                     string("Config change replication complete"),
-                                                     client_cb)));
+    RETURN_NOT_OK(ReplicateConfigChangeUnlocked(
+        committed_config, new_config, std::bind(
+            &RaftConsensus::MarkDirtyOnSuccess,
+            this,
+            string("Config change replication complete"),
+            std::move(client_cb),
+            std::placeholders::_1)));
   }
   peer_manager_->SignalRequest();
   return Status::OK();
@@ -1800,13 +1812,17 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Starting consensus round: "
                                << SecureShortDebugString(msg->get()->id());
   scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
-  round->SetConsensusReplicatedCallback(Bind(&RaftConsensus::NonTxRoundReplicationFinished,
-                                             Unretained(this),
-                                             Unretained(round.get()),
-                                             Bind(&RaftConsensus::MarkDirtyOnSuccess,
-                                                  Unretained(this),
-                                                  string("Replicated consensus-only round"),
-                                                  Bind(&DoNothingStatusCB))));
+  StdStatusCallback client_cb = std::bind(&RaftConsensus::MarkDirtyOnSuccess,
+                                          this,
+                                          string("Replicated consensus-only round"),
+                                          &DoNothingStatusCB,
+                                          std::placeholders::_1);
+  round->SetConsensusReplicatedCallback(std::bind(
+      &RaftConsensus::NonTxRoundReplicationFinished,
+      this,
+      round.get(),
+      std::move(client_cb),
+      std::placeholders::_1));
   return AddPendingOperationUnlocked(round);
 }
 
@@ -1978,9 +1994,10 @@ void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
   MarkDirty(Substitute("New leader $0", uuid));
 }
 
-Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config,
-                                                    const RaftConfigPB& new_config,
-                                                    const StatusCallback& client_cb) {
+Status RaftConsensus::ReplicateConfigChangeUnlocked(
+    const RaftConfigPB& old_config,
+    const RaftConfigPB& new_config,
+    StdStatusCallback client_cb) {
   DCHECK(lock_.is_locked());
   auto cc_replicate = new ReplicateMsg();
   cc_replicate->set_op_type(CHANGE_CONFIG_OP);
@@ -1992,10 +2009,12 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
 
   scoped_refptr<ConsensusRound> round(
       new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(cc_replicate))));
-  round->SetConsensusReplicatedCallback(Bind(&RaftConsensus::NonTxRoundReplicationFinished,
-                                             Unretained(this),
-                                             Unretained(round.get()),
-                                             client_cb));
+  round->SetConsensusReplicatedCallback(std::bind(
+      &RaftConsensus::NonTxRoundReplicationFinished,
+      this,
+      round.get(),
+      std::move(client_cb),
+      std::placeholders::_1));
 
   CHECK_OK(AppendNewRoundToQueueUnlocked(round));
   return Status::OK();
@@ -2068,8 +2087,10 @@ void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult
   // The election callback runs on a reactor thread, so we need to defer to our
   // threadpool. If the threadpool is already shut down for some reason, it's OK --
   // we're OK with the callback never running.
-  WARN_NOT_OK(raft_pool_token_->SubmitClosure(Bind(&RaftConsensus::DoElectionCallback,
-                                                   this, reason, result)),
+  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::DoElectionCallback,
+                                                     shared_from_this(),
+                                                     reason,
+                                                     result)),
               LogPrefixThreadSafe() + "Unable to run election callback");
 }
 
@@ -2220,16 +2241,16 @@ void RaftConsensus::MarkDirty(const std::string& reason) {
 }
 
 void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
-                                       const StatusCallback& client_cb,
+                                       const StdStatusCallback& client_cb,
                                        const Status& status) {
   if (PREDICT_TRUE(status.ok())) {
     MarkDirty(reason);
   }
-  client_cb.Run(status);
+  client_cb(status);
 }
 
 void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
-                                                  const StatusCallback& client_cb,
+                                                  const StdStatusCallback& client_cb,
                                                   const Status& status) {
   // NOTE: lock_ is held here because this is triggered by
   // PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex().
@@ -2249,7 +2270,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
   if (!status.ok()) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: "
                                    << status.ToString();
-    client_cb.Run(status);
+    client_cb(status);
     return;
   }
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
@@ -2262,7 +2283,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
                                    Bind(CrashIfNotOkStatusCB,
                                         "Enqueued commit operation failed to write to WAL")));
 
-  client_cb.Run(status);
+  client_cb(status);
 }
 
 void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) {
@@ -2650,8 +2671,8 @@ ConsensusRound::ConsensusRound(RaftConsensus* consensus,
 }
 
 void ConsensusRound::NotifyReplicationFinished(const Status& status) {
-  if (PREDICT_FALSE(replicated_cb_.is_null())) return;
-  replicated_cb_.Run(status);
+  if (PREDICT_FALSE(!replicated_cb_)) return;
+  replicated_cb_(status);
 }
 
 Status ConsensusRound::CheckBoundTerm(int64_t current_term) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 7a66045..0eb3652 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -82,9 +82,9 @@ struct ConsensusOptions {
 };
 
 typedef int64_t ConsensusTerm;
-typedef StatusCallback ConsensusReplicatedCallback;
+typedef StdStatusCallback ConsensusReplicatedCallback;
 
-class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
+class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
                       public PeerMessageQueueObserver {
  public:
 
@@ -124,6 +124,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
                 RaftPeerPB local_peer_pb,
                 scoped_refptr<ConsensusMetadataManager> cmeta_manager,
                 ThreadPool* raft_pool);
+  ~RaftConsensus();
 
   // Initializes the RaftConsensus object. This should be called before
   // publishing this object to any thread other than the thread that invoked
@@ -166,7 +167,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // increase the reference count for the provided callbacks.
   scoped_refptr<ConsensusRound> NewRound(
       gscoped_ptr<ReplicateMsg> replicate_msg,
-      const ConsensusReplicatedCallback& replicated_cb);
+      ConsensusReplicatedCallback replicated_cb);
 
   // Call StartElection(), log a warning if the call fails (usually due to
   // being shut down).
@@ -247,7 +248,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
 
   // Implement a ChangeConfig() request.
   Status ChangeConfig(const ChangeConfigRequestPB& req,
-                      const StatusCallback& client_cb,
+                      StdStatusCallback client_cb,
                       boost::optional<tserver::TabletServerErrorPB::Code>* error_code);
 
   // Implement an UnsafeChangeConfig() request.
@@ -373,9 +374,6 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   using LockGuard = std::lock_guard<simple_spinlock>;
   using UniqueLock = std::unique_lock<simple_spinlock>;
 
-  // Private because this class is refcounted.
-  ~RaftConsensus();
-
   // Returns string description for State enum value.
   static const char* State_Name(State state);
 
@@ -386,9 +384,10 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // Replicate (as leader) a pre-validated config change. This includes
   // updating the peers and setting the new_configuration as pending.
   // The old_configuration must be the currently-committed configuration.
-  Status ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config,
-                                       const RaftConfigPB& new_config,
-                                       const StatusCallback& client_cb);
+  Status ReplicateConfigChangeUnlocked(
+      const RaftConfigPB& old_config,
+      const RaftConfigPB& new_config,
+      StdStatusCallback client_cb);
 
   // Update the peers and queue to be consistent with a new active configuration.
   // Should only be called by the leader.
@@ -569,7 +568,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // Calls MarkDirty() if 'status' == OK. Then, always calls 'client_cb' with
   // 'status' as its argument.
   void MarkDirtyOnSuccess(const std::string& reason,
-                          const StatusCallback& client_cb,
+                          const StdStatusCallback& client_cb,
                           const Status& status);
 
   // Attempt to remove the follower with the specified 'uuid' from the config,
@@ -602,7 +601,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   //
   // NOTE: Must be called while holding 'lock_'.
   void NonTxRoundReplicationFinished(ConsensusRound* round,
-                                     const StatusCallback& client_cb,
+                                     const StdStatusCallback& client_cb,
                                      const Status& status);
 
   // As a leader, append a new ConsensusRound to the queue.
@@ -827,7 +826,8 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
  public:
   // Ctor used for leader transactions. Leader transactions can and must specify the
   // callbacks prior to initiating the consensus round.
-  ConsensusRound(RaftConsensus* consensus, gscoped_ptr<ReplicateMsg> replicate_msg,
+  ConsensusRound(RaftConsensus* consensus,
+                 gscoped_ptr<ReplicateMsg> replicate_msg,
                  ConsensusReplicatedCallback replicated_cb);
 
   // Ctor used for follower/learner transactions. These transactions do not use the
@@ -855,8 +855,8 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
   // permanently failed to replicate if 'status' is anything else. If 'status'
   // is OK() then the operation can be applied to the state machine, otherwise
   // the operation should be aborted.
-  void SetConsensusReplicatedCallback(const ConsensusReplicatedCallback& replicated_cb) {
-    replicated_cb_ = replicated_cb;
+  void SetConsensusReplicatedCallback(ConsensusReplicatedCallback replicated_cb) {
+    replicated_cb_ = std::move(replicated_cb);
   }
 
   // If a continuation was set, notifies it that the round has been replicated.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index f9d2533..491e1ad 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -167,7 +167,7 @@ class RaftConsensusQuorumTest : public KuduTest {
       RaftPeerPB local_peer_pb;
       RETURN_NOT_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb));
 
-      scoped_refptr<RaftConsensus> peer(
+      shared_ptr<RaftConsensus> peer(
           new RaftConsensus(options_,
                             config_.peers(i),
                             cmeta_managers_[i],
@@ -184,7 +184,7 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     TestPeerMap all_peers = peers_->GetPeerMapCopy();
     for (int i = 0; i < config_.peers_size(); i++) {
-      scoped_refptr<RaftConsensus> peer;
+      shared_ptr<RaftConsensus> peer;
       RETURN_NOT_OK(peers_->GetPeerByIdx(i, &peer));
 
       gscoped_ptr<PeerProxyFactory> proxy_factory(new LocalTestPeerProxyFactory(peers_.get()));
@@ -218,16 +218,16 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     // Automatically elect the last node in the list.
     const int kLeaderIdx = num - 1;
-    scoped_refptr<RaftConsensus> leader;
+    shared_ptr<RaftConsensus> leader;
     RETURN_NOT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
     RETURN_NOT_OK(leader->EmulateElection());
     return Status::OK();
   }
 
   LocalTestPeerProxy* GetLeaderProxyToPeer(int peer_idx, int leader_idx) {
-    scoped_refptr<RaftConsensus> follower;
+    shared_ptr<RaftConsensus> follower;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &follower));
-    scoped_refptr<RaftConsensus> leader;
+    shared_ptr<RaftConsensus> leader;
     CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
     for (LocalTestPeerProxy* proxy : down_cast<LocalTestPeerProxyFactory*>(
         leader->peer_proxy_factory_.get())->GetProxies()) {
@@ -246,12 +246,12 @@ class RaftConsensusQuorumTest : public KuduTest {
     msg->mutable_noop_request();
     msg->set_timestamp(clock_->Now().ToUint64());
 
-    scoped_refptr<RaftConsensus> peer;
+    shared_ptr<RaftConsensus> peer;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
 
     // Use a latch in place of a Transaction callback.
     gscoped_ptr<Synchronizer> sync(new Synchronizer());
-    *round = peer->NewRound(std::move(msg), sync->AsStatusCallback());
+    *round = peer->NewRound(std::move(msg), sync->AsStdStatusCallback());
     InsertOrDie(&syncs_, round->get(), sync.release());
     RETURN_NOT_OK_PREPEND(peer->Replicate(round->get()),
                           Substitute("Unable to replicate to peer $0", peer_idx));
@@ -289,7 +289,7 @@ class RaftConsensusQuorumTest : public KuduTest {
   }
 
   void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) {
-    scoped_refptr<RaftConsensus> peer;
+    shared_ptr<RaftConsensus> peer;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
     while (true) {
       if (OpIdCompare(peer->queue_->GetLastOpIdInLog(), to_wait_for) >= 0) {
@@ -307,7 +307,7 @@ class RaftConsensusQuorumTest : public KuduTest {
     MonoDelta timeout(MonoDelta::FromSeconds(10));
     MonoTime start(MonoTime::Now());
 
-    scoped_refptr<RaftConsensus> peer;
+    shared_ptr<RaftConsensus> peer;
     CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer));
 
     int backoff_exp = 0;
@@ -329,7 +329,7 @@ class RaftConsensusQuorumTest : public KuduTest {
                << "op " << to_wait_for << " on replica. Last committed op on replica: "
                << committed.index() << ". Dumping state and quitting.";
     vector<string> lines;
-    scoped_refptr<RaftConsensus> leader;
+    shared_ptr<RaftConsensus> leader;
     CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
     for (const string& line : lines) {
       LOG(ERROR) << line;
@@ -385,7 +385,7 @@ class RaftConsensusQuorumTest : public KuduTest {
     }
 
     if (wait_mode == WAIT_FOR_ALL_REPLICAS) {
-      scoped_refptr<RaftConsensus> leader;
+      shared_ptr<RaftConsensus> leader;
       CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
 
       TestPeerMap all_peers = peers_->GetPeerMapCopy();
@@ -439,7 +439,7 @@ class RaftConsensusQuorumTest : public KuduTest {
     vector<LogEntryPB*> leader_entries;
     ElementDeleter leader_entry_deleter(&leader_entries);
     GatherLogEntries(leader_idx, logs_[leader_idx], &leader_entries);
-    scoped_refptr<RaftConsensus> leader;
+    shared_ptr<RaftConsensus> leader;
     CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
 
     for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) {
@@ -447,7 +447,7 @@ class RaftConsensusQuorumTest : public KuduTest {
       ElementDeleter replica_entry_deleter(&replica_entries);
       GatherLogEntries(replica_idx, logs_[replica_idx], &replica_entries);
 
-      scoped_refptr<RaftConsensus> replica;
+      shared_ptr<RaftConsensus> replica;
       CHECK_OK(peers_->GetPeerByIdx(replica_idx, &replica));
       VerifyReplica(leader_entries,
                     replica_entries,
@@ -661,7 +661,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
   {
     // lock one of the replicas down by obtaining the state lock
     // and never letting it go.
-    scoped_refptr<RaftConsensus> follower0;
+    shared_ptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
 
     RaftConsensus::LockGuard l(follower0->lock_);
@@ -704,11 +704,11 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
   {
     // lock two of the replicas down by obtaining the state locks
     // and never letting them go.
-    scoped_refptr<RaftConsensus> follower0;
+    shared_ptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
     RaftConsensus::LockGuard l_0(follower0->lock_);
 
-    scoped_refptr<RaftConsensus> follower1;
+    shared_ptr<RaftConsensus> follower1;
     CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
     RaftConsensus::LockGuard l_1(follower1->lock_);
 
@@ -811,14 +811,14 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
 
   ASSERT_OK(BuildConfig(3));
 
-  scoped_refptr<RaftConsensus> follower0;
+  shared_ptr<RaftConsensus> follower0;
   CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
-  scoped_refptr<RaftConsensus> follower1;
+  shared_ptr<RaftConsensus> follower1;
   CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
 
   ASSERT_OK(StartPeers());
 
-  scoped_refptr<RaftConsensus> leader;
+  shared_ptr<RaftConsensus> leader;
   CHECK_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader));
   ASSERT_OK(leader->EmulateElection());
 
@@ -884,13 +884,13 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
 
     // Now shutdown the current leader.
     LOG(INFO) << "Shutting down current leader with index " << (current_config_size - 1);
-    scoped_refptr<RaftConsensus> current_leader;
+    shared_ptr<RaftConsensus> current_leader;
     CHECK_OK(peers_->GetPeerByIdx(current_config_size - 1, &current_leader));
     current_leader->Shutdown();
     peers_->RemovePeer(current_leader->peer_uuid());
 
     // ... and make the peer before it become leader.
-    scoped_refptr<RaftConsensus> new_leader;
+    shared_ptr<RaftConsensus> new_leader;
     CHECK_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader));
 
     // This will force an election in which we expect to make the last
@@ -951,10 +951,10 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
   ConsensusRequestPB req;
   ConsensusResponsePB resp;
 
-  scoped_refptr<RaftConsensus> leader;
+  shared_ptr<RaftConsensus> leader;
   CHECK_OK(peers_->GetPeerByIdx(2, &leader));
 
-  scoped_refptr<RaftConsensus> follower;
+  shared_ptr<RaftConsensus> follower;
   CHECK_OK(peers_->GetPeerByIdx(0, &follower));
 
   req.set_caller_uuid(leader->peer_uuid());
@@ -1018,7 +1018,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id));
 
   const int kPeerIndex = 1;
-  scoped_refptr<RaftConsensus> peer;
+  shared_ptr<RaftConsensus> peer;
   CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer));
   auto flush_count = [&]() {
     return peer->consensus_metadata_for_tests()->flush_count_for_tests();

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 5ea25e8..5b7c0d5 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1102,7 +1102,7 @@ bool CatalogManager::IsInitialized() const {
 }
 
 RaftPeerPB::Role CatalogManager::Role() const {
-  scoped_refptr<consensus::RaftConsensus> consensus;
+  shared_ptr<consensus::RaftConsensus> consensus;
   {
     std::lock_guard<simple_spinlock> l(state_lock_);
     if (state_ == kRunning) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 5816484..7ca586b 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -274,7 +274,7 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options,
 
 void SysCatalogTable::SysCatalogStateChanged(const string& tablet_id, const string& reason) {
   CHECK_EQ(tablet_id, tablet_replica_->tablet_id());
-  scoped_refptr<consensus::RaftConsensus> consensus  = tablet_replica_->shared_consensus();
+  shared_ptr<consensus::RaftConsensus> consensus = tablet_replica_->shared_consensus();
   if (!consensus) {
     LOG_WITH_PREFIX(WARNING) << "Received notification of tablet state change "
                              << "but tablet no longer running. Tablet ID: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 07679d1..2a87fa0 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -18,6 +18,7 @@
 #include "kudu/tablet/tablet_replica.h"
 
 #include <algorithm>
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -157,8 +158,10 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
     TRACE("Creating consensus instance");
     ConsensusOptions options;
     options.tablet_id = meta_->tablet_id();
-    scoped_refptr<RaftConsensus> consensus(new RaftConsensus(std::move(options), local_peer_pb_,
-                                                             cmeta_manager_, raft_pool));
+    shared_ptr<RaftConsensus> consensus(std::make_shared<RaftConsensus>(std::move(options),
+                                                                        local_peer_pb_,
+                                                                        cmeta_manager_,
+                                                                        raft_pool));
     RETURN_NOT_OK(consensus->Init());
 
     scoped_refptr<MetricEntity> metric_entity;
@@ -545,9 +548,11 @@ Status TabletReplica::StartReplicaTransaction(const scoped_refptr<ConsensusRound
   scoped_refptr<TransactionDriver> driver;
   RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
 
-  // Unretained is required to avoid a refcount cycle.
+  // A raw pointer is required to avoid a refcount cycle.
   state->consensus_round()->SetConsensusReplicatedCallback(
-      Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get())));
+      std::bind(&TransactionDriver::ReplicationFinished,
+                driver.get(),
+                std::placeholders::_1));
 
   RETURN_NOT_OK(driver->ExecuteAsync());
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index e813ba4..307ddd9 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -131,7 +131,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
     return consensus_.get();
   }
 
-  scoped_refptr<consensus::RaftConsensus> shared_consensus() const {
+  std::shared_ptr<consensus::RaftConsensus> shared_consensus() const {
     std::lock_guard<simple_spinlock> lock(lock_);
     return consensus_;
   }
@@ -304,7 +304,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   scoped_refptr<log::Log> log_;
   std::shared_ptr<Tablet> tablet_;
   std::shared_ptr<rpc::Messenger> messenger_;
-  scoped_refptr<consensus::RaftConsensus> consensus_;
+  std::shared_ptr<consensus::RaftConsensus> consensus_;
   simple_spinlock prepare_replicate_lock_;
 
   // Lock protecting state_, last_status_, as well as smart pointers to collaborating

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 0e73d1b..ebee430 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/tablet/transactions/transaction_driver.h"
 
+#include <functional>
 #include <mutex>
 
 #include "kudu/consensus/time_manager.h"
@@ -133,10 +134,12 @@ Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
     gscoped_ptr<ReplicateMsg> replicate_msg;
     transaction_->NewReplicateMsg(&replicate_msg);
     if (consensus_) { // sometimes NULL in tests
-      // Unretained is required to avoid a refcount cycle.
+      // A raw pointer is required to avoid a refcount cycle.
       mutable_state()->set_consensus_round(
         consensus_->NewRound(std::move(replicate_msg),
-                             Bind(&TransactionDriver::ReplicationFinished, Unretained(this))));
+                             std::bind(&TransactionDriver::ReplicationFinished,
+                                       this,
+                                       std::placeholders::_1)));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 85faa49..f470647 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -120,7 +120,7 @@ Status TabletCopySourceSession::Init() {
   // We do this after snapshotting the log to avoid a scenario where the latest
   // entry in the log has a term higher than the term stored in the consensus
   // metadata, which will results in a CHECK failure on RaftConsensus init.
-  scoped_refptr<consensus::RaftConsensus> consensus = tablet_replica_->shared_consensus();
+  shared_ptr<consensus::RaftConsensus> consensus = tablet_replica_->shared_consensus();
   if (!consensus) {
     tablet::TabletStatePB tablet_state = tablet_replica_->state();
     return Status::IllegalState(Substitute(

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 68fa88b..c430982 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -18,12 +18,14 @@
 #include "kudu/tserver/tablet_service.h"
 
 #include <algorithm>
-#include <boost/optional.hpp>
+#include <functional>
 #include <memory>
 #include <string>
 #include <unordered_set>
 #include <vector>
 
+#include <boost/optional.hpp>
+
 #include "kudu/common/iterator.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
@@ -214,7 +216,7 @@ template<class RespClass>
 bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
                            RespClass* resp,
                            rpc::RpcContext* context,
-                           scoped_refptr<RaftConsensus>* consensus) {
+                           shared_ptr<RaftConsensus>* consensus) {
   *consensus = replica->shared_consensus();
   if (!*consensus) {
     Status s = Status::ServiceUnavailable("Raft Consensus unavailable. Tablet not running");
@@ -255,8 +257,15 @@ void HandleResponse(const ReqType* req, RespType* resp,
 }
 
 template <class ReqType, class RespType>
-static StatusCallback BindHandleResponse(const ReqType* req, RespType* resp, RpcContext* context) {
-  return Bind(&HandleResponse<ReqType, RespType>, req, resp, context);
+static StdStatusCallback BindHandleResponse(
+    const ReqType* req,
+    RespType* resp,
+    RpcContext* context) {
+  return std::bind(&HandleResponse<ReqType, RespType>,
+                   req,
+                   resp,
+                   context,
+                   std::placeholders::_1);
 }
 
 } // namespace
@@ -856,7 +865,7 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
   replica->permanent_uuid();
 
   // Submit the update directly to the TabletReplica's RaftConsensus instance.
-  scoped_refptr<RaftConsensus> consensus;
+  shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->Update(req, resp);
   if (PREDICT_FALSE(!s.ok())) {
@@ -886,7 +895,7 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
   }
 
   // Submit the vote request directly to the consensus instance.
-  scoped_refptr<RaftConsensus> consensus;
+  shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->RequestVote(req, resp);
   if (PREDICT_FALSE(!s.ok())) {
@@ -911,7 +920,7 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
     return;
   }
 
-  scoped_refptr<RaftConsensus> consensus;
+  shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   boost::optional<TabletServerErrorPB::Code> error_code;
   Status s = consensus->ChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code);
@@ -935,7 +944,7 @@ void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB*
     return;
   }
 
-  scoped_refptr<RaftConsensus> consensus;
+  shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   TabletServerErrorPB::Code error_code;
   Status s = consensus->UnsafeChangeConfig(*req, &error_code);
@@ -966,7 +975,7 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r
     return;
   }
 
-  scoped_refptr<RaftConsensus> consensus;
+  shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->StartElection(
       consensus::RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
@@ -992,7 +1001,7 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
     return;
   }
 
-  scoped_refptr<RaftConsensus> consensus;
+  shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   Status s = consensus->StepDown(resp);
   if (PREDICT_FALSE(!s.ok())) {
@@ -1022,7 +1031,7 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
                          TabletServerErrorPB::TABLET_NOT_RUNNING, context);
     return;
   }
-  scoped_refptr<RaftConsensus> consensus;
+  shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
   if (PREDICT_FALSE(req->opid_type() == consensus::UNKNOWN_OPID_TYPE)) {
     HandleUnknownError(Status::InvalidArgument("Invalid opid_type specified to GetLastOpId()"),
@@ -1057,7 +1066,7 @@ void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateR
       continue;
     }
 
-    scoped_refptr<RaftConsensus> consensus(replica->shared_consensus());
+    shared_ptr<RaftConsensus> consensus(replica->shared_consensus());
     if (!consensus) {
       continue;
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 079ad89..3444968 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -620,7 +620,7 @@ Status TSTabletManager::DeleteTablet(
   // restarting the tablet if the local replica committed a higher config
   // change op during that time, or potentially something else more invasive.
   if (cas_config_opid_index_less_or_equal && !tablet_deleted) {
-    scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
+    shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
     if (!consensus) {
       *error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
       return Status::IllegalState("Raft Consensus not available. Tablet shutting down");
@@ -953,7 +953,7 @@ void TSTabletManager::CreateReportedTabletPB(const string& tablet_id,
   reported_tablet->set_schema_version(replica->tablet_metadata()->schema_version());
 
   // We cannot get consensus state information unless the TabletReplica is running.
-  scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
+  shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
   if (consensus) {
     *reported_tablet->mutable_consensus_state() = consensus->ConsensusState();
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/tserver/tserver-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc
index 26220dd..476832d 100644
--- a/src/kudu/tserver/tserver-path-handlers.cc
+++ b/src/kudu/tserver/tserver-path-handlers.cc
@@ -251,7 +251,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& re
                                  .PartitionDebugString(replica->tablet_metadata()->partition(),
                                                        replica->tablet_metadata()->schema());
 
-      scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
+      shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
       (*output) << Substitute(
           // Table name, tablet id, partition
           "<tr><td>$0</td><td>$1</td><td>$2</td>"
@@ -456,7 +456,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq
   string id;
   scoped_refptr<TabletReplica> replica;
   if (!LoadTablet(tserver_, req, &id, &replica, output)) return;
-  scoped_refptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
+  shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
   if (!consensus) {
     *output << "Tablet " << EscapeForHtmlToString(id) << " not running";
     return;

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/util/async_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h
index 1e2830c..727b7f7 100644
--- a/src/kudu/util/async_util.h
+++ b/src/kudu/util/async_util.h
@@ -19,6 +19,8 @@
 #ifndef KUDU_UTIL_ASYNC_UTIL_H
 #define KUDU_UTIL_ASYNC_UTIL_H
 
+#include <functional>
+
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/util/countdown_latch.h"
@@ -35,12 +37,14 @@ namespace kudu {
 class Synchronizer {
  public:
   Synchronizer()
-    : l(1) {
+    : l_(1) {
   }
+
   void StatusCB(const Status& status) {
-    s = status;
-    l.CountDown();
+    s_ = status;
+    l_.CountDown();
   }
+
   StatusCallback AsStatusCallback() {
     // Synchronizers are often declared on the stack, so it doesn't make
     // sense for a callback to take a reference to its synchronizer.
@@ -49,23 +53,37 @@ class Synchronizer {
     // its synchronizer.
     return Bind(&Synchronizer::StatusCB, Unretained(this));
   }
+
+  StdStatusCallback AsStdStatusCallback() {
+    // Synchronizers are often declared on the stack, so it doesn't make
+    // sense for a callback to take a reference to its synchronizer.
+    //
+    // Note: this means the returned callback _must_ go out of scope before
+    // its synchronizer.
+    return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1);
+  }
+
   Status Wait() {
-    l.Wait();
-    return s;
+    l_.Wait();
+    return s_;
   }
+
   Status WaitFor(const MonoDelta& delta) {
-    if (PREDICT_FALSE(!l.WaitFor(delta))) {
+    if (PREDICT_FALSE(!l_.WaitFor(delta))) {
       return Status::TimedOut("Timed out while waiting for the callback to be called.");
     }
-    return s;
+    return s_;
   }
+
   void Reset() {
-    l.Reset(1);
+    l_.Reset(1);
   }
+
  private:
+  Status s_;
+  CountDownLatch l_;
+
   DISALLOW_COPY_AND_ASSIGN(Synchronizer);
-  Status s;
-  CountDownLatch l;
 };
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c0276a9/src/kudu/util/status_callback.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/status_callback.h b/src/kudu/util/status_callback.h
index 3a36b83..70bbb97 100644
--- a/src/kudu/util/status_callback.h
+++ b/src/kudu/util/status_callback.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_UTIL_STATUS_CALLBACK_H
 #define KUDU_UTIL_STATUS_CALLBACK_H
 
+#include <functional>
 #include <string>
 
 #include "kudu/gutil/callback_forward.h"
@@ -29,6 +30,11 @@ class Status;
 // produce asynchronous results and may fail.
 typedef Callback<void(const Status& status)> StatusCallback;
 
+// Like StatusCallback, but built on std::function rather than Callback.
+//
+// TODO(adar): should eventually replace all StatusCallback usage with this.
+typedef std::function<void(const Status& status)> StdStatusCallback;
+
 // To be used when a function signature requires a StatusCallback but none
 // is needed.
 extern void DoNothingStatusCB(const Status& status);


Mime
View raw message