kudu-commits mailing list archives

From t...@apache.org
Subject [4/5] kudu git commit: threadpool: token-based task sequencing
Date Tue, 27 Jun 2017 23:03:44 GMT
threadpool: token-based task sequencing

This patch adds task sequencing to util/threadpool. Task sequencing allows
M contexts to share a single pool with N threads while also ensuring that
the pool executes tasks belonging to each context in order. Previously this
was only achievable via "one singleton pool per context", which grossly
inflated the total number of threads and overall state.

The new logic is implemented in ThreadPoolToken. Tasks submitted via tokens
are logically grouped together. They may also be sequenced, depending on the
token's execution mode. Tokens expose Wait() variants which will block on
the task group. Tokens may also be shut down before being destroyed, which
has semantics equivalent to shutting down the pool itself (i.e. tasks may no
longer be submitted with these tokens).
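
For illustration, here is a sketch of the new API in use. All names are taken
from the patch below; error handling and includes are elided:

    gscoped_ptr<ThreadPool> pool;
    CHECK_OK(ThreadPoolBuilder("my_pool")
             .set_max_threads(4)
             .Build(&pool));

    // Each context gets its own token. A SERIAL token guarantees that the
    // context's tasks run one at a time, in submission order.
    std::unique_ptr<ThreadPoolToken> t1 = pool->NewToken(
        ThreadPool::ExecutionMode::SERIAL);
    std::unique_ptr<ThreadPoolToken> t2 = pool->NewToken(
        ThreadPool::ExecutionMode::SERIAL);

    CHECK_OK(t1->SubmitFunc([]() { /* t1's first task */ }));
    CHECK_OK(t1->SubmitFunc([]() { /* runs only after t1's first task */ }));
    CHECK_OK(t2->SubmitFunc([]() { /* may run concurrently with t1's tasks */ }));

    t1->Wait();      // Blocks until both of t1's tasks have finished.
    t2->Shutdown();  // Further submissions to t2 return ServiceUnavailable.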

Some notes:
- I evaluated two other implementations. In one, tokens had an implicit
  lifecycle that was automatically managed by the threadpool. While simpler
  for clients, this made the threadpool less efficient, adding allocations
  and deallocations to each submission. In the other, token submission was
  built on top of regular task submission. This afforded some separation
  between the threadpool's "core" and the token logic, but it complicated
  locking and the tracking of queued tasks.
- I tried to keep submission (whether token-based or tokenless) fast. Just
  the change from std::list to std::deque for the main queue ought to
  improve performance of tokenless submissions. The next bottleneck is
  likely to be lock contention on the global threadpool lock.

Change-Id: If46dc34212027b6ea5dbc2ead7c7af8be57f2c70
Reviewed-on: http://gerrit.cloudera.org:8080/6874
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/428a34ba
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/428a34ba
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/428a34ba

Branch: refs/heads/master
Commit: 428a34ba0aecfb81178d821732feac694a9b35c1
Parents: 74f99e4
Author: Adar Dembo <adar@cloudera.com>
Authored: Thu May 11 20:58:52 2017 -0700
Committer: Todd Lipcon <todd@apache.org>
Committed: Tue Jun 27 22:54:56 2017 +0000

----------------------------------------------------------------------
 src/kudu/util/debug-util.h       |  12 +-
 src/kudu/util/threadpool-test.cc | 399 +++++++++++++++++++++++++++++++++-
 src/kudu/util/threadpool.cc      | 387 +++++++++++++++++++++++++++++----
 src/kudu/util/threadpool.h       | 274 +++++++++++++++++++----
 4 files changed, 986 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/428a34ba/src/kudu/util/debug-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug-util.h b/src/kudu/util/debug-util.h
index 2617520..fc12445 100644
--- a/src/kudu/util/debug-util.h
+++ b/src/kudu/util/debug-util.h
@@ -94,19 +94,29 @@ void HexStackTraceToString(char* buf, size_t size);
 // Requires external synchronization.
 class StackTrace {
  public:
+
+  // Constructs a new (uncollected) stack trace.
   StackTrace()
     : num_frames_(0) {
   }
 
+  // Resets the stack trace to an uncollected state.
   void Reset() {
     num_frames_ = 0;
   }
 
+  // Returns true if Collect() (but not Reset()) has been called on this stack trace.
+  bool HasCollected() const {
+    return num_frames_ > 0;
+  }
+
+  // Copies the contents of 's' into this stack trace.
   void CopyFrom(const StackTrace& s) {
     memcpy(this, &s, sizeof(s));
   }
 
-  bool Equals(const StackTrace& s) {
+  // Returns true if the stack trace 's' matches this trace.
+  bool Equals(const StackTrace& s) const {
     return s.num_frames_ == num_frames_ &&
       strings::memeq(frames_, s.frames_,
                      num_frames_ * sizeof(frames_[0]));
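
For illustration, a quick sketch of how the new accessors compose (Collect()
is declared elsewhere in this header; the usage here is illustrative only):

    StackTrace st;
    CHECK(!st.HasCollected());  // A freshly constructed trace is uncollected.
    st.Collect();
    CHECK(st.HasCollected());
    st.Reset();                 // Back to the uncollected state.
    CHECK(!st.HasCollected());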

http://git-wip-us.apache.org/repos/asf/kudu/blob/428a34ba/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 6bd5826..0beb962 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -15,8 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
+#include <limits>
 #include <memory>
+#include <mutex>
 #include <string>
+#include <thread>
+#include <vector>
 
 #include <boost/bind.hpp>
 #include <glog/logging.h>
@@ -24,16 +29,27 @@
 
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/bind.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/barrier.h"
 #include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/promise.h"
+#include "kudu/util/random.h"
 #include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/threadpool.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
+#include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
+using std::atomic;
 using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+using strings::Substitute;
 
 namespace kudu {
 
@@ -363,5 +379,386 @@ TEST_F(ThreadPoolTest, TestSlowDestructor) {
   ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
 }
 
+// For test cases that should run with both kinds of tokens.
+class ThreadPoolTestTokenTypes : public ThreadPoolTest,
+                                 public testing::WithParamInterface<ThreadPool::ExecutionMode> {};
+
+INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes,
+                        ::testing::Values(ThreadPool::ExecutionMode::SERIAL,
+                                          ThreadPool::ExecutionMode::CONCURRENT));
+
+
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
+  unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam());
+  int i = 0;
+  ASSERT_OK(t->SubmitFunc([&]() {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    i++;
+  }));
+  t->Wait();
+  ASSERT_EQ(1, i);
+}
+
+TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
+  unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  Random r(SeedRandom());
+  string result;
+  for (char c = 'a'; c < 'f'; c++) {
+    // Sleep a little first so that there's a higher chance of out-of-order
+    // appends if the submissions did execute in parallel.
+    int sleep_ms = r.Next() % 5;
+    ASSERT_OK(t->SubmitFunc([&result, c, sleep_ms]() {
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      result += c;
+    }));
+  }
+  t->Wait();
+  ASSERT_EQ("abcde", result);
+}
+
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) {
+  const int kNumTokens = 5;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(kNumTokens)));
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+
+  // A violation of the tested invariant would yield a deadlock, so let's set
+  // up an alarm to bail us out.
+  alarm(60);
+  auto cleanup = MakeScopedCleanup([&] {
+      alarm(0); // Disable alarm on test exit.
+  });
+  shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumTokens + 1);
+  for (int i = 0; i < kNumTokens; i++) {
+    tokens.emplace_back(pool_->NewToken(GetParam()));
+    ASSERT_OK(tokens.back()->SubmitFunc([b]() {
+      b->Wait();
+    }));
+  }
+
+  // This will deadlock if the above tasks weren't all running concurrently.
+  b->Wait();
+}
+
+TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) {
+  const int kNumSubmissions = 5;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(kNumSubmissions)));
+
+  // A violation of the tested invariant would yield a deadlock, so let's set
+  // up an alarm to bail us out.
+  alarm(60);
+  auto cleanup = MakeScopedCleanup([&] {
+      alarm(0); // Disable alarm on test exit.
+  });
+  shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumSubmissions + 1);
+  unique_ptr<ThreadPoolToken> t = pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+  for (int i = 0; i < kNumSubmissions; i++) {
+    ASSERT_OK(t->SubmitFunc([b]() {
+      b->Wait();
+    }));
+  }
+
+  // This will deadlock if the above tasks weren't all running concurrently.
+  b->Wait();
+}
+
+TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(4)));
+
+  unique_ptr<ThreadPoolToken> t1(pool_->NewToken(GetParam()));
+  unique_ptr<ThreadPoolToken> t2(pool_->NewToken(GetParam()));
+  CountDownLatch l1(1);
+  CountDownLatch l2(1);
+
+  // A violation of the tested invariant would yield a deadlock, so let's set
+  // up an alarm to bail us out.
+  alarm(60);
+  auto cleanup = MakeScopedCleanup([&] {
+      alarm(0); // Disable alarm on test exit.
+  });
+
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(t1->SubmitFunc([&]() {
+      l1.Wait();
+    }));
+  }
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(t2->SubmitFunc([&]() {
+      l2.Wait();
+    }));
+  }
+
+  // Unblock all of t1's tasks, but not t2's tasks.
+  l1.CountDown();
+
+  // If this also waited for t2's tasks, it would deadlock.
+  t1->Shutdown();
+
+  // We can no longer submit to t1 but we can still submit to t2.
+  ASSERT_TRUE(t1->SubmitFunc([](){}).IsServiceUnavailable());
+  ASSERT_OK(t2->SubmitFunc([](){}));
+
+  // Unblock t2's tasks.
+  l2.CountDown();
+  t2->Shutdown();
+}
+
+TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
+  const int kNumTokens = 3;
+  const int kNumSubmissions = 20;
+  Random r(SeedRandom());
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+  for (int i = 0; i < kNumTokens; i++) {
+    tokens.emplace_back(pool_->NewToken(GetParam()));
+  }
+
+  atomic<int32_t> v(0);
+  for (int i = 0; i < kNumSubmissions; i++) {
+    // Sleep a little first to raise the likelihood of the test thread
+    // reaching Wait() before the submissions finish.
+    int sleep_ms = r.Next() % 5;
+
+    auto task = [&v, sleep_ms]() {
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      v++;
+    };
+
+    // Half of the submissions will be token-less, and half will use a token.
+    if (i % 2 == 0) {
+      ASSERT_OK(pool_->SubmitFunc(task));
+    } else {
+      int token_idx = r.Next() % tokens.size();
+      ASSERT_OK(tokens[token_idx]->SubmitFunc(task));
+    }
+  }
+  pool_->Wait();
+  ASSERT_EQ(kNumSubmissions, v);
+}
+
+TEST_F(ThreadPoolTest, TestFuzz) {
+  const int kNumOperations = 1000;
+  Random r(SeedRandom());
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+
+  for (int i = 0; i < kNumOperations; i++) {
+    // Operation distribution:
+    //
+    // - Submit without a token: 40%
+    // - Submit with a randomly selected token: 35%
+    // - Allocate a new token: 10%
+    // - Wait on a randomly selected token: 7%
+    // - Shutdown a randomly selected token: 4%
+    // - Deallocate a randomly selected token: 2%
+    // - Wait for all submissions: 2%
+    int op = r.Next() % 100;
+    if (op < 40) {
+      // Submit without a token.
+      int sleep_ms = r.Next() % 5;
+      ASSERT_OK(pool_->SubmitFunc([sleep_ms]() {
+        // Sleep a little first to increase task overlap.
+        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      }));
+    } else if (op < 75) {
+      // Submit with a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      int sleep_ms = r.Next() % 5;
+      int token_idx = r.Next() % tokens.size();
+      Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() {
+        // Sleep a little first to increase task overlap.
+        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      });
+      ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
+    } else if (op < 85) {
+      // Allocate a token with a randomly selected policy.
+      ThreadPool::ExecutionMode mode = r.Next() % 2 ?
+          ThreadPool::ExecutionMode::SERIAL :
+          ThreadPool::ExecutionMode::CONCURRENT;
+      tokens.emplace_back(pool_->NewToken(mode));
+    } else if (op < 92) {
+      // Wait on a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      int token_idx = r.Next() % tokens.size();
+      tokens[token_idx]->Wait();
+    } else if (op < 96) {
+      // Shutdown a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      int token_idx = r.Next() % tokens.size();
+      tokens[token_idx]->Shutdown();
+    } else if (op < 98) {
+      // Deallocate a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      auto it = tokens.begin();
+      int token_idx = r.Next() % tokens.size();
+      std::advance(it, token_idx);
+      tokens.erase(it);
+    } else {
+      // Wait on everything.
+      ASSERT_LT(op, 100);
+      ASSERT_GE(op, 98);
+      pool_->Wait();
+    }
+  }
+
+  // Some test runs will shut down the pool before the tokens, and some won't.
+  // Either way should be safe.
+  if (r.Next() % 2 == 0) {
+    pool_->Shutdown();
+  }
+}
+
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_max_queue_size(1)));
+
+  CountDownLatch latch(1);
+  unique_ptr<ThreadPoolToken> t = pool_->NewToken(GetParam());
+  auto cleanup = MakeScopedCleanup([&]() {
+    latch.CountDown();
+  });
+  // We will be able to submit two tasks: one for max_threads == 1 and one for
+  // max_queue_size == 1.
+  ASSERT_OK(t->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(t->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  Status s = t->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+  ASSERT_TRUE(s.IsServiceUnavailable());
+}
+
+TEST_F(ThreadPoolTest, TestTokenConcurrency) {
+  const int kNumTokens = 20;
+  const int kTestRuntimeSecs = 1;
+  const int kCycleThreads = 2;
+  const int kShutdownThreads = 2;
+  const int kWaitThreads = 2;
+  const int kSubmitThreads = 8;
+
+  vector<shared_ptr<ThreadPoolToken>> tokens;
+  Random rng(SeedRandom());
+
+  // Protects 'tokens' and 'rng'.
+  simple_spinlock lock;
+
+  // Fetch a token from 'tokens' at random.
+  auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> {
+    std::lock_guard<simple_spinlock> l(lock);
+    int idx = rng.Uniform(kNumTokens);
+    return tokens[idx];
+  };
+
+  // Preallocate all of the tokens.
+  for (int i = 0; i < kNumTokens; i++) {
+    ThreadPool::ExecutionMode mode;
+    {
+      std::lock_guard<simple_spinlock> l(lock);
+      mode = rng.Next() % 2 ?
+          ThreadPool::ExecutionMode::SERIAL :
+          ThreadPool::ExecutionMode::CONCURRENT;
+    }
+    tokens.emplace_back(pool_->NewToken(mode).release());
+  }
+
+  atomic<int64_t> total_num_tokens_cycled(0);
+  atomic<int64_t> total_num_tokens_shutdown(0);
+  atomic<int64_t> total_num_tokens_waited(0);
+  atomic<int64_t> total_num_tokens_submitted(0);
+
+  CountDownLatch latch(1);
+  vector<thread> threads;
+
+  for (int i = 0; i < kCycleThreads; i++) {
+    // Pick a token at random and replace it.
+    //
+    // The replaced token is only destroyed when the last ref is dropped,
+    // possibly by another thread.
+    threads.emplace_back([&]() {
+      int num_tokens_cycled = 0;
+      while (latch.count()) {
+        {
+          std::lock_guard<simple_spinlock> l(lock);
+          int idx = rng.Uniform(kNumTokens);
+          ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
+              ThreadPool::ExecutionMode::SERIAL :
+              ThreadPool::ExecutionMode::CONCURRENT;
+          tokens[idx] = shared_ptr<ThreadPoolToken>(pool_->NewToken(mode).release());
+        }
+        num_tokens_cycled++;
+
+        // Sleep a bit, otherwise this thread outpaces the other threads and
+        // nothing interesting happens to most tokens.
+        SleepFor(MonoDelta::FromMicroseconds(10));
+      }
+      total_num_tokens_cycled += num_tokens_cycled;
+    });
+  }
+
+  for (int i = 0; i < kShutdownThreads; i++) {
+    // Pick a token at random and shut it down. Submitting a task to a shut
+    // down token will return a ServiceUnavailable error.
+    threads.emplace_back([&]() {
+      int num_tokens_shutdown = 0;
+      while (latch.count()) {
+        GetRandomToken()->Shutdown();
+        num_tokens_shutdown++;
+      }
+      total_num_tokens_shutdown += num_tokens_shutdown;
+    });
+  }
+
+  for (int i = 0; i < kWaitThreads; i++) {
+    // Pick a token at random and wait for any outstanding tasks.
+    threads.emplace_back([&]() {
+      int num_tokens_waited = 0;
+      while (latch.count()) {
+        GetRandomToken()->Wait();
+        num_tokens_waited++;
+      }
+      total_num_tokens_waited += num_tokens_waited;
+    });
+  }
+
+  for (int i = 0; i < kSubmitThreads; i++) {
+    // Pick a token at random and submit a task to it.
+    threads.emplace_back([&]() {
+      int num_tokens_submitted = 0;
+      Random rng(SeedRandom());
+      while (latch.count()) {
+        int sleep_ms = rng.Next() % 5;
+        Status s = GetRandomToken()->SubmitFunc([sleep_ms]() {
+          // Sleep a little first so that tasks are running during other events.
+          SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+        });
+        CHECK(s.ok() || s.IsServiceUnavailable());
+        num_tokens_submitted++;
+      }
+      total_num_tokens_submitted += num_tokens_submitted;
+    });
+  }
+
+  SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs));
+  latch.CountDown();
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1",
+                          kCycleThreads, total_num_tokens_cycled.load());
+  LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1",
+                          kShutdownThreads, total_num_tokens_shutdown.load());
+  LOG(INFO) << Substitute("Tokens waited ($0 threads): $1",
+                          kWaitThreads, total_num_tokens_waited.load());
+  LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1",
+                          kSubmitThreads, total_num_tokens_submitted.load());
+}
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/428a34ba/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index b3f4ddf..eda052f 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -17,13 +17,19 @@
 
 #include "kudu/util/threadpool.h"
 
+#include <algorithm>
+#include <iostream>
+#include <limits>
+#include <memory>
+#include <string>
+
 #include <boost/function.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <limits>
-#include <string>
 
 #include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
@@ -34,6 +40,8 @@
 
 namespace kudu {
 
+using std::shared_ptr;
+using std::unique_ptr;
 using strings::Substitute;
 
 ////////////////////////////////////////////////////////
@@ -53,6 +61,22 @@ class FunctionRunnable : public Runnable {
 };
 
 ////////////////////////////////////////////////////////
+// ClosureRunnable
+////////////////////////////////////////////////////////
+
+class ClosureRunnable : public Runnable {
+ public:
+  explicit ClosureRunnable(Closure cl) : cl_(std::move(cl)) {}
+
+  void Run() OVERRIDE {
+    cl_.Run();
+  }
+
+ private:
+  Closure cl_;
+};
+
+////////////////////////////////////////////////////////
 // ThreadPoolBuilder
 ////////////////////////////////////////////////////////
 
@@ -98,6 +122,178 @@ Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
 }
 
 ////////////////////////////////////////////////////////
+// ThreadPoolToken
+////////////////////////////////////////////////////////
+
+ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
+                                 ThreadPool::ExecutionMode mode)
+    : mode_(mode),
+      pool_(pool),
+      state_(State::IDLE),
+      not_running_cond_(&pool->lock_),
+      active_threads_(0) {
+}
+
+ThreadPoolToken::~ThreadPoolToken() {
+  Shutdown();
+  pool_->ReleaseToken(this);
+}
+
+Status ThreadPoolToken::SubmitClosure(Closure c) {
+  return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
+}
+
+Status ThreadPoolToken::SubmitFunc(boost::function<void()> f) {
+  return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
+}
+
+Status ThreadPoolToken::Submit(shared_ptr<Runnable> r) {
+  return pool_->DoSubmit(std::move(r), this);
+}
+
+void ThreadPoolToken::Shutdown() {
+  MutexLock unique_lock(pool_->lock_);
+  pool_->CheckNotPoolThreadUnlocked();
+
+  // Clear the queue under the lock, but defer the releasing of the tasks
+  // outside the lock, in case there are concurrent threads wanting to access
+  // the ThreadPool. The task's destructors may acquire locks, etc, so this
+  // also prevents lock inversions.
+  deque<ThreadPool::Task> to_release = std::move(entries_);
+  pool_->total_queued_tasks_ -= to_release.size();
+
+  switch (state()) {
+    case State::IDLE:
+      // There were no tasks outstanding; we can quiesce the token immediately.
+      Transition(State::QUIESCED);
+      break;
+    case State::RUNNING:
+      // There were outstanding tasks. If any are still running, switch to
+      // QUIESCING and wait for them to finish (the worker thread executing
+      // the token's last task will switch the token to QUIESCED). Otherwise,
+      // we can quiesce the token immediately.
+
+      // Note: this is an O(n) operation, but it's expected to be infrequent.
+      // Plus doing it this way (rather than switching to QUIESCING and waiting
+      // for a worker thread to process the queue entry) helps retain state
+      // transition symmetry with ThreadPool::Shutdown.
+      for (auto it = pool_->queue_.begin(); it != pool_->queue_.end();) {
+        if (*it == this) {
+          it = pool_->queue_.erase(it);
+        } else {
+          it++;
+        }
+      }
+
+      if (active_threads_ == 0) {
+        Transition(State::QUIESCED);
+        break;
+      }
+      Transition(State::QUIESCING);
+      FALLTHROUGH_INTENDED;
+    case State::QUIESCING:
+      // The token is already quiescing. Just wait for a worker thread to
+      // switch it to QUIESCED.
+      while (state() != State::QUIESCED) {
+        not_running_cond_.Wait();
+      }
+      break;
+    default:
+      break;
+  }
+
+  // Finally release the queued tasks, outside the lock.
+  unique_lock.Unlock();
+  for (auto& t : to_release) {
+    if (t.trace) {
+      t.trace->Release();
+    }
+  }
+}
+
+void ThreadPoolToken::Wait() {
+  MutexLock unique_lock(pool_->lock_);
+  pool_->CheckNotPoolThreadUnlocked();
+  while (IsActive()) {
+    not_running_cond_.Wait();
+  }
+}
+
+bool ThreadPoolToken::WaitUntil(const MonoTime& until) {
+  return WaitFor(until - MonoTime::Now());
+}
+
+bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
+  MutexLock unique_lock(pool_->lock_);
+  pool_->CheckNotPoolThreadUnlocked();
+  while (IsActive()) {
+    if (!not_running_cond_.TimedWait(delta)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void ThreadPoolToken::Transition(State new_state) {
+#ifndef NDEBUG
+  CHECK_NE(state_, new_state);
+
+  switch (state_) {
+    case State::IDLE:
+      CHECK(new_state == State::RUNNING ||
+            new_state == State::QUIESCED);
+      if (new_state == State::RUNNING) {
+        CHECK(!entries_.empty());
+      } else {
+        CHECK(entries_.empty());
+        CHECK_EQ(active_threads_, 0);
+      }
+      break;
+    case State::RUNNING:
+      CHECK(new_state == State::IDLE ||
+            new_state == State::QUIESCING ||
+            new_state == State::QUIESCED);
+      CHECK(entries_.empty());
+      if (new_state == State::QUIESCING) {
+        CHECK_GT(active_threads_, 0);
+      }
+      break;
+    case State::QUIESCING:
+      CHECK(new_state == State::QUIESCED);
+      CHECK_EQ(active_threads_, 0);
+      break;
+    case State::QUIESCED:
+      CHECK(false); // QUIESCED is a terminal state
+      break;
+    default:
+      LOG(FATAL) << "Unknown token state: " << state_;
+  }
+#endif
+
+  // Take actions based on the state we're entering.
+  switch (new_state) {
+    case State::IDLE:
+    case State::QUIESCED:
+      not_running_cond_.Broadcast();
+      break;
+    default:
+      break;
+  }
+
+  state_ = new_state;
+}
+
+const char* ThreadPoolToken::StateToString(State s) {
+  switch (s) {
+    case State::IDLE: return "IDLE"; break;
+    case State::RUNNING: return "RUNNING"; break;
+    case State::QUIESCING: return "QUIESCING"; break;
+    case State::QUIESCED: return "QUIESCED"; break;
+  }
+  return "<cannot reach here>";
+}
+
+////////////////////////////////////////////////////////
 // ThreadPool
 ////////////////////////////////////////////////////////
 
@@ -113,7 +309,8 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
     not_empty_(&lock_),
     num_threads_(0),
     active_threads_(0),
-    queue_size_(0) {
+    total_queued_tasks_(0),
+    tokenless_(NewToken(ExecutionMode::CONCURRENT)) {
 
   string prefix = !builder.trace_metric_prefix_.empty() ?
       builder.trace_metric_prefix_ : builder.name_;
@@ -127,6 +324,10 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
 }
 
 ThreadPool::~ThreadPool() {
+  // There should only be one live token: the one used in tokenless submission.
+  CHECK_EQ(1, tokens_.size()) << Substitute(
+      "Threadpool $0 destroyed with $1 allocated tokens",
+      name_, tokens_.size());
   Shutdown();
 }
 
@@ -150,43 +351,95 @@ void ThreadPool::Shutdown() {
   MutexLock unique_lock(lock_);
   CheckNotPoolThreadUnlocked();
 
+  // Note: this is the same error seen at submission if the pool is at
+  // capacity, so clients can't tell them apart. This isn't really a practical
+  // concern though because shutting down a pool typically requires clients to
+  // be quiesced first, so there's no danger of a client getting confused.
   pool_status_ = Status::ServiceUnavailable("The pool has been shut down.");
 
-  // Clear the queue_ member under the lock, but defer the releasing
-  // of the entries outside the lock, in case there are concurrent threads
+  // Clear the various queues under the lock, but defer the releasing
+  // of the tasks outside the lock, in case there are concurrent threads
   // wanting to access the ThreadPool. The task's destructors may acquire
   // locks, etc, so this also prevents lock inversions.
-  auto to_release = std::move(queue_);
   queue_.clear();
-  queue_size_ = 0;
-  not_empty_.Broadcast();
+  deque<deque<Task>> to_release;
+  for (auto* t : tokens_) {
+    if (!t->entries_.empty()) {
+      to_release.emplace_back(std::move(t->entries_));
+    }
+    switch (t->state()) {
+      case ThreadPoolToken::State::IDLE:
+        // The token is idle; we can quiesce it immediately.
+        t->Transition(ThreadPoolToken::State::QUIESCED);
+        break;
+      case ThreadPoolToken::State::RUNNING:
+        // The token has tasks associated with it. If they're merely queued
+        // (i.e. there are no active threads), the tasks will have been removed
+        // above and we can quiesce immediately. Otherwise, we need to wait for
+        // the threads to finish.
+        t->Transition(t->active_threads_ > 0 ?
+            ThreadPoolToken::State::QUIESCING :
+            ThreadPoolToken::State::QUIESCED);
+        break;
+      default:
+        break;
+    }
+  }
 
-  // The Runnable doesn't have Abort() so we must wait
-  // and hopefully the abort is done outside before calling Shutdown().
+  // The queues are empty. Wake any sleeping worker threads and wait for all
+  // of them to exit. Some worker threads will exit immediately upon waking,
+  // while others will exit after they finish executing an outstanding task.
+  total_queued_tasks_ = 0;
+  not_empty_.Broadcast();
   while (num_threads_ > 0) {
     no_threads_cond_.Wait();
   }
 
-  // Finally release the tasks that were in the queue, outside the lock.
+  // All the threads have exited. Check the state of each token.
+  for (auto* t : tokens_) {
+    DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
+           t->state() == ThreadPoolToken::State::QUIESCED);
+  }
+
+  // Finally release the queued tasks, outside the lock.
   unique_lock.Unlock();
-  for (QueueEntry& e : to_release) {
-    if (e.trace) {
-      e.trace->Release();
+  for (auto& token : to_release) {
+    for (auto& t : token) {
+      if (t.trace) {
+        t.trace->Release();
+      }
     }
   }
 }
 
-Status ThreadPool::SubmitClosure(const Closure& task) {
-  // TODO: once all uses of boost::bind-based tasks are dead, implement this
-  // in a more straight-forward fashion.
-  return SubmitFunc(boost::bind(&Closure::Run, task));
+unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
+  MutexLock guard(lock_);
+  unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode));
+  InsertOrDie(&tokens_, t.get());
+  return t;
+}
+
+void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
+  MutexLock guard(lock_);
+  CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
+                                      ThreadPoolToken::StateToString(t->state()));
+  CHECK_EQ(1, tokens_.erase(t));
+}
+
+Status ThreadPool::SubmitClosure(Closure c) {
+  return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
 }
 
-Status ThreadPool::SubmitFunc(boost::function<void()> func) {
-  return Submit(std::shared_ptr<Runnable>(new FunctionRunnable(std::move(func))));
+Status ThreadPool::SubmitFunc(boost::function<void()> f) {
+  return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
 }
 
-Status ThreadPool::Submit(std::shared_ptr<Runnable> task) {
+Status ThreadPool::Submit(shared_ptr<Runnable> r) {
+  return DoSubmit(std::move(r), tokenless_.get());
+}
+
+Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
+  DCHECK(token);
   MonoTime submit_time = MonoTime::Now();
 
   MutexLock guard(lock_);
@@ -194,13 +447,17 @@ Status ThreadPool::Submit(std::shared_ptr<Runnable> task) {
     return pool_status_;
   }
 
+  if (PREDICT_FALSE(!token->MaySubmitNewTasks())) {
+    return Status::ServiceUnavailable("Thread pool token was shut down");
+  }
+
   // Size limit check.
   int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
-                               static_cast<int64_t>(max_queue_size_) - queue_size_;
+                               static_cast<int64_t>(max_queue_size_) - total_queued_tasks_;
   if (capacity_remaining < 1) {
     return Status::ServiceUnavailable(
         Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
-                   num_threads_, max_threads_, queue_size_, max_queue_size_));
+                   num_threads_, max_threads_, total_queued_tasks_, max_queue_size_));
   }
 
   // Should we create another thread?
@@ -212,8 +469,10 @@ Status ThreadPool::Submit(std::shared_ptr<Runnable> task) {
   // It's also harmless.
   //
   // Of course, we never create more than max_threads_ threads no matter what.
+  int threads_from_this_submit =
+      token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
   int inactive_threads = num_threads_ - active_threads_;
-  int additional_threads = (queue_size_ + 1) - inactive_threads;
+  int additional_threads = (queue_.size() + threads_from_this_submit) - inactive_threads;
   if (additional_threads > 0 && num_threads_ < max_threads_) {
     Status status = CreateThreadUnlocked();
     if (!status.ok()) {
@@ -228,18 +487,29 @@ Status ThreadPool::Submit(std::shared_ptr<Runnable> task) {
     }
   }
 
-  QueueEntry e;
-  e.runnable = std::move(task);
-  e.trace = Trace::CurrentTrace();
+  Task task;
+  task.runnable = std::move(r);
+  task.trace = Trace::CurrentTrace();
   // Need to AddRef, since the thread which submitted the task may go away,
   // and we don't want the trace to be destructed while waiting in the queue.
-  if (e.trace) {
-    e.trace->AddRef();
+  if (task.trace) {
+    task.trace->AddRef();
   }
-  e.submit_time = submit_time;
-
-  queue_.emplace_back(std::move(e));
-  int length_at_submit = queue_size_++;
+  task.submit_time = submit_time;
+
+  // Add the task to the token's queue.
+  ThreadPoolToken::State state = token->state();
+  DCHECK(state == ThreadPoolToken::State::IDLE ||
+         state == ThreadPoolToken::State::RUNNING);
+  token->entries_.emplace_back(std::move(task));
+  if (state == ThreadPoolToken::State::IDLE ||
+      token->mode() == ExecutionMode::CONCURRENT) {
+    queue_.emplace_back(token);
+    if (state == ThreadPoolToken::State::IDLE) {
+      token->Transition(ThreadPoolToken::State::RUNNING);
+    }
+  }
+  int length_at_submit = total_queued_tasks_++;
 
   guard.Unlock();
   not_empty_.Signal();
@@ -254,7 +524,7 @@ Status ThreadPool::Submit(std::shared_ptr<Runnable> task) {
 void ThreadPool::Wait() {
   MutexLock unique_lock(lock_);
   CheckNotPoolThreadUnlocked();
-  while ((!queue_.empty()) || (active_threads_ > 0)) {
+  while (total_queued_tasks_ > 0 || active_threads_ > 0) {
     idle_cond_.Wait();
   }
 }
@@ -266,7 +536,7 @@ bool ThreadPool::WaitUntil(const MonoTime& until) {
 bool ThreadPool::WaitFor(const MonoDelta& delta) {
   MutexLock unique_lock(lock_);
   CheckNotPoolThreadUnlocked();
-  while ((!queue_.empty()) || (active_threads_ > 0)) {
+  while (total_queued_tasks_ > 0 || active_threads_ > 0) {
     if (!idle_cond_.TimedWait(delta)) {
       return false;
     }
@@ -274,7 +544,6 @@ bool ThreadPool::WaitFor(const MonoDelta& delta) {
   return true;
 }
 
-
 void ThreadPool::SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist) {
   queue_length_histogram_ = hist;
 }
@@ -317,23 +586,28 @@ void ThreadPool::DispatchThread(bool permanent) {
       continue;
     }
 
-    // Fetch a pending task
-    QueueEntry entry = std::move(queue_.front());
+    // Get the next token and task to execute.
+    ThreadPoolToken* token = queue_.front();
     queue_.pop_front();
-    queue_size_--;
+    DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
+    DCHECK(!token->entries_.empty());
+    Task task = std::move(token->entries_.front());
+    token->entries_.pop_front();
+    token->active_threads_++;
+    --total_queued_tasks_;
     ++active_threads_;
 
     unique_lock.Unlock();
 
     // Release the reference which was held by the queued item.
-    ADOPT_TRACE(entry.trace);
-    if (entry.trace) {
-      entry.trace->Release();
+    ADOPT_TRACE(task.trace);
+    if (task.trace) {
+      task.trace->Release();
     }
 
     // Update metrics
     MonoTime now(MonoTime::Now());
-    int64_t queue_time_us = (now - entry.submit_time).ToMicroseconds();
+    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
     TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
     if (queue_time_us_histogram_) {
       queue_time_us_histogram_->Increment(queue_time_us);
@@ -344,7 +618,7 @@ void ThreadPool::DispatchThread(bool permanent) {
       MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
       MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();
 
-      entry.runnable->Run();
+      task.runnable->Run();
 
       int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
       int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
@@ -361,9 +635,26 @@ void ThreadPool::DispatchThread(bool permanent) {
     // objects, and we don't want to block submission of the threadpool.
     // In the worst case, the destructor might even try to do something
     // with this threadpool, and produce a deadlock.
-    entry.runnable.reset();
+    task.runnable.reset();
     unique_lock.Lock();
 
+    // Possible states:
+    // 1. The token was shut down while we ran its task. Transition to QUIESCED.
+    // 2. The token has no more queued tasks. Transition back to IDLE.
+    // 3. The token has more tasks. It stays RUNNING; if SERIAL, requeue it.
+    ThreadPoolToken::State state = token->state();
+    DCHECK(state == ThreadPoolToken::State::RUNNING ||
+           state == ThreadPoolToken::State::QUIESCING);
+    if (--token->active_threads_ == 0) {
+      if (state == ThreadPoolToken::State::QUIESCING) {
+        DCHECK(token->entries_.empty());
+        token->Transition(ThreadPoolToken::State::QUIESCED);
+      } else if (token->entries_.empty()) {
+        token->Transition(ThreadPoolToken::State::IDLE);
+      } else if (token->mode() == ExecutionMode::SERIAL) {
+        queue_.emplace_back(token);
+      }
+    }
     if (--active_threads_ == 0) {
       idle_cond_.Broadcast();
     }
@@ -381,7 +672,7 @@ void ThreadPool::DispatchThread(bool permanent) {
     // Sanity check: if we're the last thread exiting, the queue ought to be
     // empty. Otherwise it will never get processed.
     CHECK(queue_.empty());
-    DCHECK_EQ(0, queue_size_);
+    DCHECK_EQ(0, total_queued_tasks_);
   }
 }
 
@@ -407,4 +698,8 @@ void ThreadPool::CheckNotPoolThreadUnlocked() {
   }
 }
 
+std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
+  return o << ThreadPoolToken::StateToString(s);
+}
+
 } // namespace kudu
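
The requeue at the end of DispatchThread above is what gives SERIAL tokens
their round-robin behavior: a serial token runs one task, then rejoins the
back of the token FIFO. A sketch of the observable effect, assuming a
single-threaded pool (the exact interleaving can vary with startup timing,
but each token's own tasks always run in submission order):

    gscoped_ptr<ThreadPool> pool;
    CHECK_OK(ThreadPoolBuilder("rr")
             .set_min_threads(1)
             .set_max_threads(1)
             .Build(&pool));
    std::unique_ptr<ThreadPoolToken> ta = pool->NewToken(
        ThreadPool::ExecutionMode::SERIAL);
    std::unique_ptr<ThreadPoolToken> tb = pool->NewToken(
        ThreadPool::ExecutionMode::SERIAL);
    std::string out;
    for (int i = 0; i < 3; i++) {
      CHECK_OK(ta->SubmitFunc([&out]() { out += 'a'; }));
      CHECK_OK(tb->SubmitFunc([&out]() { out += 'b'; }));
    }
    pool->Wait();
    // Typically "ababab": the single worker alternates between the two
    // serial tokens instead of draining one before starting the other.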

http://git-wip-us.apache.org/repos/asf/kudu/blob/428a34ba/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 7ac6d17..e15c991 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -17,13 +17,14 @@
 #ifndef KUDU_UTIL_THREAD_POOL_H
 #define KUDU_UTIL_THREAD_POOL_H
 
-#include <boost/function.hpp>
-#include <gtest/gtest_prod.h>
-#include <list>
+#include <deque>
+#include <iosfwd>
 #include <memory>
-#include <unordered_set>
 #include <string>
-#include <vector>
+#include <unordered_set>
+
+#include <boost/function.hpp>
+#include <gtest/gtest_prod.h>
 
 #include "kudu/gutil/callback_forward.h"
 #include "kudu/gutil/gscoped_ptr.h"
@@ -40,6 +41,7 @@ namespace kudu {
 class Histogram;
 class Thread;
 class ThreadPool;
+class ThreadPoolToken;
 class Trace;
 
 class Runnable {
@@ -124,8 +126,25 @@ class ThreadPoolBuilder {
 };
 
 // Thread pool with a variable number of threads.
-// The pool can execute a class that implements the Runnable interface, or a
-// boost::function, which can be obtained via boost::bind().
+//
+// Tasks submitted directly to the thread pool enter a FIFO queue and are
+// dispatched to a worker thread when one becomes free. Tasks may also be
+// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
+// can then be used to block on logical groups of tasks.
+//
+// A token operates in one of two ExecutionModes, determined at token
+// construction time:
+// 1. SERIAL: submitted tasks are run one at a time.
+// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
+//    tasks submitted without a token, but the logical grouping that tokens
+//    impart can be useful when a pool is shared by many contexts (e.g. to
+//    safely shut down one context, to derive context-specific metrics, etc.).
+//
+// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
+// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
+// processed in a round-robin fashion, one task at a time. This prevents them
+// from starving one another. However, tokenless (and CONCURRENT token-based)
+// tasks can starve SERIAL token-based tasks.
 //
 // Usage Example:
 //    static void Func(int n) { ... }
@@ -140,7 +159,7 @@ class ThreadPoolBuilder {
 //            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
 //            .Build(&thread_pool));
 //    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
-//    thread_pool->Submit(boost::bind(&Func, 10));
+//    thread_pool->SubmitFunc(boost::bind(&Func, 10));
 class ThreadPool {
  public:
   ~ThreadPool();
@@ -153,16 +172,16 @@ class ThreadPool {
   //       require an explicit "abort" notification to exit from the run loop.
   void Shutdown();
 
-  // Submit a function using the kudu Closure system.
-  Status SubmitClosure(const Closure& task) WARN_UNUSED_RESULT;
+  // Submits a function using the kudu Closure system.
+  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
 
-  // Submit a function binded using boost::bind(&FuncName, args...)
-  Status SubmitFunc(boost::function<void()> func) WARN_UNUSED_RESULT;
+  // Submits a function bound using boost::bind(&FuncName, args...).
+  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
 
-  // Submit a Runnable class
-  Status Submit(std::shared_ptr<Runnable> task) WARN_UNUSED_RESULT;
+  // Submits a Runnable class.
+  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
 
-  // Wait until all the tasks are completed.
+  // Waits until all the tasks are completed.
   void Wait();
 
   // Waits for the pool to reach the idle state, or until 'until' time is reached.
@@ -173,52 +192,66 @@ class ThreadPool {
   // Returns true if the pool reached the idle state, false otherwise.
   bool WaitFor(const MonoDelta& delta);
 
-  // Return the current number of tasks waiting in the queue.
-  // Typically used for metrics.
-  int queue_length() const {
-    return ANNOTATE_UNPROTECTED_READ(queue_size_);
-  }
+  // Allocates a new token for use in token-based task submission. All tokens
+  // must be destroyed before their ThreadPool is destroyed.
+  //
+  // There is no limit on the number of tokens that may be allocated.
+  enum class ExecutionMode {
+    // Tasks submitted via this token will be executed serially.
+    SERIAL,
 
-  // Attach a histogram which measures the queue length seen by tasks when they enter
+    // Tasks submitted via this token may be executed concurrently.
+    CONCURRENT,
+  };
+  std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);
+
+  // Attaches a histogram which measures the queue length seen by tasks when they enter
   // the thread pool's queue.
   void SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist);
 
-  // Attach a histogram which measures the amount of time that tasks spend waiting in
+  // Attaches a histogram which measures the amount of time that tasks spend waiting in
   // the queue.
   void SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
 
-  // Attach a histogram which measures the amount of time that tasks spend running.
+  // Attaches a histogram which measures the amount of time that tasks spend running.
   void SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
 
  private:
+  FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
+  FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
+
   friend class ThreadPoolBuilder;
+  friend class ThreadPoolToken;
+
+  // Client-provided task to be executed by this pool.
+  struct Task {
+    std::shared_ptr<Runnable> runnable;
+    Trace* trace;
+
+    // Time at which the entry was submitted to the pool.
+    MonoTime submit_time;
+  };
 
-  // Create a new thread pool using a builder.
+  // Creates a new thread pool using a builder.
   explicit ThreadPool(const ThreadPoolBuilder& builder);
 
-  // Initialize the thread pool by starting the minimum number of threads.
+  // Initializes the thread pool by starting the minimum number of threads.
   Status Init();
 
   // Dispatcher responsible for dequeueing and executing the tasks
   void DispatchThread(bool permanent);
 
-  // Create new thread. Required that lock_ is held.
+  // Creates a new thread. Requires that lock_ is held.
   Status CreateThreadUnlocked();
 
   // Aborts if the current thread is a member of this thread pool.
   void CheckNotPoolThreadUnlocked();
 
- private:
-  FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
-  FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
-
-  struct QueueEntry {
-    std::shared_ptr<Runnable> runnable;
-    Trace* trace;
+  // Submits a task to be run via token.
+  Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
 
-    // Time at which the entry was submitted to the pool.
-    MonoTime submit_time;
-  };
+  // Releases token 't' and invalidates it.
+  void ReleaseToken(ThreadPoolToken* t);
 
   const std::string name_;
   const int min_threads_;
@@ -226,15 +259,53 @@ class ThreadPool {
   const int max_queue_size_;
   const MonoDelta idle_timeout_;
 
+  // Overall status of the pool. Set to an error when the pool is shut down.
+  //
+  // Protected by 'lock_'.
   Status pool_status_;
+
+  // Synchronizes many of the members of the pool and all of its
+  // condition variables.
   Mutex lock_;
+
+  // Condition variable for "pool is idling". Waiters wake up when
+  // active_threads_ reaches zero.
   ConditionVariable idle_cond_;
+
+  // Condition variable for "pool has no threads". Waiters wake up when
+  // num_threads_ reaches zero.
   ConditionVariable no_threads_cond_;
+
+  // Condition variable for "queue is not empty". Waiters wake up when
+  // a new task is queued.
   ConditionVariable not_empty_;
+
+  // Number of threads currently running.
+  //
+  // Protected by lock_.
   int num_threads_;
+
+  // Number of threads currently running and executing client tasks.
+  //
+  // Protected by lock_.
   int active_threads_;
-  int queue_size_;
-  std::list<QueueEntry> queue_;
+
+  // Total number of client tasks queued, either directly (queue_) or
+  // indirectly (tokens_).
+  //
+  // Protected by lock_.
+  int total_queued_tasks_;
+
+  // All allocated tokens.
+  //
+  // Protected by lock_.
+  std::unordered_set<ThreadPoolToken*> tokens_;
+
+  // FIFO of tokens from which tasks should be executed. Does not own the
+  // tokens; they are owned by clients and are removed from the FIFO on shutdown.
+  //
+  // Protected by lock_.
+  std::deque<ThreadPoolToken*> queue_;
 
   // Pointers to all running threads. Raw pointers are safe because a Thread
   // may only go out of scope after being removed from threads_.
@@ -242,6 +313,9 @@ class ThreadPool {
   // Protected by lock_.
   std::unordered_set<Thread*> threads_;
 
+  // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
+  std::unique_ptr<ThreadPoolToken> tokenless_;
+
   scoped_refptr<Histogram> queue_length_histogram_;
   scoped_refptr<Histogram> queue_time_us_histogram_;
   scoped_refptr<Histogram> run_time_us_histogram_;
@@ -253,5 +327,129 @@ class ThreadPool {
   DISALLOW_COPY_AND_ASSIGN(ThreadPool);
 };
 
+// Entry point for token-based task submission and blocking for a particular
+// thread pool. Tokens can only be created via ThreadPool::NewToken().
+//
+// All functions are thread-safe. Mutable members are protected via the
+// ThreadPool's lock.
+class ThreadPoolToken {
+ public:
+  // Destroys the token.
+  //
+  // May be called on a token with outstanding tasks, as Shutdown() will be
+  // called first to take care of them.
+  ~ThreadPoolToken();
+
+  // Submits a function using the kudu Closure system.
+  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
+
+  // Submits a function bound using boost::bind(&FuncName, args...).
+  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
+
+  // Submits a Runnable class.
+  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+
+  // Marks the token as unusable for future submissions. Any queued tasks not
+  // yet running are destroyed. If tasks are in flight, Shutdown() will wait
+  // on their completion before returning.
+  void Shutdown();
+
+  // Waits until all the tasks submitted via this token are completed.
+  void Wait();
+
+  // Waits until all submissions using this token are complete, or until
+  // 'until' time is reached.
+  //
+  // Returns true if all submissions are complete, false otherwise.
+  bool WaitUntil(const MonoTime& until);
+
+  // Waits until all submissions using this token are complete, or until
+  // 'delta' time elapses.
+  //
+  // Returns true if all submissions are complete, false otherwise.
+  bool WaitFor(const MonoDelta& delta);
+
+ private:
+  // All possible token states. Legal state transitions:
+  //   IDLE      -> RUNNING: task is submitted via token
+  //   IDLE      -> QUIESCED: token or pool is shut down
+  //   RUNNING   -> IDLE: worker thread finishes executing a task and
+  //                      there are no more tasks queued to the token
+  //   RUNNING   -> QUIESCING: token or pool is shut down while worker thread
+  //                           is executing a task
+  //   RUNNING   -> QUIESCED: token or pool is shut down
+  //   QUIESCING -> QUIESCED:  worker thread finishes executing a task
+  //                           belonging to a shut down token or pool
+  enum class State {
+    // Token has no queued tasks.
+    IDLE,
+
+    // A worker thread is running one of the token's previously queued tasks.
+    RUNNING,
+
+    // No new tasks may be submitted to the token. A worker thread is still
+    // running a previously queued task.
+    QUIESCING,
+
+    // No new tasks may be submitted to the token. There are no active tasks
+    // either. In this state, the token may only be destroyed.
+    QUIESCED,
+  };
+
+  // Writes a textual representation of the token state in 's' to 'o'.
+  friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s);
+
+  friend class ThreadPool;
+
+  // Returns a textual representation of 's' suitable for debugging.
+  static const char* StateToString(State s);
+
+  // Constructs a new token.
+  //
+  // The token may not outlive its thread pool ('pool').
+  ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode);
+
+  // Changes this token's state to 'new_state' taking actions as needed.
+  void Transition(State new_state);
+
+  // Returns true if this token has a task queued and ready to run, or if a
+  // task belonging to this token is already running.
+  bool IsActive() const {
+    return state_ == State::RUNNING ||
+           state_ == State::QUIESCING;
+  }
+
+  // Returns true if new tasks may be submitted to this token.
+  bool MaySubmitNewTasks() const {
+    return state_ != State::QUIESCING &&
+           state_ != State::QUIESCED;
+  }
+
+  State state() const { return state_; }
+  ThreadPool::ExecutionMode mode() const { return mode_; }
+
+  // Token's configured execution mode.
+  const ThreadPool::ExecutionMode mode_;
+
+  // Pointer to the token's thread pool.
+  ThreadPool* pool_;
+
+  // Token state machine.
+  State state_;
+
+  // Queued client tasks.
+  std::deque<ThreadPool::Task> entries_;
+
+  // Condition variable for "token is idle". Waiters wake up when the token
+  // transitions to IDLE or QUIESCED.
+  ConditionVariable not_running_cond_;
+
+  // Number of worker threads currently executing tasks belonging to this
+  // token.
+  int active_threads_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
+};
+
 } // namespace kudu
 #endif
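
One closing note on the token Wait() variants: WaitFor() and WaitUntil()
return false on timeout, letting a caller distinguish "task group finished"
from "task group still running" before deciding whether to block in
Shutdown(). A sketch, reusing the pool from the examples above:

    std::unique_ptr<ThreadPoolToken> t = pool->NewToken(
        ThreadPool::ExecutionMode::CONCURRENT);
    CHECK_OK(t->SubmitFunc([]() { SleepFor(MonoDelta::FromSeconds(10)); }));
    if (!t->WaitFor(MonoDelta::FromSeconds(1))) {
      // Timed out: the task is still in flight. Shutdown() (or the token's
      // destructor, which calls it) would block until the task completes.
    }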

