kudu-commits mailing list archives

From: a...@apache.org
Subject: [7/7] incubator-kudu git commit: master: handle RPC responses on reactor threads
Date: Thu, 07 Jul 2016 18:33:36 GMT
master: handle RPC responses on reactor threads

The introduction of the election rwlock requires that the handling of RPC
responses move off of the singleton thread pool used to invoke the election
callback; otherwise, waiting on outstanding RPCs from that pool could
deadlock. Rather than introduce a new thread pool, I observed that none of
the response handlers take locks or perform IO, so we can simplify the code
a bit by handling responses directly on the reactor threads.
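To make the deadlock concrete, here is a minimal self-contained sketch. It
is not Kudu code: SingleThreadPool is a hypothetical stand-in for a
ThreadPool built with set_max_threads(1), and the two lambdas play the roles
of the election callback and an RPC response handler queued behind it.

// Sketch only: SingleThreadPool is a hypothetical stand-in for Kudu's
// ThreadPool with set_max_threads(1); it is not the real implementation.
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class SingleThreadPool {
 public:
  SingleThreadPool() : worker_([this] { Loop(); }) {}
  ~SingleThreadPool() {
    {
      std::lock_guard<std::mutex> l(mu_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }
  void Submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> l(mu_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Loop() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_ || !tasks_.empty(); });
        if (tasks_.empty()) return;  // done_ is set and nothing is queued
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // run outside the lock so tasks may call Submit()
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool done_ = false;
  std::thread worker_;  // declared last so the other members exist first
};

int main() {
  SingleThreadPool pool;
  std::mutex m;
  std::condition_variable cv;
  bool rpc_handled = false;

  // The "election callback" occupies the pool's only thread...
  pool.Submit([&] {
    // ...and the "RPC response handler" is queued on the SAME pool...
    pool.Submit([&] {
      std::lock_guard<std::mutex> l(m);
      rpc_handled = true;
      cv.notify_one();
    });
    // ...so waiting for it here can never succeed. A 2s timeout stands
    // in for what would otherwise be a permanent hang.
    std::unique_lock<std::mutex> l(m);
    if (!cv.wait_for(l, std::chrono::seconds(2),
                     [&] { return rpc_handled; })) {
      std::cout << "deadlock: the handler never ran while we waited\n";
    }
  });
}

The pool's only thread is occupied by the very task doing the waiting, so
the queued handler can never run. Handling responses on the reactor threads
removes that circular dependency.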

There is one exception to the no-blocking claim: retried RPCs invoke Run()
on the reactor thread, and Run() can (theoretically) block on DNS
resolution. That has always been the case, though, so it's arguably not a
concern for this patch.
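As a hedged illustration of that caveat (not Kudu's actual resolver code),
the standard POSIX getaddrinfo() call below is synchronous; invoked from a
reactor thread it would stall the event loop for as long as resolution
takes. The hostname and port are made up.

// POSIX-only sketch: getaddrinfo() is a synchronous, potentially slow
// call; invoked from a reactor thread it would stall the event loop.
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>

#include <iostream>

int main() {
  addrinfo hints{};
  hints.ai_family = AF_UNSPEC;      // IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM;  // TCP, as for an RPC connection
  addrinfo* result = nullptr;

  // Hypothetical tablet server address; resolution can take seconds if
  // the DNS server is slow or unreachable, blocking this thread throughout.
  int rc = getaddrinfo("tserver.example.com", "7050", &hints, &result);
  if (rc != 0) {
    std::cerr << "resolution failed: " << gai_strerror(rc) << "\n";
    return 1;
  }
  freeaddrinfo(result);
  return 0;
}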

Change-Id: I1af5a77ffa66fcc39f71243529a40aae2b542971
Reviewed-on: http://gerrit.cloudera.org:8080/3583
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/0212b7ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/0212b7ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/0212b7ec

Branch: refs/heads/master
Commit: 0212b7ecd59e1ae0054cfd6d227dbf7d8e8028f2
Parents: fb41e96
Author: Adar Dembo <adar@cloudera.com>
Authored: Wed Jul 6 18:36:22 2016 -0700
Committer: Adar Dembo <adar@cloudera.com>
Committed: Thu Jul 7 18:32:45 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 45 ++++++++++-----------------------
 src/kudu/master/catalog_manager.h  |  5 ++--
 2 files changed, 16 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0212b7ec/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 1073a83..819b3a3 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -536,7 +536,7 @@ CatalogManager::CatalogManager(Master *master)
            // (to correctly serialize invocations of ElectedAsLeaderCb upon
            // closely timed consecutive elections).
            .set_max_threads(1)
-           .Build(&worker_pool_));
+           .Build(&leader_election_pool_));
 }
 
 CatalogManager::~CatalogManager() {
@@ -576,7 +576,7 @@ Status CatalogManager::Init(bool is_first_run) {
 }
 
 Status CatalogManager::ElectedAsLeaderCb() {
-  return worker_pool_->SubmitClosure(
+  return leader_election_pool_->SubmitClosure(
       Bind(&CatalogManager::VisitTablesAndTabletsTask, Unretained(this)));
 }
 
@@ -736,7 +736,7 @@ void CatalogManager::Shutdown() {
   //
   // Must be done before shutting down the catalog, otherwise its tablet peer
   // may be destroyed while still in use by a table visitor.
-  worker_pool_->Shutdown();
+  leader_election_pool_->Shutdown();
 
   // Shut down the underlying storage for tables and tablets.
   if (sys_catalog_) {
@@ -1855,11 +1855,9 @@ class PickLeaderReplica : public TSPicker {
 class RetryingTSRpcTask : public MonitoredTask {
  public:
   RetryingTSRpcTask(Master *master,
-                    ThreadPool* callback_pool,
                     gscoped_ptr<TSPicker> replica_picker,
                     const scoped_refptr<TableInfo>& table)
     : master_(master),
-      callback_pool_(callback_pool),
       replica_picker_(std::move(replica_picker)),
       table_(table),
       start_ts_(MonoTime::Now(MonoTime::FINE)),
@@ -1915,6 +1913,8 @@ class RetryingTSRpcTask : public MonitoredTask {
   // be called to mutate the state_ variable. If retry is desired, then
   // no state change is made. Retries will automatically be attempted as long
   // as the state is kStateRunning and deadline_ has not yet passed.
+  //
+  // Runs on the reactor thread, so must not block or perform any IO.
   virtual void HandleResponse(int attempt) = 0;
 
   // Return the id of the tablet that is the subject of the async request.
@@ -1941,18 +1941,9 @@ class RetryingTSRpcTask : public MonitoredTask {
   }
 
   // Callback meant to be invoked from asynchronous RPC service proxy calls.
+  //
+  // Runs on a reactor thread, so should not block or do any IO.
   void RpcCallback() {
-    // Defer the actual work of the callback off of the reactor thread.
-    // This is necessary because our callbacks often do synchronous writes to
-    // the catalog table, and we can't do synchronous IO on the reactor.
-    CHECK_OK(callback_pool_->SubmitClosure(
-                 Bind(&RetryingTSRpcTask::DoRpcCallback,
-                      Unretained(this))));
-  }
-
-  // Handle the actual work of the RPC callback. This is run on the master's worker
-  // pool, rather than a reactor thread, so it may do blocking IO operations.
-  void DoRpcCallback() {
     if (!rpc_.status().ok()) {
       LOG(WARNING) << "TS " << target_ts_desc_->permanent_uuid() << ": "
                    << type_name() << " RPC failed for tablet "
@@ -1970,7 +1961,6 @@ class RetryingTSRpcTask : public MonitoredTask {
   }
 
   Master * const master_;
-  ThreadPool* const callback_pool_;
   const gscoped_ptr<TSPicker> replica_picker_;
   const scoped_refptr<TableInfo> table_;
 
@@ -2086,11 +2076,9 @@ class RetryingTSRpcTask : public MonitoredTask {
 class RetrySpecificTSRpcTask : public RetryingTSRpcTask {
  public:
   RetrySpecificTSRpcTask(Master* master,
-                         ThreadPool* callback_pool,
                          const string& permanent_uuid,
                          const scoped_refptr<TableInfo>& table)
     : RetryingTSRpcTask(master,
-                        callback_pool,
                         gscoped_ptr<TSPicker>(new PickSpecificUUID(permanent_uuid)),
                         table),
       permanent_uuid_(permanent_uuid) {
@@ -2108,11 +2096,10 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask {
 
   // The tablet lock must be acquired for reading before making this call.
   AsyncCreateReplica(Master *master,
-                     ThreadPool *callback_pool,
                      const string& permanent_uuid,
                      const scoped_refptr<TabletInfo>& tablet,
                      const TabletMetadataLock& tablet_lock)
-    : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, tablet->table().get()),
+    : RetrySpecificTSRpcTask(master, permanent_uuid, tablet->table().get()),
       tablet_id_(tablet->tablet_id()) {
     deadline_ = start_ts_;
     deadline_.AddDelta(MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms));
@@ -2175,12 +2162,12 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask {
 class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
  public:
   AsyncDeleteReplica(
-      Master* master, ThreadPool* callback_pool, const string& permanent_uuid,
+      Master* master, const string& permanent_uuid,
       const scoped_refptr<TableInfo>& table, std::string tablet_id,
       TabletDataState delete_type,
       boost::optional<int64_t> cas_config_opid_index_less_or_equal,
       string reason)
-      : RetrySpecificTSRpcTask(master, callback_pool, permanent_uuid, table),
+      : RetrySpecificTSRpcTask(master, permanent_uuid, table),
         tablet_id_(std::move(tablet_id)),
         delete_type_(delete_type),
         cas_config_opid_index_less_or_equal_(
@@ -2269,10 +2256,8 @@ class AsyncDeleteReplica : public RetrySpecificTSRpcTask {
 class AsyncAlterTable : public RetryingTSRpcTask {
  public:
   AsyncAlterTable(Master *master,
-                  ThreadPool* callback_pool,
                   const scoped_refptr<TabletInfo>& tablet)
     : RetryingTSRpcTask(master,
-                        callback_pool,
                         gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
                         tablet->table().get()),
       tablet_(tablet) {
@@ -2368,11 +2353,9 @@ bool SelectRandomTSForReplica(const TSDescriptorVector& ts_descs,
 class AsyncAddServerTask : public RetryingTSRpcTask {
  public:
   AsyncAddServerTask(Master *master,
-                     ThreadPool* callback_pool,
                      const scoped_refptr<TabletInfo>& tablet,
                      const ConsensusStatePB& cstate)
     : RetryingTSRpcTask(master,
-                        callback_pool,
                         gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
                         tablet->table()),
       tablet_(tablet),
@@ -2496,7 +2479,7 @@ void CatalogManager::SendAlterTableRequest(const scoped_refptr<TableInfo>& table
 }
 
 void CatalogManager::SendAlterTabletRequest(const scoped_refptr<TabletInfo>& tablet) {
-  auto call = new AsyncAlterTable(master_, worker_pool_.get(), tablet);
+  auto call = new AsyncAlterTable(master_, tablet);
   tablet->table()->AddTask(call);
   WARN_NOT_OK(call->Run(), "Failed to send alter table request");
 }
@@ -2546,7 +2529,7 @@ void CatalogManager::SendDeleteReplicaRequest(
                                       TabletDataState_Name(delete_type),
                                       reason);
   AsyncDeleteReplica* call =
-      new AsyncDeleteReplica(master_, worker_pool_.get(), ts_uuid, table,
+      new AsyncDeleteReplica(master_, ts_uuid, table,
                              tablet_id, delete_type, cas_config_opid_index_less_or_equal,
                              reason);
   if (table != nullptr) {
@@ -2561,7 +2544,7 @@ void CatalogManager::SendDeleteReplicaRequest(
 
 void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
                                           const ConsensusStatePB& cstate) {
-  auto task = new AsyncAddServerTask(master_, worker_pool_.get(), tablet, cstate);
+  auto task = new AsyncAddServerTask(master_, tablet, cstate);
   tablet->table()->AddTask(task);
   WARN_NOT_OK(task->Run(), "Failed to send new AddServer request");
 
@@ -2851,7 +2834,7 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
       tablet_lock.data().pb.committed_consensus_state().config();
   tablet->set_last_create_tablet_time(MonoTime::Now(MonoTime::FINE));
   for (const RaftPeerPB& peer : config.peers()) {
-    AsyncCreateReplica* task = new AsyncCreateReplica(master_, worker_pool_.get(),
+    AsyncCreateReplica* task = new AsyncCreateReplica(master_,
                                                       peer.permanent_uuid(),
                                                       tablet, tablet_lock);
     tablet->table()->AddTask(task);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0212b7ec/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 75d6d8e..bc2e8e1 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -613,9 +613,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
   mutable simple_spinlock state_lock_;
   State state_;
 
-  // Used to defer work from reactor threads onto a thread where
-  // blocking behavior is permissible.
-  gscoped_ptr<ThreadPool> worker_pool_;
+  // Singleton pool that serializes invocations of ElectedAsLeaderCb().
+  gscoped_ptr<ThreadPool> leader_election_pool_;
 
   // This field is updated when a node becomes leader master,
   // waits for all outstanding uncommitted metadata (table and tablet metadata)

