kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject kudu git commit: consensus_peers: capture weak refs in functors submitted to thread pools
Date Tue, 01 Aug 2017 22:02:17 GMT
Repository: kudu
Updated Branches:
  refs/heads/master de43ffe77 -> 9f7fed68c


consensus_peers: capture weak refs in functors submitted to thread pools

By capturing weak refs we allow peers to be destroyed earlier, closer to
last-user-facing-ref-was-dropped time. While this reduces the amount of
"unproductive" submitted tasks processed, I think the real benefit is a
clearer expression of lifecycle: we capture weak refs where we do not wish
to prolong the Peer's lifespan.

Note: this change is less useful than I had hoped due to the continued
strong ref capture in async proxy calls. Weak refs don't work there as the
krpc subsystem requires response objects to stay alive right up until the
supplied callback is invoked.

Change-Id: I6dbb777e7e141f1f5bf48026779f1d4b26185195
Reviewed-on: http://gerrit.cloudera.org:8080/7543
Reviewed-by: Todd Lipcon <todd@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9f7fed68
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9f7fed68
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9f7fed68

Branch: refs/heads/master
Commit: 9f7fed68c1e72b6878940e2d876eb66023e48f72
Parents: de43ffe
Author: Adar Dembo <adar@cloudera.com>
Authored: Wed Jul 19 20:17:31 2017 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Tue Aug 1 22:01:33 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc | 58 ++++++++++++++++--------------
 src/kudu/consensus/consensus_peers.h  |  2 +-
 2 files changed, 32 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9f7fed68/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 751621c..bbf48ce 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -79,6 +79,7 @@ namespace kudu {
 namespace consensus {
 
 using std::shared_ptr;
+using std::weak_ptr;
 using rpc::Messenger;
 using rpc::RpcController;
 using strings::Substitute;
@@ -131,7 +132,7 @@ Status Peer::Init() {
   }
 
   // Schedule the first heartbeat.
-  ScheduleNextHeartbeatAndMaybeSignalRequest(Status::OK());
+  ScheduleNextHeartbeatAndMaybeSignalRequest();
   return Status::OK();
 }
 
@@ -142,12 +143,14 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
     return Status::IllegalState("Peer was closed.");
   }
 
-  // Capture a shared_ptr reference into the submitted functor so that we're
-  // guaranteed that this object outlives the functor.
-  shared_ptr<Peer> s_this = shared_from_this();
-  RETURN_NOT_OK(raft_pool_token_->SubmitFunc([even_if_queue_empty, s_this]() {
-        s_this->SendNextRequest(even_if_queue_empty);
-      }));
+  // Capture a weak_ptr reference into the submitted functor so that we can
+  // safely handle the functor outliving its peer.
+  weak_ptr<Peer> w_this = shared_from_this();
+  RETURN_NOT_OK(raft_pool_token_->SubmitFunc([even_if_queue_empty, w_this]() {
+    if (auto p = w_this.lock()) {
+      p->SendNextRequest(even_if_queue_empty);
+    }
+  }));
   return Status::OK();
 }
 
@@ -295,11 +298,14 @@ void Peer::ProcessResponse() {
   // of the response handling logic on our thread pool and not on the reactor
   // thread.
   //
-  // Capture a shared_ptr reference into the submitted functor so that we're
-  // guaranteed that this object outlives the functor.
-  shared_ptr<Peer> s_this = shared_from_this();
-  Status s = raft_pool_token_->SubmitFunc([s_this]() {
-    s_this->DoProcessResponse();
+  // Capture a weak_ptr reference into the submitted functor so that we can
+  // safely handle the functor outliving its peer.
+  weak_ptr<Peer> w_this = shared_from_this();
+  Status s = raft_pool_token_->SubmitFunc([w_this]() {
+    if (auto p = w_this.lock()) {
+      p->DoProcessResponse();
+
+    }
   });
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to process peer response: " <<
s.ToString()
@@ -408,12 +414,7 @@ Peer::~Peer() {
   request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
 }
 
-void Peer::ScheduleNextHeartbeatAndMaybeSignalRequest(const Status& status) {
-  if (!status.ok()) {
-    // The reactor was shut down.
-    return;
-  }
-
+void Peer::ScheduleNextHeartbeatAndMaybeSignalRequest() {
   // We must schedule the next callback with the lowest possible jittered
   // time as the delay so that if SendNextRequest() resets the next heartbeat
   // time with a very low value, the scheduled callback can still honor it.
@@ -446,15 +447,18 @@ void Peer::ScheduleNextHeartbeatAndMaybeSignalRequest(const Status&
status) {
     SignalRequest(true);
   }
 
-  // Capture a shared_ptr reference into the submitted functor so that we're
-  // guaranteed that this object outlives the functor.
-  //
-  // Note: we use std::bind and not boost::bind here because the latter doesn't
-  // work with std::shared_ptr.
-  shared_ptr<Peer> s_this = shared_from_this();
-  messenger_->ScheduleOnReactor(
-      std::bind(&Peer::ScheduleNextHeartbeatAndMaybeSignalRequest, s_this,
-                std::placeholders::_1), schedule_delay);
+  // Capture a weak_ptr reference into the submitted functor so that we can
+  // safely handle the functor outliving its peer.
+  weak_ptr<Peer> w_this = shared_from_this();
+  messenger_->ScheduleOnReactor([w_this](const Status& s) {
+    if (!s.ok()) {
+      // The reactor was shut down.
+      return;
+    }
+    if (auto p = w_this.lock()) {
+      p->ScheduleNextHeartbeatAndMaybeSignalRequest();
+    }
+  }, schedule_delay);
 }
 
 MonoDelta Peer::GetHeartbeatJitterLowerBound() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f7fed68/src/kudu/consensus/consensus_peers.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 3dfc072..5b3662c 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -156,7 +156,7 @@ class Peer : public std::enable_shared_from_this<Peer> {
   // Schedules the next heartbeat for this peer, optionally sending a heartbeat
   // now if it makes sense to do so. Initially called from Init() to schedule
   // the first heartbeat, and subsequently as a callback running on a reactor thread.
-  void ScheduleNextHeartbeatAndMaybeSignalRequest(const Status& status);
+  void ScheduleNextHeartbeatAndMaybeSignalRequest();
 
   // Resets the next time that we should heartbeat to this peer. Does not
   // perform any actual scheduling.


Mime
View raw message