kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [3/3] kudu git commit: server: consolidate tablet prepare pools
Date Wed, 28 Jun 2017 00:05:39 GMT
server: consolidate tablet prepare pools

Using the new serial token functionality available in ThreadPool, we can now
safely consolidate all per-tablet prepare pools into a single server-wide
pool. Each replica allocates a token with which to submit to the prepare
pool, attaching the existing tablet-specific metrics to it.

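The prepare pool is configured with no upper bound on the number of running
threads because prepare tasks can block on row/schema locks. Since each replica
submits its prepare tasks serially through its own token, the effective upper
bound is the number of replicas hosted by the server.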

Change-Id: Ic393f739cc9c1b267142b20e04a1aaa5aed97cb0
Reviewed-on: http://gerrit.cloudera.org:8080/7150
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/74d67fd2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/74d67fd2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/74d67fd2

Branch: refs/heads/master
Commit: 74d67fd2b7c8ad9c315f0b9894a49616e00e235d
Parents: 31b852f
Author: Adar Dembo <adar@cloudera.com>
Authored: Mon Jun 12 01:34:56 2017 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Wed Jun 28 00:03:14 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/delete_tablet-itest.cc    |  1 +
 src/kudu/kserver/kserver.cc                     | 19 ++++++++-----
 src/kudu/kserver/kserver.h                      |  4 +++
 src/kudu/master/sys_catalog.cc                  |  3 ++-
 src/kudu/tablet/tablet_replica-test.cc          | 10 +++++--
 src/kudu/tablet/tablet_replica.cc               | 28 +++++++++-----------
 src/kudu/tablet/tablet_replica.h                | 14 +++++-----
 .../tablet/transactions/transaction_driver.cc   |  6 ++---
 .../tablet/transactions/transaction_driver.h    |  4 +--
 .../tserver/tablet_copy_source_session-test.cc  |  7 +++--
 src/kudu/tserver/ts_tablet_manager.cc           |  3 ++-
 11 files changed, 59 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/integration-tests/delete_tablet-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_tablet-itest.cc b/src/kudu/integration-tests/delete_tablet-itest.cc
index 0305991..e306963 100644
--- a/src/kudu/integration-tests/delete_tablet-itest.cc
+++ b/src/kudu/integration-tests/delete_tablet-itest.cc
@@ -76,6 +76,7 @@ TEST_F(DeleteTabletITest, TestDeleteFailedReplica) {
 
   // Shut down the TS and restart it after changing flags to ensure no data can
   // be written during tablet bootstrap.
+  tablet_replica.reset();
   mts->Shutdown();
   FLAGS_fs_wal_dir_reserved_bytes = INT64_MAX;
   ASSERT_OK(mts->Restart());

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/kserver/kserver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index 47ffd0e..fc5fbf4 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -74,13 +74,17 @@ Status KuduServer::Init() {
                 .set_metrics(std::move(metrics))
                 .Build(&tablet_apply_pool_));
 
-  // This pool is shared by all replicas hosted by this server.
+  // These pools are shared by all replicas hosted by this server.
   //
-  // Some submitted tasks use blocking IO, so we configure no upper bound on
-  // the maximum number of threads in each pool (otherwise the default value of
-  // "number of CPUs" may cause blocking tasks to starve other "fast" tasks).
-  // However, the effective upper bound is the number of replicas as each will
-  // submit its own tasks via a dedicated token.
+  // Submitted tasks use blocking IO (raft_pool_) or acquire long-held locks
+  // (tablet_prepare_pool_) so we configure no upper bound on the maximum
+  // number of threads in each pool (otherwise the default value of "number of
+  // CPUs" may cause blocking tasks to starve other "fast" tasks). However, the
+  // effective upper bound is the number of replicas as each will submit its
+  // own tasks via a dedicated token.
+  RETURN_NOT_OK(ThreadPoolBuilder("prepare")
+                .set_max_threads(std::numeric_limits<int>::max())
+                .Build(&tablet_prepare_pool_));
   RETURN_NOT_OK(ThreadPoolBuilder("raft")
                 .set_trace_metric_prefix("raft")
                 .set_max_threads(std::numeric_limits<int>::max())
@@ -114,6 +118,9 @@ void KuduServer::Shutdown() {
   if (tablet_apply_pool_) {
     tablet_apply_pool_->Shutdown();
   }
+  if (tablet_prepare_pool_) {
+    tablet_prepare_pool_->Shutdown();
+  }
   ServerBase::Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/kserver/kserver.h
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.h b/src/kudu/kserver/kserver.h
index 44a70cb..100df3c 100644
--- a/src/kudu/kserver/kserver.h
+++ b/src/kudu/kserver/kserver.h
@@ -56,11 +56,15 @@ class KuduServer : public server::ServerBase {
   // Shuts down a KuduServer instance.
   virtual void Shutdown() override;
 
+  ThreadPool* tablet_prepare_pool() const { return tablet_prepare_pool_.get(); }
   ThreadPool* tablet_apply_pool() const { return tablet_apply_pool_.get(); }
   ThreadPool* raft_pool() const { return raft_pool_.get(); }
 
  private:
 
+  // Thread pool for preparing transactions, shared between all tablets.
+  gscoped_ptr<ThreadPool> tablet_prepare_pool_;
+
   // Thread pool for applying transactions, shared between all tablets.
   gscoped_ptr<ThreadPool> tablet_apply_pool_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 624b2e2..81407b7 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -332,7 +332,8 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
                                               scoped_refptr<rpc::ResultTracker>(),
                                               log,
                                               tablet->GetMetricEntity(),
-                                              master_->raft_pool()),
+                                              master_->raft_pool(),
+                                              master_->tablet_prepare_pool()),
                         "Failed to Init() TabletReplica");
 
   RETURN_NOT_OK_PREPEND(tablet_replica_->Start(consensus_info),

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 5252ff9..f91293d 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -91,6 +91,7 @@ class TabletReplicaTest : public KuduTabletTest {
   virtual void SetUp() OVERRIDE {
     KuduTabletTest::SetUp();
 
+    ASSERT_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
     ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
     ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
 
@@ -142,7 +143,8 @@ class TabletReplicaTest : public KuduTabletTest {
                                     scoped_refptr<rpc::ResultTracker>(),
                                     log,
                                     metric_entity_,
-                                    raft_pool_.get()));
+                                    raft_pool_.get(),
+                                    prepare_pool_.get()));
   }
 
   Status StartPeer(const ConsensusBootstrapInfo& info) {
@@ -158,6 +160,7 @@ class TabletReplicaTest : public KuduTabletTest {
 
   virtual void TearDown() OVERRIDE {
     tablet_replica_->Shutdown();
+    prepare_pool_->Shutdown();
     apply_pool_->Shutdown();
     KuduTabletTest::TearDown();
   }
@@ -265,9 +268,12 @@ class TabletReplicaTest : public KuduTabletTest {
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
   shared_ptr<Messenger> messenger_;
-  scoped_refptr<TabletReplica> tablet_replica_;
+  gscoped_ptr<ThreadPool> prepare_pool_;
   gscoped_ptr<ThreadPool> apply_pool_;
   gscoped_ptr<ThreadPool> raft_pool_;
+
+  // Must be destroyed before thread pools.
+  scoped_refptr<TabletReplica> tablet_replica_;
 };
 
 // A Transaction that waits on the apply_continue latch inside of Apply().

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 9698098..baf1f27 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -129,21 +129,19 @@ Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
                            const scoped_refptr<ResultTracker>& result_tracker,
                            const scoped_refptr<Log>& log,
                            const scoped_refptr<MetricEntity>& metric_entity,
-                           ThreadPool* raft_pool) {
+                           ThreadPool* raft_pool,
+                           ThreadPool* prepare_pool) {
 
   DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
   DCHECK(log) << "A TabletReplica must be provided with a Log";
 
-  ThreadPoolMetrics metrics = {
-      METRIC_op_prepare_queue_length.Instantiate(metric_entity),
-      METRIC_op_prepare_queue_time.Instantiate(metric_entity),
-      METRIC_op_prepare_run_time.Instantiate(metric_entity)
-  };
-  RETURN_NOT_OK(ThreadPoolBuilder("prepare")
-                .set_max_threads(1)
-                .set_metrics(std::move(metrics))
-                .Build(&prepare_pool_));
-
+  prepare_pool_token_ = prepare_pool->NewTokenWithMetrics(
+      ThreadPool::ExecutionMode::SERIAL,
+      {
+          METRIC_op_prepare_queue_length.Instantiate(metric_entity),
+          METRIC_op_prepare_queue_time.Instantiate(metric_entity),
+          METRIC_op_prepare_run_time.Instantiate(metric_entity)
+      });
   {
     std::lock_guard<simple_spinlock> lock(lock_);
     CHECK_EQ(BOOTSTRAPPING, state_);
@@ -245,8 +243,8 @@ void TabletReplica::Shutdown() {
     txn_tracker_.WaitForAllToFinish();
   }
 
-  if (prepare_pool_) {
-    prepare_pool_->Shutdown();
+  if (prepare_pool_token_) {
+    prepare_pool_token_->Shutdown();
   }
 
   if (log_) {
@@ -554,7 +552,7 @@ Status TabletReplica::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transa
     &txn_tracker_,
     consensus_.get(),
     log_.get(),
-    prepare_pool_.get(),
+    prepare_pool_token_.get(),
     apply_pool_,
     &txn_order_verifier_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::LEADER));
@@ -569,7 +567,7 @@ Status TabletReplica::NewReplicaTransactionDriver(gscoped_ptr<Transaction> trans
     &txn_tracker_,
     consensus_.get(),
     log_.get(),
-    prepare_pool_.get(),
+    prepare_pool_token_.get(),
     apply_pool_,
     &txn_order_verifier_);
   RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::REPLICA));

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index 6c4846e..f54b115 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -69,7 +69,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                       public consensus::ReplicaTransactionFactory {
  public:
   TabletReplica(const scoped_refptr<TabletMetadata>& meta,
-                consensus::RaftPeerPB local_peer_pb, ThreadPool* apply_pool,
+                consensus::RaftPeerPB local_peer_pb,
+                ThreadPool* apply_pool,
                 Callback<void(const std::string& reason)> mark_dirty_clbk);
 
   // Initializes the TabletReplica, namely creating the Log and initializing
@@ -80,7 +81,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
               const scoped_refptr<rpc::ResultTracker>& result_tracker,
               const scoped_refptr<log::Log>& log,
               const scoped_refptr<MetricEntity>& metric_entity,
-              ThreadPool* raft_pool);
+              ThreadPool* raft_pool,
+              ThreadPool* prepare_pool);
 
   // Starts the TabletReplica, making it available for Write()s. If this
   // TabletReplica is part of a consensus configuration this will connect it to other replicas
@@ -308,12 +310,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // during them in order to reject RPCs, etc.
   mutable simple_spinlock state_change_lock_;
 
-  // IMPORTANT: correct execution of PrepareTask assumes that 'prepare_pool_'
-  // is single-threaded, moving to a multi-tablet setup where multiple TabletReplicas
-  // use the same 'prepare_pool_' needs to enforce that, for a single
-  // TabletReplica, PrepareTasks are executed *serially*.
-  // TODO move the prepare pool to TabletServer.
-  gscoped_ptr<ThreadPool> prepare_pool_;
+  // Token for serial task submission to the server-wide transaction prepare pool.
+  std::unique_ptr<ThreadPoolToken> prepare_pool_token_;
 
   // Pool that executes apply tasks for transactions. This is a multi-threaded
   // pool, constructor-injected by either the Master (for system tables) or

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 57f841a..0e73d1b 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -82,13 +82,13 @@ class FollowerTransactionCompletionCallback : public TransactionCompletionCallba
 TransactionDriver::TransactionDriver(TransactionTracker *txn_tracker,
                                      RaftConsensus* consensus,
                                      Log* log,
-                                     ThreadPool* prepare_pool,
+                                     ThreadPoolToken* prepare_pool_token,
                                      ThreadPool* apply_pool,
                                      TransactionOrderVerifier* order_verifier)
     : txn_tracker_(txn_tracker),
       consensus_(consensus),
       log_(log),
-      prepare_pool_(prepare_pool),
+      prepare_pool_token_(prepare_pool_token),
       apply_pool_(apply_pool),
       order_verifier_(order_verifier),
       trace_(new Trace()),
@@ -190,7 +190,7 @@ Status TransactionDriver::ExecuteAsync() {
   }
 
   if (s.ok()) {
-    s = prepare_pool_->SubmitClosure(
+    s = prepare_pool_token_->SubmitClosure(
       Bind(&TransactionDriver::PrepareTask, Unretained(this)));
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index e0a987c..03eef10 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -222,7 +222,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   TransactionDriver(TransactionTracker* txn_tracker,
                     consensus::RaftConsensus* consensus,
                     log::Log* log,
-                    ThreadPool* prepare_pool,
+                    ThreadPoolToken* prepare_pool_token,
                     ThreadPool* apply_pool,
                     TransactionOrderVerifier* order_verifier);
 
@@ -345,7 +345,7 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   TransactionTracker* const txn_tracker_;
   consensus::RaftConsensus* const consensus_;
   log::Log* const log_;
-  ThreadPool* const prepare_pool_;
+  ThreadPoolToken* const prepare_pool_token_;
   ThreadPool* const apply_pool_;
   TransactionOrderVerifier* const order_verifier_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/tserver/tablet_copy_source_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 7b09fcb..7ded27d 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -73,7 +73,8 @@ class TabletCopyTest : public KuduTabletTest {
   TabletCopyTest()
     : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
                               ColumnSchema("val", INT32) }, 1)) {
-    CHECK_OK(ThreadPoolBuilder("test-exec").Build(&apply_pool_));
+    CHECK_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
+    CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
     CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
   }
 
@@ -137,7 +138,8 @@ class TabletCopyTest : public KuduTabletTest {
                                     scoped_refptr<rpc::ResultTracker>(),
                                     log,
                                     metric_entity,
-                                    raft_pool_.get()));
+                                    raft_pool_.get(),
+                                    prepare_pool_.get()));
     consensus::ConsensusBootstrapInfo boot_info;
     ASSERT_OK(tablet_replica_->Start(boot_info));
     ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
@@ -215,6 +217,7 @@ class TabletCopyTest : public KuduTabletTest {
 
   MetricRegistry metric_registry_;
   scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
+  gscoped_ptr<ThreadPool> prepare_pool_;
   gscoped_ptr<ThreadPool> apply_pool_;
   gscoped_ptr<ThreadPool> raft_pool_;
   scoped_refptr<TabletReplica> tablet_replica_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/74d67fd2/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 322ae46..99e580e 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -773,7 +773,8 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>&
meta,
                        server_->result_tracker(),
                        log,
                        tablet->GetMetricEntity(),
-                       server_->raft_pool());
+                       server_->raft_pool(),
+                       server_->tablet_prepare_pool());
 
     if (!s.ok()) {
       LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to init: "


Mime
View raw message