impala-commits mailing list archives

From taras...@apache.org
Subject [03/30] incubator-impala git commit: IMPALA-4669: [KUTIL] Import kudu_util library from kudu@314c9d8
Date Sat, 17 Jun 2017 07:25:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread.cc b/be/src/kudu/util/thread.cc
new file mode 100644
index 0000000..471b87d
--- /dev/null
+++ b/be/src/kudu/util/thread.cc
@@ -0,0 +1,617 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+
+#include "kudu/util/thread.h"
+
+#include <sys/resource.h>
+#include <sys/syscall.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <map>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <vector>
+
+#if defined(__linux__)
+#include <sys/prctl.h>
+#endif // defined(__linux__)
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/mathlimits.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/kernel_stack_watchdog.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/url-coding.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/web_callback_registry.h"
+
+using boost::bind;
+using boost::mem_fn;
+using std::endl;
+using std::map;
+using std::ostringstream;
+using std::shared_ptr;
+using strings::Substitute;
+
+METRIC_DEFINE_gauge_uint64(server, threads_started,
+                           "Threads Started",
+                           kudu::MetricUnit::kThreads,
+                           "Total number of threads started on this server",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, threads_running,
+                           "Threads Running",
+                           kudu::MetricUnit::kThreads,
+                           "Current number of running threads");
+
+METRIC_DEFINE_gauge_uint64(server, cpu_utime,
+                           "User CPU Time",
+                           kudu::MetricUnit::kMilliseconds,
+                           "Total user CPU time of the process",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, cpu_stime,
+                           "System CPU Time",
+                           kudu::MetricUnit::kMilliseconds,
+                           "Total system CPU time of the process",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches,
+                           "Voluntary Context Switches",
+                           kudu::MetricUnit::kContextSwitches,
+                           "Total voluntary context switches",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches,
+                           "Involuntary Context Switches",
+                           kudu::MetricUnit::kContextSwitches,
+                           "Total involuntary context switches",
+                           kudu::EXPOSE_AS_COUNTER);
+
+namespace kudu {
+
+static uint64_t GetCpuUTime() {
+  rusage ru;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
+  return ru.ru_utime.tv_sec * 1000UL + ru.ru_utime.tv_usec / 1000UL;
+}
+
+static uint64_t GetCpuSTime() {
+  rusage ru;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
+  return ru.ru_stime.tv_sec * 1000UL + ru.ru_stime.tv_usec / 1000UL;
+}
+
+static uint64_t GetVoluntaryContextSwitches() {
+  rusage ru;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
+  return ru.ru_nvcsw;
+}
+
+static uint64_t GetInVoluntaryContextSwitches() {
+  rusage ru;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
+  return ru.ru_nivcsw;
+}
+
+class ThreadMgr;
+
+__thread Thread* Thread::tls_ = NULL;
+
+// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
+// The Thread class adds a reference to thread_manager while it is supervising a thread,
+// so that a thread which tries to remove itself from the manager after the process's
+// main thread has exited (and thread_manager has therefore been destroyed) does not
+// race with that destruction.
+static shared_ptr<ThreadMgr> thread_manager;
+
+// Controls the single (lazy) initialization of thread_manager.
+static GoogleOnceType once = GOOGLE_ONCE_INIT;
+
+// A singleton class that tracks all live threads, and groups them together for easy
+// auditing. Used only by Thread.
+class ThreadMgr {
+ public:
+  ThreadMgr()
+      : metrics_enabled_(false),
+        threads_started_metric_(0),
+        threads_running_metric_(0) {
+  }
+
+  ~ThreadMgr() {
+    MutexLock l(lock_);
+    thread_categories_.clear();
+  }
+
+  static void SetThreadName(const std::string& name, int64 tid);
+
+  Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web);
+
+  // Registers a thread to the supplied category. The key is a pthread_t,
+  // not the system TID, since pthread_t is less prone to being recycled.
+  void AddThread(const pthread_t& pthread_id, const string& name, const string& category,
+      int64_t tid);
+
+  // Removes a thread from the supplied category. If the thread has
+  // already been removed, this is a no-op.
+  void RemoveThread(const pthread_t& pthread_id, const string& category);
+
+ private:
+  // Container class for any details we want to capture about a thread
+  // TODO: Add start-time.
+  // TODO: Track fragment ID.
+  class ThreadDescriptor {
+   public:
+    ThreadDescriptor() { }
+    ThreadDescriptor(string category, string name, int64_t thread_id)
+        : name_(std::move(name)),
+          category_(std::move(category)),
+          thread_id_(thread_id) {}
+
+    const string& name() const { return name_; }
+    const string& category() const { return category_; }
+    int64_t thread_id() const { return thread_id_; }
+
+   private:
+    string name_;
+    string category_;
+    int64_t thread_id_;
+  };
+
+  // A ThreadCategory is a set of threads that are logically related.
+  // TODO: unordered_map is incompatible with pthread_t, but would be more
+  // efficient here.
+  typedef map<const pthread_t, ThreadDescriptor> ThreadCategory;
+
+  // All thread categories, keyed on the category name.
+  typedef map<string, ThreadCategory> ThreadCategoryMap;
+
+  // Protects thread_categories_ and metrics_enabled_
+  Mutex lock_;
+
+  // All thread categories that ever contained a thread, even if empty.
+  ThreadCategoryMap thread_categories_;
+
+  // True after StartInstrumentation(..) returns
+  bool metrics_enabled_;
+
+  // Counters to track all-time total number of threads, and the
+  // current number of running threads.
+  uint64_t threads_started_metric_;
+  uint64_t threads_running_metric_;
+
+  // Metric callbacks.
+  uint64_t ReadThreadsStarted();
+  uint64_t ReadThreadsRunning();
+
+  // Webpage callback; prints all threads by category
+  void ThreadPathHandler(const WebCallbackRegistry::WebRequest& args, ostringstream* output);
+  void PrintThreadCategoryRows(const ThreadCategory& category, ostringstream* output);
+};
+
+void ThreadMgr::SetThreadName(const string& name, int64 tid) {
+  // On linux we can get the thread names to show up in the debugger by setting
+  // the process name for the LWP.  We don't want to do this for the main
+  // thread because that would rename the process, causing tools like killall
+  // to stop working.
+  if (tid == getpid()) {
+    return;
+  }
+
+#if defined(__linux__)
+  // http://0pointer.de/blog/projects/name-your-threads.html
+  // Set the name for the LWP (which gets truncated to 15 characters).
+  // Note that glibc also has a 'pthread_setname_np' API, but it may not be
+  // available everywhere and its only benefit over using prctl directly is
+  // that it can set the name of threads other than the current thread.
+  int err = prctl(PR_SET_NAME, name.c_str());
+#else
+  int err = pthread_setname_np(name.c_str());
+#endif // defined(__linux__)
+  // We expect EPERM failures in sandboxed processes, just ignore those.
+  if (err < 0 && errno != EPERM) {
+    PLOG(ERROR) << "SetThreadName";
+  }
+}
+
+Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
+                                       WebCallbackRegistry* web) {
+  MutexLock l(lock_);
+  metrics_enabled_ = true;
+
+  // Use function gauges here so that we can register a unique copy of these metrics in
+  // multiple tservers, even though the ThreadMgr is itself a singleton.
+  metrics->NeverRetire(
+      METRIC_threads_started.InstantiateFunctionGauge(metrics,
+        Bind(&ThreadMgr::ReadThreadsStarted, Unretained(this))));
+  metrics->NeverRetire(
+      METRIC_threads_running.InstantiateFunctionGauge(metrics,
+        Bind(&ThreadMgr::ReadThreadsRunning, Unretained(this))));
+  metrics->NeverRetire(
+      METRIC_cpu_utime.InstantiateFunctionGauge(metrics,
+        Bind(&GetCpuUTime)));
+  metrics->NeverRetire(
+      METRIC_cpu_stime.InstantiateFunctionGauge(metrics,
+        Bind(&GetCpuSTime)));
+  metrics->NeverRetire(
+      METRIC_voluntary_context_switches.InstantiateFunctionGauge(metrics,
+        Bind(&GetVoluntaryContextSwitches)));
+  metrics->NeverRetire(
+      METRIC_involuntary_context_switches.InstantiateFunctionGauge(metrics,
+        Bind(&GetInVoluntaryContextSwitches)));
+
+  if (web) {
+    WebCallbackRegistry::PathHandlerCallback thread_callback =
+        bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler), this, _1, _2);
+    DCHECK_NOTNULL(web)->RegisterPathHandler("/threadz", "Threads", thread_callback);
+  }
+  return Status::OK();
+}
+
+uint64_t ThreadMgr::ReadThreadsStarted() {
+  MutexLock l(lock_);
+  return threads_started_metric_;
+}
+
+uint64_t ThreadMgr::ReadThreadsRunning() {
+  MutexLock l(lock_);
+  return threads_running_metric_;
+}
+
+void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
+    const string& category, int64_t tid) {
+  // These annotations cause TSAN to ignore the synchronization on lock_
+  // without causing the subsequent mutations to be treated as data races
+  // in and of themselves (that's what IGNORE_READS_AND_WRITES does).
+  //
+  // Why do we need them here and in SuperviseThread()? TSAN operates by
+  // observing synchronization events and using them to establish "happens
+  // before" relationships between threads. Where these relationships are
+  // not built, shared state access constitutes a data race. The
+  // synchronization events here, in RemoveThread(), and in
+  // SuperviseThread() may cause TSAN to establish a "happens before"
+  // relationship between thread functors, ignoring potential data races.
+  // The annotations prevent this from happening.
+  ANNOTATE_IGNORE_SYNC_BEGIN();
+  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+  {
+    MutexLock l(lock_);
+    thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid);
+    if (metrics_enabled_) {
+      threads_running_metric_++;
+      threads_started_metric_++;
+    }
+  }
+  ANNOTATE_IGNORE_SYNC_END();
+  ANNOTATE_IGNORE_READS_AND_WRITES_END();
+}
+
+void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) {
+  ANNOTATE_IGNORE_SYNC_BEGIN();
+  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+  {
+    MutexLock l(lock_);
+    auto category_it = thread_categories_.find(category);
+    DCHECK(category_it != thread_categories_.end());
+    category_it->second.erase(pthread_id);
+    if (metrics_enabled_) {
+      threads_running_metric_--;
+    }
+  }
+  ANNOTATE_IGNORE_SYNC_END();
+  ANNOTATE_IGNORE_READS_AND_WRITES_END();
+}
+
+void ThreadMgr::PrintThreadCategoryRows(const ThreadCategory& category,
+    ostringstream* output) {
+  for (const ThreadCategory::value_type& thread : category) {
+    ThreadStats stats;
+    Status status = GetThreadStats(thread.second.thread_id(), &stats);
+    if (!status.ok()) {
+      KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
+                              << status.ToString();
+    }
+    (*output) << "<tr><td>" << thread.second.name() << "</td><td>"
+              << (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>"
+              << (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>"
+              << (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>";
+  }
+}
+
+void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
+    ostringstream* output) {
+  MutexLock l(lock_);
+  vector<const ThreadCategory*> categories_to_print;
+  auto category_name = req.parsed_args.find("group");
+  if (category_name != req.parsed_args.end()) {
+    string group = EscapeForHtmlToString(category_name->second);
+    (*output) << "<h2>Thread Group: " << group << "</h2>" << endl;
+    if (group != "all") {
+      ThreadCategoryMap::const_iterator category = thread_categories_.find(group);
+      if (category == thread_categories_.end()) {
+        (*output) << "Thread group '" << group << "' not found" << endl;
+        return;
+      }
+      categories_to_print.push_back(&category->second);
+      (*output) << "<h3>" << category->first << " : " << category->second.size()
+                << "</h3>";
+    } else {
+      for (const ThreadCategoryMap::value_type& category : thread_categories_) {
+        categories_to_print.push_back(&category.second);
+      }
+      (*output) << "<h3>All Threads : </h3>";
+    }
+
+    (*output) << "<table class='table table-hover table-border'>";
+    (*output) << "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
+              << "<th>Cumulative Kernel CPU(s)</th>"
+              << "<th>Cumulative IO-wait(s)</th></tr></thead>";
+    (*output) << "<tbody>\n";
+
+    for (const ThreadCategory* category : categories_to_print) {
+      PrintThreadCategoryRows(*category, output);
+    }
+    (*output) << "</tbody></table>";
+  } else {
+    (*output) << "<h2>Thread Groups</h2>";
+    if (metrics_enabled_) {
+      (*output) << "<h4>" << threads_running_metric_ << " thread(s) running";
+    }
+    (*output) << "<a href='/threadz?group=all'><h3>All Threads</h3>";
+
+    for (const ThreadCategoryMap::value_type& category : thread_categories_) {
+      string category_arg;
+      UrlEncode(category.first, &category_arg);
+      (*output) << "<a href='/threadz?group=" << category_arg << "'><h3>"
+                << category.first << " : " << category.second.size() << "</h3></a>";
+    }
+  }
+}
+
+static void InitThreading() {
+  // Warm up the stack trace library. This avoids a race in libunwind initialization
+  // by making sure we initialize it before we start any other threads.
+  ignore_result(GetStackTraceHex());
+  thread_manager.reset(new ThreadMgr());
+}
+
+Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
+                                  WebCallbackRegistry* web) {
+  GoogleOnceInit(&once, &InitThreading);
+  return thread_manager->StartInstrumentation(server_metrics, web);
+}
+
+ThreadJoiner::ThreadJoiner(Thread* thr)
+  : thread_(CHECK_NOTNULL(thr)),
+    warn_after_ms_(kDefaultWarnAfterMs),
+    warn_every_ms_(kDefaultWarnEveryMs),
+    give_up_after_ms_(kDefaultGiveUpAfterMs) {
+}
+
+ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
+  warn_after_ms_ = ms;
+  return *this;
+}
+
+ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) {
+  warn_every_ms_ = ms;
+  return *this;
+}
+
+ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
+  give_up_after_ms_ = ms;
+  return *this;
+}
+
+Status ThreadJoiner::Join() {
+  if (Thread::current_thread() &&
+      Thread::current_thread()->tid() == thread_->tid()) {
+    return Status::InvalidArgument("Can't join on own thread", thread_->name_);
+  }
+
+  // Early exit: double join is a no-op.
+  if (!thread_->joinable_) {
+    return Status::OK();
+  }
+
+  int waited_ms = 0;
+  bool keep_trying = true;
+  while (keep_trying) {
+    if (waited_ms >= warn_after_ms_) {
+      LOG(WARNING) << Substitute("Waited for $0ms trying to join with $1 (tid $2)",
+                                 waited_ms, thread_->name_, thread_->tid_);
+    }
+
+    int remaining_before_giveup = MathLimits<int>::kMax;
+    if (give_up_after_ms_ != -1) {
+      remaining_before_giveup = give_up_after_ms_ - waited_ms;
+    }
+
+    int remaining_before_next_warn = warn_every_ms_;
+    if (waited_ms < warn_after_ms_) {
+      remaining_before_next_warn = warn_after_ms_ - waited_ms;
+    }
+
+    if (remaining_before_giveup < remaining_before_next_warn) {
+      keep_trying = false;
+    }
+
+    int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn);
+
+    if (thread_->done_.WaitFor(MonoDelta::FromMilliseconds(wait_for))) {
+      // Unconditionally join before returning, to guarantee that any TLS
+      // has been destroyed (pthread_key_create() destructors only run
+      // after a pthread's user method has returned).
+      int ret = pthread_join(thread_->thread_, NULL);
+      CHECK_EQ(ret, 0);
+      thread_->joinable_ = false;
+      return Status::OK();
+    }
+    waited_ms += wait_for;
+  }
+  return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1",
+                                             waited_ms, thread_->name_));
+}
+
+Thread::~Thread() {
+  if (joinable_) {
+    int ret = pthread_detach(thread_);
+    CHECK_EQ(ret, 0);
+  }
+}
+
+void Thread::CallAtExit(const Closure& cb) {
+  CHECK_EQ(Thread::current_thread(), this);
+  exit_callbacks_.push_back(cb);
+}
+
+std::string Thread::ToString() const {
+  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid_, name_, category_);
+}
+
+Status Thread::StartThread(const std::string& category, const std::string& name,
+                           const ThreadFunctor& functor, uint64_t flags,
+                           scoped_refptr<Thread> *holder) {
+  TRACE_COUNTER_INCREMENT("threads_started", 1);
+  TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us");
+  const string log_prefix = Substitute("$0 ($1) ", name, category);
+  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread");
+
+  // Temporary reference for the duration of this function.
+  scoped_refptr<Thread> t(new Thread(category, name, functor));
+
+  {
+    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread");
+    SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250);
+    int ret = pthread_create(&t->thread_, NULL, &Thread::SuperviseThread, t.get());
+    if (ret) {
+      return Status::RuntimeError("Could not create thread", strerror(ret), ret);
+    }
+  }
+
+  // The thread has been created and is now joinable.
+  //
+  // Why set this in the parent and not the child? Because only the parent
+  // (or someone communicating with the parent) can join, so joinable must
+  // be set before the parent returns.
+  t->joinable_ = true;
+
+  // Optional, and only set if the thread was successfully created.
+  if (holder) {
+    *holder = t;
+  }
+
+  // The tid_ member goes through the following states:
+  // 1. CHILD_WAITING_TID: the child has just been spawned and is waiting
+  //    for the parent to finish writing to caller state (i.e. 'holder').
+  // 2. PARENT_WAITING_TID: the parent has updated caller state and is now
+  //    waiting for the child to write the tid.
+  // 3. <value>: both the parent and the child are free to continue. If the
+  //    value is INVALID_TID, the child could not discover its tid.
+  Release_Store(&t->tid_, PARENT_WAITING_TID);
+  {
+    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
+                                     "waiting for new thread to publish its TID");
+    int loop_count = 0;
+    while (Acquire_Load(&t->tid_) == PARENT_WAITING_TID) {
+      boost::detail::yield(loop_count++);
+    }
+  }
+
+  VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name;
+  return Status::OK();
+}
+
+void* Thread::SuperviseThread(void* arg) {
+  Thread* t = static_cast<Thread*>(arg);
+  int64_t system_tid = Thread::CurrentThreadId();
+  if (system_tid == -1) {
+    string error_msg = ErrnoToString(errno);
+    KLOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg;
+  }
+  string name = strings::Substitute("$0-$1", t->name(), system_tid);
+
+  // Take an additional reference to the thread manager, which we'll need below.
+  GoogleOnceInit(&once, &InitThreading);
+  ANNOTATE_IGNORE_SYNC_BEGIN();
+  shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
+  ANNOTATE_IGNORE_SYNC_END();
+
+  // Set up the TLS.
+  //
+  // We could store a scoped_refptr in the TLS itself, but as its
+  // lifecycle is poorly defined, we'll use a bare pointer and take an
+  // additional reference on t out of band, in thread_ref.
+  scoped_refptr<Thread> thread_ref = t;
+  t->tls_ = t;
+
+  // Wait until the parent has updated all caller-visible state, then write
+  // the TID to 'tid_', thus completing the parent<-->child handshake.
+  int loop_count = 0;
+  while (Acquire_Load(&t->tid_) == CHILD_WAITING_TID) {
+    boost::detail::yield(loop_count++);
+  }
+  Release_Store(&t->tid_, system_tid);
+
+  thread_manager->SetThreadName(name, t->tid());
+  thread_manager->AddThread(pthread_self(), name, t->category(), t->tid());
+
+  // FinishThread() is guaranteed to run (even if functor_ throws an
+  // exception) because pthread_cleanup_push() creates a scoped object
+  // whose destructor invokes the provided callback.
+  pthread_cleanup_push(&Thread::FinishThread, t);
+  t->functor_();
+  pthread_cleanup_pop(true);
+
+  return NULL;
+}
+
+void Thread::FinishThread(void* arg) {
+  Thread* t = static_cast<Thread*>(arg);
+
+  for (Closure& c : t->exit_callbacks_) {
+    c.Run();
+  }
+
+  // We're here either because of the explicit pthread_cleanup_pop() in
+  // SuperviseThread() or through pthread_exit(). In either case,
+  // thread_manager is guaranteed to be live because thread_mgr_ref in
+  // SuperviseThread() is still live.
+  thread_manager->RemoveThread(pthread_self(), t->category());
+
+  // Signal any Joiner that we're done.
+  t->done_.CountDown();
+
+  VLOG(2) << "Ended thread " << t->tid() << " - "
+          << t->category() << ":" << t->name();
+}
+
+} // namespace kudu
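For illustration only (not part of this commit), a minimal sketch of how a server might use the StartThreadInstrumentation() entry point defined above, assuming a scoped_refptr<MetricEntity> named 'entity' and a WebCallbackRegistry* named 'webserver' created elsewhere (both hypothetical names); passing NULL for the registry registers the metrics but skips the /threadz handler:

    // Sketch only: 'entity' and 'webserver' are assumed to exist.
    kudu::Status s = kudu::StartThreadInstrumentation(entity, webserver);
    if (!s.ok()) {
      LOG(WARNING) << "Could not start thread instrumentation: " << s.ToString();
    }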

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread.h b/be/src/kudu/util/thread.h
new file mode 100644
index 0000000..46e2505
--- /dev/null
+++ b/be/src/kudu/util/thread.h
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+
+#ifndef KUDU_UTIL_THREAD_H
+#define KUDU_UTIL_THREAD_H
+
+#include <pthread.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+
+#include <string>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MetricEntity;
+class Thread;
+class WebCallbackRegistry;
+
+// Utility to join on a thread, printing warning messages if it
+// takes too long. For example:
+//
+//   ThreadJoiner(&my_thread, "processing thread")
+//     .warn_after_ms(1000)
+//     .warn_every_ms(5000)
+//     .Join();
+//
+// TODO: would be nice to offer a way to use ptrace() or signals to
+// dump the stack trace of the thread we're trying to join on if it
+// gets stuck. But, after looking for 20 minutes or so, it seems
+// pretty complicated to get right.
+class ThreadJoiner {
+ public:
+  explicit ThreadJoiner(Thread* thread);
+
+  // Start emitting warnings after this many milliseconds.
+  //
+  // Default: 1000 ms.
+  ThreadJoiner& warn_after_ms(int ms);
+
+  // After warnings have started, emit another warning at the
+  // given interval.
+  //
+  // Default: 1000 ms.
+  ThreadJoiner& warn_every_ms(int ms);
+
+  // If the thread has not stopped after this number of milliseconds, give up
+  // joining on it and return Status::Aborted.
+  //
+  // -1 (the default) means to wait forever trying to join.
+  ThreadJoiner& give_up_after_ms(int ms);
+
+  // Join the thread, subject to the above parameters. If the thread joining
+  // fails for any reason, returns RuntimeError. If it times out, returns
+  // Aborted.
+  Status Join();
+
+ private:
+  enum {
+    kDefaultWarnAfterMs = 1000,
+    kDefaultWarnEveryMs = 1000,
+    kDefaultGiveUpAfterMs = -1 // forever
+  };
+
+  Thread* thread_;
+
+  int warn_after_ms_;
+  int warn_every_ms_;
+  int give_up_after_ms_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadJoiner);
+};
+
+// Thin wrapper around pthread that can register itself with the singleton ThreadMgr
+// (a private class implemented entirely in thread.cc, which tracks all live threads so
+// that they may be monitored via the debug webpages). This class has a limited subset of
+// boost::thread's API. Construction is almost the same, but clients must supply a
+// category and a name for each thread so that they can be identified in the debug web
+// UI. Otherwise, Join() is the only supported method from boost::thread.
+//
+// Each Thread object knows its operating system thread ID (TID), which can be used to
+// attach debuggers to specific threads, to retrieve resource-usage statistics from the
+// operating system, and to assign threads to resource control groups.
+//
+// Threads are shared objects, but in a degenerate way. They may only have
+// up to two referents: the caller that created the thread (parent), and
+// the thread itself (child). Moreover, the only two methods to mutate state
+// (Join() and the destructor) are constrained: the child may not Join() on
+// itself, and the destructor is only run when there's one referent left.
+// These constraints allow us to access thread internals without any locks.
+class Thread : public RefCountedThreadSafe<Thread> {
+ public:
+
+  // Flags passed to Thread::CreateWithFlags().
+  enum CreateFlags {
+    NO_FLAGS = 0,
+
+    // Disable the use of KernelStackWatchdog to detect and log slow
+    // thread creations. This is necessary when starting the kernel stack
+    // watchdog thread itself to avoid reentrancy.
+    NO_STACK_WATCHDOG = 1 << 0
+  };
+
+  // This constructor pattern mimics that in boost::thread. There is
+  // one constructor for each number of arguments that the thread
+  // function accepts. To extend the set of acceptable signatures, add
+  // another constructor with <class F, class A1.... class An>.
+  //
+  // In general:
+  //  - category: string identifying the thread category to which this thread belongs,
+  //    used for organising threads together on the debug UI.
+  //  - name: name of this thread. Will be appended with "-<thread-id>" to ensure
+  //    uniqueness.
+  //  - F - a method type that supports operator(), and the instance passed to the
+  //    constructor is executed immediately in a separate thread.
+  //  - A1...An - argument types whose instances are passed to f(...)
+  //  - holder - optional shared pointer to hold a reference to the created thread.
+  template <class F>
+  static Status CreateWithFlags(const std::string& category, const std::string& name,
+                                const F& f, uint64_t flags,
+                                scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, f, flags, holder);
+
+  }
+  template <class F>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, f, NO_FLAGS, holder);
+  }
+
+  template <class F, class A1>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, boost::bind(f, a1), NO_FLAGS, holder);
+  }
+
+  template <class F, class A1, class A2>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, boost::bind(f, a1, a2), NO_FLAGS, holder);
+  }
+
+  template <class F, class A1, class A2, class A3>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3), NO_FLAGS, holder);
+  }
+
+  template <class F, class A1, class A2, class A3, class A4>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, const A4& a4,
+                       scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), NO_FLAGS, holder);
+  }
+
+  template <class F, class A1, class A2, class A3, class A4, class A5>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+                       scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder);
+  }
+
+  template <class F, class A1, class A2, class A3, class A4, class A5, class A6>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+                       const A6& a6, scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder);
+  }
+
+  // Emulates boost::thread and detaches.
+  ~Thread();
+
+  // Blocks until this thread finishes execution. Once this method returns, the thread
+  // will be unregistered with the ThreadMgr and will not appear in the debug UI.
+  void Join() { ThreadJoiner(this).Join(); }
+
+  // Call the given Closure on the thread before it exits. The closures are executed
+  // in the order they are added.
+  //
+  // NOTE: This must only be called on the currently executing thread, to avoid having
+  // to reason about complicated races (eg registering a callback on an already-dead
+  // thread).
+  //
+  // This callback is guaranteed to be called except in the case of a process crash.
+  void CallAtExit(const Closure& cb);
+
+  // The thread ID assigned to this thread by the operating system. If the OS does not
+  // support retrieving the tid, returns Thread::INVALID_TID.
+  int64_t tid() const { return tid_; }
+
+  // Returns the thread's pthread ID.
+  pthread_t pthread_id() const { return thread_; }
+
+  const std::string& name() const { return name_; }
+  const std::string& category() const { return category_; }
+
+  // Return a string representation of the thread identifying information.
+  std::string ToString() const;
+
+  // The current thread of execution, or NULL if the current thread isn't a kudu::Thread.
+  // This call is signal-safe.
+  static Thread* current_thread() { return tls_; }
+
+  // Returns a unique, stable identifier for this thread. Note that this is a static
+  // method and thus can be used on any thread, including the main thread of the
+  // process.
+  //
+  // In general, this should be used when a value is required that is unique to
+  // a thread and must work on any thread including the main process thread.
+  //
+  // NOTE: this is _not_ the TID, but rather a unique value assigned by the
+  // thread implementation. So, this value should not be presented to the user
+  // in log messages, etc.
+  static int64_t UniqueThreadId() {
+#if defined(__linux__)
+    // This cast is a little bit ugly, but it is significantly faster than
+    // calling syscall(SYS_gettid). In particular, this speeds up some code
+    // paths in the tracing implementation.
+    return static_cast<int64_t>(pthread_self());
+#elif defined(__APPLE__)
+    uint64_t tid;
+    CHECK_EQ(0, pthread_threadid_np(NULL, &tid));
+    return tid;
+#else
+#error Unsupported platform
+#endif
+  }
+
+  // Returns the system thread ID (tid on Linux) for the current thread. Note
+  // that this is a static method and thus can be used from any thread,
+  // including the main thread of the process. This is in contrast to
+  // Thread::tid(), which only works on kudu::Threads.
+  //
+  // Thread::tid() will return the same value, but the value is cached in the
+  // Thread object, so will be faster to call.
+  //
+  // Thread::UniqueThreadId() (or Thread::tid()) should be preferred for
+  // performance-sensitive code; however, it is only guaranteed to return a
+  // unique and stable thread ID, not necessarily the system thread ID.
+  static int64_t CurrentThreadId() {
+#if defined(__linux__)
+    return syscall(SYS_gettid);
+#else
+    return UniqueThreadId();
+#endif
+  }
+
+ private:
+  friend class ThreadJoiner;
+
+  // The various special values for tid_ that describe the various steps
+  // in the parent<-->child handshake.
+  enum {
+    INVALID_TID = -1,
+    CHILD_WAITING_TID = -2,
+    PARENT_WAITING_TID = -3,
+  };
+
+  // Function object that wraps the user-supplied function to run in a separate thread.
+  typedef boost::function<void ()> ThreadFunctor;
+
+  Thread(std::string category, std::string name, ThreadFunctor functor)
+      : thread_(0),
+        category_(std::move(category)),
+        name_(std::move(name)),
+        tid_(CHILD_WAITING_TID),
+        functor_(std::move(functor)),
+        done_(1),
+        joinable_(false) {}
+
+  // Library-specific thread ID.
+  pthread_t thread_;
+
+  // Name and category for this thread.
+  const std::string category_;
+  const std::string name_;
+
+  // OS-specific thread ID. Once StartThread() returns, this is
+  // guaranteed to be set either to a non-negative integer, or to INVALID_TID.
+  int64_t tid_;
+
+  // User function to be executed by this thread.
+  const ThreadFunctor functor_;
+
+  // Joiners wait on this latch to be notified if the thread is done.
+  //
+  // Note that Joiners must additionally pthread_join(), otherwise certain
+  // resources that callers expect to be destroyed (like TLS) may still be
+  // alive when a Joiner finishes.
+  CountDownLatch done_;
+
+  bool joinable_;
+
+  // Thread local pointer to the current thread of execution. Will be NULL if the current
+  // thread is not a Thread.
+  static __thread Thread* tls_;
+
+  std::vector<Closure> exit_callbacks_;
+
+  // Starts the thread running SuperviseThread(), and returns once that thread has
+  // initialised and its TID has been read. Waits for notification from the started
+  // thread that initialisation is complete before returning. On success, stores a
+  // reference to the thread in holder.
+  static Status StartThread(const std::string& category, const std::string& name,
+                            const ThreadFunctor& functor, uint64_t flags,
+                            scoped_refptr<Thread>* holder);
+
+  // Wrapper for the user-supplied function. Invoked from the new thread,
+  // with the Thread as its only argument. Executes functor_, but before
+  // doing so registers with the global ThreadMgr and reads the thread's
+  // system ID. After functor_ terminates, unregisters with the ThreadMgr.
+  // Always returns NULL.
+  //
+  // SuperviseThread() notifies StartThread() when thread initialisation is
+  // completed via the tid_, which is set to the new thread's system ID.
+  // By that point in time SuperviseThread() has also taken a reference to
+  // the Thread object, allowing it to safely refer to it even after the
+  // caller drops its reference.
+  //
+  // Additionally, StartThread() notifies SuperviseThread() when the actual
+  // Thread object has been assigned (SuperviseThread() is spinning during
+  // this time). Without this, the new thread may reference the actual
+  // Thread object before it has been assigned by StartThread(). See
+  // KUDU-11 for more details.
+  static void* SuperviseThread(void* arg);
+
+  // Invoked when the user-supplied function finishes or in the case of an
+  // abrupt exit (i.e. pthread_exit()). Cleans up after SuperviseThread().
+  static void FinishThread(void* arg);
+};
+
+// Registers /threadz with the debug webserver, and creates thread-tracking metrics under
+// the given entity. If 'web' is NULL, does not register the path handler.
+Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
+                                  WebCallbackRegistry* web);
+} // namespace kudu
+
+#endif /* KUDU_UTIL_THREAD_H */
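For illustration only (not part of this commit), a minimal sketch of the API declared above: starting a named thread in a category and joining on it with slow-join warnings. The "example" category, DoWork() function, and use of the RETURN_NOT_OK macro from kudu/util/status.h are assumptions made for the sketch.

    #include "kudu/util/thread.h"

    static void DoWork() {
      // ... thread body ...
    }

    kudu::Status StartAndJoinWorker() {
      scoped_refptr<kudu::Thread> t;
      // Runs DoWork() on a new thread named "worker-<tid>" in the "example" category.
      RETURN_NOT_OK(kudu::Thread::Create("example", "worker", &DoWork, &t));
      // Join, warning after 1s and every 5s thereafter if the thread is slow to exit.
      return kudu::ThreadJoiner(t.get())
          .warn_after_ms(1000)
          .warn_every_ms(5000)
          .Join();
    }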

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread_restrictions.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread_restrictions.cc b/be/src/kudu/util/thread_restrictions.cc
new file mode 100644
index 0000000..40372c1
--- /dev/null
+++ b/be/src/kudu/util/thread_restrictions.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gperftools/heap-checker.h>
+
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadlocal.h"
+#include "kudu/util/thread_restrictions.h"
+
+#ifdef ENABLE_THREAD_RESTRICTIONS
+
+namespace kudu {
+
+namespace {
+
+struct LocalThreadRestrictions {
+  LocalThreadRestrictions()
+    : io_allowed(true),
+      wait_allowed(true),
+      singleton_allowed(true) {
+  }
+
+  bool io_allowed;
+  bool wait_allowed;
+  bool singleton_allowed;
+};
+
+LocalThreadRestrictions* LoadTLS() {
+  // Disable leak check. LSAN sometimes gets false positives on thread locals.
+  // See: https://github.com/google/sanitizers/issues/757
+  debug::ScopedLeakCheckDisabler d;
+  BLOCK_STATIC_THREAD_LOCAL(LocalThreadRestrictions, local_thread_restrictions);
+  return local_thread_restrictions;
+}
+
+} // anonymous namespace
+
+bool ThreadRestrictions::SetIOAllowed(bool allowed) {
+  bool previous_allowed = LoadTLS()->io_allowed;
+  LoadTLS()->io_allowed = allowed;
+  return previous_allowed;
+}
+
+void ThreadRestrictions::AssertIOAllowed() {
+  CHECK(LoadTLS()->io_allowed)
+    << "Function marked as IO-only was called from a thread that "
+    << "disallows IO!  If this thread really should be allowed to "
+    << "make IO calls, adjust the call to "
+    << "kudu::ThreadRestrictions::SetIOAllowed() in this thread's "
+    << "startup. "
+    << (Thread::current_thread() ? Thread::current_thread()->ToString() : "(not a kudu::Thread)");
+}
+
+bool ThreadRestrictions::SetWaitAllowed(bool allowed) {
+  bool previous_allowed = LoadTLS()->wait_allowed;
+  LoadTLS()->wait_allowed = allowed;
+  return previous_allowed;
+}
+
+void ThreadRestrictions::AssertWaitAllowed() {
+  CHECK(LoadTLS()->wait_allowed)
+    << "Waiting is not allowed to be used on this thread to prevent "
+    << "server-wide latency aberrations and deadlocks. "
+    << (Thread::current_thread() ? Thread::current_thread()->ToString() : "(not a kudu::Thread)");
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread_restrictions.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread_restrictions.h b/be/src/kudu/util/thread_restrictions.h
new file mode 100644
index 0000000..23f0cd5
--- /dev/null
+++ b/be/src/kudu/util/thread_restrictions.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Some portions: Copyright (c) 2012, The Chromium Authors.
+#ifndef KUDU_UTIL_THREAD_RESTRICTIONS_H
+#define KUDU_UTIL_THREAD_RESTRICTIONS_H
+
+#include "kudu/gutil/macros.h"
+
+#ifndef NDEBUG
+#define ENABLE_THREAD_RESTRICTIONS 1
+#endif
+
+namespace kudu {
+
+// Certain behavior is disallowed on certain threads.  ThreadRestrictions helps
+// enforce these rules.  Examples of such rules:
+//
+// * Do not do blocking IO
+// * Do not wait on synchronization variables or sleep
+//
+// Here's more about how the protection works:
+//
+// 1) If a thread should not be allowed to make IO calls, mark it:
+//      ThreadRestrictions::SetIOAllowed(false);
+//    By default, threads *are* allowed to make IO calls.
+//    In particular, threads like RPC reactors should never do blocking IO
+//    because it may stall other unrelated requests.
+//
+// 2) If a function makes a call that will go out to disk, check whether the
+//    current thread is allowed:
+//      ThreadRestrictions::AssertIOAllowed();
+//
+//
+// Style tip: where should you put AssertIOAllowed checks?  It's best
+// if you put them as close to the disk access as possible, at the
+// lowest level.  This rule is simple to follow and helps catch all
+// callers.  For example, if your function GoDoSomeBlockingDiskCall()
+// only calls other functions in Kudu and doesn't access the underlying
+// disk, you should go add the AssertIOAllowed checks in the helper functions.
+class ThreadRestrictions {
+ public:
+  // Constructing a ScopedAllowIO temporarily allows IO for the current
+  // thread.  Doing this is almost certainly always incorrect, but sometimes
+  // it makes more sense to allow an exception and file a bug in the backlog
+  // to improve it later.
+  class ScopedAllowIO {
+   public:
+    ScopedAllowIO() { previous_value_ = SetIOAllowed(true); }
+    ~ScopedAllowIO() { SetIOAllowed(previous_value_); }
+   private:
+    // Whether IO is allowed when the ScopedAllowIO was constructed.
+    bool previous_value_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedAllowIO);
+  };
+
+  // Constructing a ScopedAllowWait temporarily allows waiting on the current
+  // thread.  Doing this is almost always incorrect: consider carefully whether
+  // you should instead be deferring work to a different thread.
+  class ScopedAllowWait {
+   public:
+    ScopedAllowWait() { previous_value_ = SetWaitAllowed(true); }
+    ~ScopedAllowWait() { SetWaitAllowed(previous_value_); }
+   private:
+    // Whether waiting is allowed when the ScopedAllowWait was
+    // constructed.
+    bool previous_value_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedAllowWait);
+  };
+
+
+#if ENABLE_THREAD_RESTRICTIONS
+  // Set whether the current thread is allowed to make IO calls.
+  // Threads start out in the *allowed* state.
+  // Returns the previous value.
+  static bool SetIOAllowed(bool allowed);
+
+  // Check whether the current thread is allowed to make IO calls,
+  // and FATALs if not.  See the block comment above the class for
+  // a discussion of where to add these checks.
+  static void AssertIOAllowed();
+
+  // Set whether the current thread may wait/block.  Returns the previous
+  // value.
+  static bool SetWaitAllowed(bool allowed);
+
+  // Check whether the current thread is allowed to wait/block.
+  // FATALs if not.
+  static void AssertWaitAllowed();
+#else
+  // Inline the empty definitions of these functions so that they can be
+  // compiled out.
+  static bool SetIOAllowed(bool allowed) { return true; }
+  static void AssertIOAllowed() {}
+  static bool SetWaitAllowed(bool allowed) { return true; }
+  static void AssertWaitAllowed() {}
+#endif
+
+ private:
+  DISALLOW_IMPLICIT_CONSTRUCTORS(ThreadRestrictions);
+};
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_THREAD_RESTRICTIONS_H */
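For illustration only (not part of this commit), a minimal sketch of the pattern described in the class comment above, using hypothetical WriteBlockToDisk() and ReactorThreadMain() functions: the assertion sits next to the actual disk access, and a reactor-style thread opts out of IO at startup.

    #include "kudu/util/thread_restrictions.h"

    void WriteBlockToDisk() {
      // Placed as close to the disk access as possible.
      kudu::ThreadRestrictions::AssertIOAllowed();
      // ... pwrite()/fsync() calls ...
    }

    void ReactorThreadMain() {
      kudu::ThreadRestrictions::SetIOAllowed(false);
      // ... event loop: an accidental call to WriteBlockToDisk() from here now
      // fails a CHECK in debug builds (and is a no-op in release builds) ...
    }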

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadlocal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal.cc b/be/src/kudu/util/threadlocal.cc
new file mode 100644
index 0000000..11e8e33
--- /dev/null
+++ b/be/src/kudu/util/threadlocal.cc
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/threadlocal.h"
+
+#include <pthread.h>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/util/errno.h"
+
+namespace kudu {
+namespace threadlocal {
+namespace internal {
+
+// One key used by the entire process to attach destructors on thread exit.
+static pthread_key_t destructors_key;
+
+// The above key must only be initialized once per process.
+static GoogleOnceType once = GOOGLE_ONCE_INIT;
+
+// Call all the destructors associated with all THREAD_LOCAL instances in this
+// thread.
+static void InvokeDestructors(void* t) {
+  PerThreadDestructorList* d = reinterpret_cast<PerThreadDestructorList*>(t);
+  while (d != nullptr) {
+    d->destructor(d->arg);
+    PerThreadDestructorList* next = d->next;
+    delete d;
+    d = next;
+  }
+}
+
+// This key must be initialized only once.
+static void CreateKey() {
+  int ret = pthread_key_create(&destructors_key, &InvokeDestructors);
+  // Linux supports up to 1024 keys; we will use only one for all thread locals.
+  CHECK_EQ(0, ret) << "pthread_key_create() failed, cannot add destructor to thread: "
+      << "error " << ret << ": " << ErrnoToString(ret);
+}
+
+// Adds a destructor to the list.
+void AddDestructor(PerThreadDestructorList* p) {
+  GoogleOnceInit(&once, &CreateKey);
+
+  // Returns NULL if nothing is set yet.
+  p->next = reinterpret_cast<PerThreadDestructorList*>(pthread_getspecific(destructors_key));
+  int ret = pthread_setspecific(destructors_key, p);
+  // The only time this check should fail is if we are out of memory, or if
+  // somehow key creation failed, which should be caught by the above CHECK.
+  CHECK_EQ(0, ret) << "pthread_setspecific() failed, cannot update destructor list: "
+      << "error " << ret << ": " << ErrnoToString(ret);
+}
+
+} // namespace internal
+} // namespace threadlocal
+} // namespace kudu
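For illustration only (not part of this commit), a self-contained sketch of the underlying pthreads mechanism used above: one process-wide key whose destructor callback runs, per thread, when a thread that set a non-NULL value for the key exits.

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    static pthread_key_t key;

    // Invoked at thread exit for each thread that set a value for 'key'.
    static void Destructor(void* arg) {
      printf("thread exiting, freeing %p\n", arg);
      free(arg);
    }

    static void* ThreadBody(void*) {
      pthread_setspecific(key, malloc(16));  // per-thread state
      return NULL;                           // Destructor() fires after this returns
    }

    int main() {
      pthread_key_create(&key, &Destructor);
      pthread_t t;
      pthread_create(&t, NULL, &ThreadBody, NULL);
      pthread_join(t, NULL);
      return 0;
    }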

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadlocal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal.h b/be/src/kudu/util/threadlocal.h
new file mode 100644
index 0000000..2380487
--- /dev/null
+++ b/be/src/kudu/util/threadlocal.h
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THREADLOCAL_H_
+#define KUDU_UTIL_THREADLOCAL_H_
+
+// Block-scoped static thread local implementation.
+//
+// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro
+// defines a thread-local pointer to the specified type, which is lazily
+// instantiated by any thread entering the block for the first time. The
+// constructor for the type T is invoked at macro execution time, as expected,
+// and its destructor is invoked when the corresponding thread's Runnable
+// returns, or when the thread exits.
+//
+// Inspired by Poco <http://pocoproject.org/docs/Poco.ThreadLocal.html>,
+// Andrew Tomazos <http://stackoverflow.com/questions/12049684/>, and
+// the C++11 thread_local API.
+//
+// Example usage:
+//
+// // Invokes a 3-arg constructor on SomeClass:
+// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3);
+// instance->DoSomething();
+//
+#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...)                                    \
+static __thread T* t;                                                           \
+do {                                                                            \
+  if (PREDICT_FALSE(t == NULL)) {                                               \
+    t = new T(__VA_ARGS__);                                                     \
+    threadlocal::internal::PerThreadDestructorList* dtor_list =                 \
+        new threadlocal::internal::PerThreadDestructorList();                   \
+    dtor_list->destructor = threadlocal::internal::Destroy<T>;                  \
+    dtor_list->arg = t;                                                         \
+    threadlocal::internal::AddDestructor(dtor_list);                            \
+  }                                                                             \
+} while (false)
+
+// Class-scoped static thread local implementation.
+//
+// Very similar in implementation to the above block-scoped version, but
+// requires a bit more syntax and vigilance to use properly.
+//
+// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the
+// class header, as usual for variable declarations.
+//
+// Because these variables are static, they must also be defined in the impl
+// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_),
+// which is very much like defining any static member, i.e. int Foo::member_.
+//
+// Finally, each thread must initialize the instance before using it by calling
+// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap
+// call, and may be invoked at the top of any method which may reference a
+// thread-local variable.
+//
+// Due to all of these requirements, you should probably declare TLS members
+// as private.
+//
+// Example usage:
+//
+// // foo.h
+// #include "kudu/utils/file.h"
+// class Foo {
+//  public:
+//   void DoSomething(std::string s);
+//  private:
+//   DECLARE_STATIC_THREAD_LOCAL(utils::File, file_);
+// };
+//
+// // foo.cc
+// #include "kudu/foo.h"
+// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_);
+// void Foo::WriteToFile(std::string s) {
+//   // Call constructor if necessary.
+//   INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt");
+//   file_->Write(s);
+// }
+
+// Goes in the class declaration (usually in a header file).
+// dtor must be destructed _after_ t, so it gets defined first.
+// Uses a mangled variable name for dtor since it must also be a member of the
+// class.
+#define DECLARE_STATIC_THREAD_LOCAL(T, t)                                                     \
+static __thread T* t
+
+// You must also define the instance in the .cc file.
+#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t)                                               \
+__thread T* Class::t
+
+// Must be invoked at least once by each thread that will access t.
+#define INIT_STATIC_THREAD_LOCAL(T, t, ...)                                       \
+do {                                                                              \
+  if (PREDICT_FALSE(t == NULL)) {                                                 \
+    t = new T(__VA_ARGS__);                                                       \
+    threadlocal::internal::PerThreadDestructorList* dtor_list =                   \
+        new threadlocal::internal::PerThreadDestructorList();                     \
+    dtor_list->destructor = threadlocal::internal::Destroy<T>;                    \
+    dtor_list->arg = t;                                                           \
+    threadlocal::internal::AddDestructor(dtor_list);                              \
+  }                                                                               \
+} while (false)
+
+// Internal implementation below.
+
+namespace kudu {
+namespace threadlocal {
+namespace internal {
+
+// List of destructors for all thread locals instantiated on a given thread.
+struct PerThreadDestructorList {
+  void (*destructor)(void*);
+  void* arg;
+  PerThreadDestructorList* next;
+};
+
+// Add a destructor to the list.
+void AddDestructor(PerThreadDestructorList* p);
+
+// Destroy the passed object of type T.
+template<class T>
+static void Destroy(void* t) {
+  // With tcmalloc, this should be pretty cheap (same thread as new).
+  delete reinterpret_cast<T*>(t);
+}
+
+} // namespace internal
+} // namespace threadlocal
+} // namespace kudu
+
+#endif // KUDU_UTIL_THREADLOCAL_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadlocal_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal_cache.h b/be/src/kudu/util/threadlocal_cache.h
new file mode 100644
index 0000000..e9ab3c2
--- /dev/null
+++ b/be/src/kudu/util/threadlocal_cache.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/util/threadlocal.h"
+
+#include <boost/optional/optional.hpp>
+#include <array>
+#include <memory>
+#include <utility>
+
+namespace kudu {
+
+// A small thread-local cache for arbitrary objects.
+//
+// This can be used as a contention-free "lookaside" type cache for frequently-accessed
+// objects to avoid having to go to a less-efficient centralized cache.
+//
+// 'Key' must be copyable, and comparable using operator==().
+// 'T' has no particular requirements.
+template<class Key, class T>
+class ThreadLocalCache {
+ public:
+  // The number of entries in the cache.
+  // NOTE: this should always be a power of two for good performance, so that the
+  // compiler can optimize the modulo operations into bit-mask operations.
+  static constexpr int kItemCapacity = 4;
+
+  // Look up a key in the cache. Returns either the existing entry with this key,
+  // or nullptr if no entry matched.
+  T* Lookup(const Key& key) {
+    // Our cache is so small that a linear search is likely to be more efficient than
+    // any kind of actual hashing. We always start the search at wherever we most
+    // recently found a hit.
+    for (int i = 0; i < kItemCapacity; i++) {
+      int idx = (last_hit_ + i) % kItemCapacity;
+      auto& p = cache_[idx];
+      if (p.first == key) {
+        last_hit_ = idx;
+        return p.second.get_ptr();
+      }
+    }
+    return nullptr;
+  }
+
+  // Insert a new entry into the cache. If the cache is full (as it usually is in the
+  // steady state), this replaces one of the existing entries. The 'args' are forwarded
+  // to T's constructor.
+  //
+  // NOTE: entries returned by a previous call to Lookup() may be invalidated
+  // by this function.
+  template<typename ... Args>
+  T* EmplaceNew(const Key& key, Args&&... args) {
+    auto& p = cache_[next_slot_++ % kItemCapacity];
+    p.second.emplace(std::forward<Args>(args)...);
+    p.first = key;
+    return p.second.get_ptr();
+  }
+
+  // Get the cache instance for this thread, creating it if it has not yet
+  // been created.
+  //
+  // The instance is automatically deleted and any cached items destructed when the
+  // thread exits.
+  static ThreadLocalCache* GetInstance() {
+    INIT_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_);
+    return tl_instance_;
+  }
+
+ private:
+  using EntryPair = std::pair<Key, boost::optional<T>>;
+  std::array<EntryPair, kItemCapacity> cache_;
+
+  // The next slot that we will write into. We always modulo this by the capacity
+  // before use.
+  uint8_t next_slot_ = 0;
+  // The slot where we last got a cache hit, so we can start our search at the same
+  // spot, optimizing for the case of repeated lookups of the same hot element.
+  uint8_t last_hit_ = 0;
+
+  static_assert(kItemCapacity <= 1 << (sizeof(next_slot_) * 8),
+                "next_slot_ must be large enough for capacity");
+  static_assert(kItemCapacity <= 1 << (sizeof(last_hit_) * 8),
+                "last_hit_ must be large enough for capacity");
+
+  DECLARE_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_);
+};
+
+// Define the thread-local storage for the ThreadLocalCache template.
+// We can't use DEFINE_STATIC_THREAD_LOCAL here because the commas in the
+// template arguments confuse the C preprocessor.
+template<class K, class T>
+__thread ThreadLocalCache<K,T>* ThreadLocalCache<K,T>::tl_instance_;
+
+} // namespace kudu

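For reference, a minimal usage sketch of the ThreadLocalCache API defined above (not part of the imported file). The Decoder value type and the uint64_t schema-id key are hypothetical, chosen only to illustrate the lookup-or-emplace pattern; keep in mind that EmplaceNew() may invalidate pointers returned by earlier Lookup() calls.

#include <cstdint>

#include "kudu/util/threadlocal_cache.h"

// Hypothetical cached object: anything constructible from the emplace args.
struct Decoder {
  explicit Decoder(uint64_t schema_id) : schema_id(schema_id) {}
  uint64_t schema_id;
};

Decoder* GetDecoderForSchema(uint64_t schema_id) {
  using Cache = kudu::ThreadLocalCache<uint64_t, Decoder>;
  Cache* cache = Cache::GetInstance();          // per-thread, created lazily
  if (Decoder* d = cache->Lookup(schema_id)) {  // contention-free fast path
    return d;
  }
  // Miss: construct in place; one of the four existing entries may be evicted.
  return cache->EmplaceNew(schema_id, schema_id);
}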
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool-test.cc b/be/src/kudu/util/threadpool-test.cc
new file mode 100644
index 0000000..6bd5826
--- /dev/null
+++ b/be/src/kudu/util/threadpool-test.cc
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/promise.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/trace.h"
+
+using std::shared_ptr;
+
+namespace kudu {
+
+static const char* kDefaultPoolName = "test";
+
+class ThreadPoolTest : public KuduTest {
+ public:
+
+  virtual void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ThreadPoolBuilder(kDefaultPoolName).Build(&pool_));
+  }
+
+  Status RebuildPoolWithBuilder(const ThreadPoolBuilder& builder) {
+    return builder.Build(&pool_);
+  }
+
+  Status RebuildPoolWithMinMax(int min_threads, int max_threads) {
+    return ThreadPoolBuilder(kDefaultPoolName)
+        .set_min_threads(min_threads)
+        .set_max_threads(max_threads)
+        .Build(&pool_);
+  }
+
+ protected:
+  gscoped_ptr<ThreadPool> pool_;
+};
+
+TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
+  ASSERT_OK(RebuildPoolWithMinMax(4, 4));
+  pool_->Shutdown();
+}
+
+static void SimpleTaskMethod(int n, Atomic32 *counter) {
+  while (n--) {
+    base::subtle::NoBarrier_AtomicIncrement(counter, 1);
+    boost::detail::yield(n);
+  }
+}
+
+class SimpleTask : public Runnable {
+ public:
+  SimpleTask(int n, Atomic32 *counter)
+    : n_(n), counter_(counter) {
+  }
+
+  void Run() OVERRIDE {
+    SimpleTaskMethod(n_, counter_);
+  }
+
+ private:
+  int n_;
+  Atomic32 *counter_;
+};
+
+TEST_F(ThreadPoolTest, TestSimpleTasks) {
+  ASSERT_OK(RebuildPoolWithMinMax(4, 4));
+
+  Atomic32 counter(0);
+  std::shared_ptr<Runnable> task(new SimpleTask(15, &counter));
+
+  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter)));
+  ASSERT_OK(pool_->Submit(task));
+  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter)));
+  ASSERT_OK(pool_->Submit(task));
+  ASSERT_OK(pool_->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter)));
+  pool_->Wait();
+  ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter));
+  pool_->Shutdown();
+}
+
+static void IssueTraceStatement() {
+  TRACE("hello from task");
+}
+
+// Test that the thread-local trace is propagated to tasks
+// submitted to the threadpool.
+TEST_F(ThreadPoolTest, TestTracePropagation) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+
+  scoped_refptr<Trace> t(new Trace);
+  {
+    ADOPT_TRACE(t.get());
+    ASSERT_OK(pool_->SubmitFunc(&IssueTraceStatement));
+  }
+  pool_->Wait();
+  ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task");
+}
+
+TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+  pool_->Shutdown();
+  Status s = pool_->SubmitFunc(&IssueTraceStatement);
+  ASSERT_EQ("Service unavailable: The pool has been shut down.",
+            s.ToString());
+}
+
+class SlowTask : public Runnable {
+ public:
+  explicit SlowTask(CountDownLatch* latch)
+    : latch_(latch) {
+  }
+
+  void Run() OVERRIDE {
+    latch_->Wait();
+  }
+
+ private:
+  CountDownLatch* latch_;
+};
+
+TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(0)
+                                   .set_max_threads(3)
+                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+
+  // There are no threads to start with.
+  ASSERT_EQ(0, pool_->num_threads_);
+  // We get up to 3 threads when submitting work.
+  CountDownLatch latch(1);
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(2, pool_->num_threads_);
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(3, pool_->num_threads_);
+  // The 4th piece of work gets queued.
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(3, pool_->num_threads_);
+  // Finish all work
+  latch.CountDown();
+  pool_->Wait();
+  ASSERT_EQ(0, pool_->active_threads_);
+  pool_->Shutdown();
+  ASSERT_EQ(0, pool_->num_threads_);
+}
+
+// Regression test for a bug where a task is submitted exactly
+// as a thread is about to exit. Previously this could hang forever.
+TEST_F(ThreadPoolTest, TestRace) {
+  alarm(60);
+  auto cleanup = MakeScopedCleanup([]() {
+    alarm(0); // Disable alarm on test exit.
+  });
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(0)
+                                   .set_max_threads(1)
+                                   .set_idle_timeout(MonoDelta::FromMicroseconds(1))));
+
+  for (int i = 0; i < 500; i++) {
+    CountDownLatch l(1);
+    ASSERT_OK(pool_->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &l)));
+    l.Wait();
+    // Sleeping a different amount in each iteration makes it more likely to hit
+    // the bug.
+    SleepFor(MonoDelta::FromMicroseconds(i));
+  }
+}
+
+TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(4)
+                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+
+  // There is 1 thread to start with.
+  ASSERT_EQ(1, pool_->num_threads_);
+  // We get up to 4 threads when submitting work.
+  CountDownLatch latch(1);
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(1, pool_->num_threads_);
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(2, pool_->num_threads_);
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(3, pool_->num_threads_);
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(4, pool_->num_threads_);
+  // The 5th piece of work gets queued.
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_EQ(4, pool_->num_threads_);
+  // Finish all work
+  latch.CountDown();
+  pool_->Wait();
+  ASSERT_EQ(0, pool_->active_threads_);
+  pool_->Shutdown();
+  ASSERT_EQ(0, pool_->num_threads_);
+}
+
+TEST_F(ThreadPoolTest, TestMaxQueueSize) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_max_queue_size(1)));
+
+  CountDownLatch latch(1);
+  // We will be able to submit two tasks: one for max_threads == 1 and one for
+  // max_queue_size == 1.
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+  CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout: " << s.ToString();
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Test that when we specify a zero-sized queue, the maximum number of threads
+// running is used for enforcement.
+TEST_F(ThreadPoolTest, TestZeroQueueSize) {
+  const int kMaxThreads = 4;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_queue_size(0)
+                                   .set_max_threads(kMaxThreads)));
+
+  CountDownLatch latch(1);
+  for (int i = 0; i < kMaxThreads; i++) {
+    ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  }
+  Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+  ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Test that setting a promise from another thread yields
+// a value on the current thread.
+TEST_F(ThreadPoolTest, TestPromises) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_max_queue_size(1)));
+
+  Promise<int> my_promise;
+  ASSERT_OK(pool_->SubmitClosure(
+                     Bind(&Promise<int>::Set, Unretained(&my_promise), 5)));
+  ASSERT_EQ(5, my_promise.Get());
+  pool_->Shutdown();
+}
+
+METRIC_DEFINE_entity(test_entity);
+METRIC_DEFINE_histogram(test_entity, queue_length, "queue length",
+                        MetricUnit::kTasks, "queue length", 1000, 1);
+
+METRIC_DEFINE_histogram(test_entity, queue_time, "queue time",
+                        MetricUnit::kMicroseconds, "queue time", 1000000, 1);
+
+METRIC_DEFINE_histogram(test_entity, run_time, "run time",
+                        MetricUnit::kMicroseconds, "run time", 1000, 1);
+
+TEST_F(ThreadPoolTest, TestMetrics) {
+  MetricRegistry registry;
+  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
+      &registry, "test entity");
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+
+  // Enable metrics for the thread pool.
+  scoped_refptr<Histogram> queue_length = METRIC_queue_length.Instantiate(entity);
+  scoped_refptr<Histogram> queue_time = METRIC_queue_time.Instantiate(entity);
+  scoped_refptr<Histogram> run_time = METRIC_run_time.Instantiate(entity);
+  pool_->SetQueueLengthHistogram(queue_length);
+  pool_->SetQueueTimeMicrosHistogram(queue_time);
+  pool_->SetRunTimeMicrosHistogram(run_time);
+
+  int kNumItems = 500;
+  for (int i = 0; i < kNumItems; i++) {
+    ASSERT_OK(pool_->SubmitFunc(boost::bind(&usleep, i)));
+  }
+
+  pool_->Wait();
+
+  // Check that all histograms were incremented once per submitted item.
+  ASSERT_EQ(kNumItems, queue_length->TotalCount());
+  ASSERT_EQ(kNumItems, queue_time->TotalCount());
+  ASSERT_EQ(kNumItems, run_time->TotalCount());
+}
+
+// Test that a thread pool will crash if asked to run its own blocking
+// functions in a pool thread.
+//
+// In a multi-threaded application, TSAN is unsafe to use following a fork().
+// After a fork(), TSAN will:
+// 1. Disable verification, expecting an exec() soon anyway, and
+// 2. Die on future thread creation.
+// For some reason, this test triggers behavior #2. We could disable it with
+// the TSAN option die_after_fork=0, but this can (supposedly) lead to
+// deadlocks, so we'll disable the entire test instead.
+#ifndef THREAD_SANITIZER
+TEST_F(ThreadPoolTest, TestDeadlocks) {
+  const char* death_msg = "called pool function that would result in deadlock";
+  ASSERT_DEATH({
+    ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+    ASSERT_OK(pool_->SubmitClosure(
+        Bind(&ThreadPool::Shutdown, Unretained(pool_.get()))));
+    pool_->Wait();
+  }, death_msg);
+
+  ASSERT_DEATH({
+    ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+    ASSERT_OK(pool_->SubmitClosure(
+        Bind(&ThreadPool::Wait, Unretained(pool_.get()))));
+    pool_->Wait();
+  }, death_msg);
+}
+#endif
+
+class SlowDestructorRunnable : public Runnable {
+ public:
+  void Run() override {}
+
+  virtual ~SlowDestructorRunnable() {
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+};
+
+// Test that if a task's destructor is slow, it doesn't cause serialization of the tasks
+// in the queue.
+TEST_F(ThreadPoolTest, TestSlowDestructor) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 20));
+  MonoTime start = MonoTime::Now();
+  for (int i = 0; i < 100; i++) {
+    shared_ptr<Runnable> task(new SlowDestructorRunnable());
+    ASSERT_OK(pool_->Submit(std::move(task)));
+  }
+  pool_->Wait();
+  ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
+}
+
+
+} // namespace kudu

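For reference, the builder/submit pattern these tests exercise, reduced to a minimal sketch; the pool name, the DoWork() task body, and the RunOnPool() wrapper are illustrative only.

#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"

using kudu::Status;
using kudu::ThreadPool;
using kudu::ThreadPoolBuilder;

static void DoWork() {}  // placeholder task body

Status RunOnPool() {
  gscoped_ptr<ThreadPool> pool;
  RETURN_NOT_OK(ThreadPoolBuilder("example")
                    .set_min_threads(0)
                    .set_max_threads(4)
                    .Build(&pool));
  RETURN_NOT_OK(pool->SubmitFunc(&DoWork));  // any void() callable
  pool->Wait();      // block until queued tasks have finished
  pool->Shutdown();  // also invoked by the ThreadPool destructor
  return Status::OK();
}

Submitting after Shutdown() returns Status::ServiceUnavailable, as TestSubmitAfterShutdown above demonstrates.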
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool.cc b/be/src/kudu/util/threadpool.cc
new file mode 100644
index 0000000..b3f4ddf
--- /dev/null
+++ b/be/src/kudu/util/threadpool.cc
@@ -0,0 +1,410 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/threadpool.h"
+
+#include <boost/function.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <limits>
+#include <string>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+namespace kudu {
+
+using strings::Substitute;
+
+////////////////////////////////////////////////////////
+// FunctionRunnable
+////////////////////////////////////////////////////////
+
+class FunctionRunnable : public Runnable {
+ public:
+  explicit FunctionRunnable(boost::function<void()> func) : func_(std::move(func)) {}
+
+  void Run() OVERRIDE {
+    func_();
+  }
+
+ private:
+  boost::function<void()> func_;
+};
+
+////////////////////////////////////////////////////////
+// ThreadPoolBuilder
+////////////////////////////////////////////////////////
+
+ThreadPoolBuilder::ThreadPoolBuilder(std::string name)
+    : name_(std::move(name)),
+      min_threads_(0),
+      max_threads_(base::NumCPUs()),
+      max_queue_size_(std::numeric_limits<int>::max()),
+      idle_timeout_(MonoDelta::FromMilliseconds(500)) {}
+
+ThreadPoolBuilder& ThreadPoolBuilder::set_trace_metric_prefix(
+    const std::string& prefix) {
+  trace_metric_prefix_ = prefix;
+  return *this;
+}
+
+ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
+  CHECK_GE(min_threads, 0);
+  min_threads_ = min_threads;
+  return *this;
+}
+
+ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
+  CHECK_GT(max_threads, 0);
+  max_threads_ = max_threads;
+  return *this;
+}
+
+ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
+  max_queue_size_ = max_queue_size;
+  return *this;
+}
+
+ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
+  idle_timeout_ = idle_timeout;
+  return *this;
+}
+
+Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
+  pool->reset(new ThreadPool(*this));
+  RETURN_NOT_OK((*pool)->Init());
+  return Status::OK();
+}
+
+////////////////////////////////////////////////////////
+// ThreadPool
+////////////////////////////////////////////////////////
+
+ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
+  : name_(builder.name_),
+    min_threads_(builder.min_threads_),
+    max_threads_(builder.max_threads_),
+    max_queue_size_(builder.max_queue_size_),
+    idle_timeout_(builder.idle_timeout_),
+    pool_status_(Status::Uninitialized("The pool was not initialized.")),
+    idle_cond_(&lock_),
+    no_threads_cond_(&lock_),
+    not_empty_(&lock_),
+    num_threads_(0),
+    active_threads_(0),
+    queue_size_(0) {
+
+  string prefix = !builder.trace_metric_prefix_.empty() ?
+      builder.trace_metric_prefix_ : builder.name_;
+
+  queue_time_trace_metric_name_ = TraceMetrics::InternName(
+      prefix + ".queue_time_us");
+  run_wall_time_trace_metric_name_ = TraceMetrics::InternName(
+      prefix + ".run_wall_time_us");
+  run_cpu_time_trace_metric_name_ = TraceMetrics::InternName(
+      prefix + ".run_cpu_time_us");
+}
+
+ThreadPool::~ThreadPool() {
+  Shutdown();
+}
+
+Status ThreadPool::Init() {
+  MutexLock unique_lock(lock_);
+  if (!pool_status_.IsUninitialized()) {
+    return Status::NotSupported("The thread pool is already initialized");
+  }
+  pool_status_ = Status::OK();
+  for (int i = 0; i < min_threads_; i++) {
+    Status status = CreateThreadUnlocked();
+    if (!status.ok()) {
+      Shutdown();
+      return status;
+    }
+  }
+  return Status::OK();
+}
+
+void ThreadPool::Shutdown() {
+  MutexLock unique_lock(lock_);
+  CheckNotPoolThreadUnlocked();
+
+  pool_status_ = Status::ServiceUnavailable("The pool has been shut down.");
+
+  // Clear the queue_ member under the lock, but defer the releasing
+  // of the entries outside the lock, in case there are concurrent threads
+  // wanting to access the ThreadPool. The task's destructors may acquire
+  // locks, etc, so this also prevents lock inversions.
+  auto to_release = std::move(queue_);
+  queue_.clear();
+  queue_size_ = 0;
+  not_empty_.Broadcast();
+
+  // Runnable has no Abort(), so all we can do is wait for in-flight tasks to
+  // finish; ideally callers abort their own work before calling Shutdown().
+  while (num_threads_ > 0) {
+    no_threads_cond_.Wait();
+  }
+
+  // Finally release the tasks that were in the queue, outside the lock.
+  unique_lock.Unlock();
+  for (QueueEntry& e : to_release) {
+    if (e.trace) {
+      e.trace->Release();
+    }
+  }
+}
+
+Status ThreadPool::SubmitClosure(const Closure& task) {
+  // TODO: once all uses of boost::bind-based tasks are dead, implement this
+  // in a more straightforward fashion.
+  return SubmitFunc(boost::bind(&Closure::Run, task));
+}
+
+Status ThreadPool::SubmitFunc(boost::function<void()> func) {
+  return Submit(std::shared_ptr<Runnable>(new FunctionRunnable(std::move(func))));
+}
+
+Status ThreadPool::Submit(std::shared_ptr<Runnable> task) {
+  MonoTime submit_time = MonoTime::Now();
+
+  MutexLock guard(lock_);
+  if (PREDICT_FALSE(!pool_status_.ok())) {
+    return pool_status_;
+  }
+
+  // Size limit check.
+  int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
+                               static_cast<int64_t>(max_queue_size_) - queue_size_;
+  if (capacity_remaining < 1) {
+    return Status::ServiceUnavailable(
+        Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
+                   num_threads_, max_threads_, queue_size_, max_queue_size_));
+  }
+
+  // Should we create another thread?
+  // We assume that each current inactive thread will grab one item from the
+  // queue.  If it seems like we'll need another thread, we create one.
+  // In theory, a currently active thread could finish immediately after this
+  // calculation.  This would mean we created a thread we didn't really need.
+  // However, this race is unavoidable, since we don't do the work under a lock.
+  // It's also harmless.
+  //
+  // Of course, we never create more than max_threads_ threads no matter what.
+  int inactive_threads = num_threads_ - active_threads_;
+  int additional_threads = (queue_size_ + 1) - inactive_threads;
+  if (additional_threads > 0 && num_threads_ < max_threads_) {
+    Status status = CreateThreadUnlocked();
+    if (!status.ok()) {
+      if (num_threads_ == 0) {
+        // If we have no threads, we can't do any work.
+        return status;
+      }
+      // If we failed to create a thread, but there are still some other
+      // worker threads, log a warning message and continue.
+      LOG(ERROR) << "Thread pool failed to create thread: "
+                 << status.ToString();
+    }
+  }
+
+  QueueEntry e;
+  e.runnable = std::move(task);
+  e.trace = Trace::CurrentTrace();
+  // Need to AddRef, since the thread which submitted the task may go away,
+  // and we don't want the trace to be destructed while waiting in the queue.
+  if (e.trace) {
+    e.trace->AddRef();
+  }
+  e.submit_time = submit_time;
+
+  queue_.emplace_back(std::move(e));
+  int length_at_submit = queue_size_++;
+
+  guard.Unlock();
+  not_empty_.Signal();
+
+  if (queue_length_histogram_) {
+    queue_length_histogram_->Increment(length_at_submit);
+  }
+
+  return Status::OK();
+}
+
+void ThreadPool::Wait() {
+  MutexLock unique_lock(lock_);
+  CheckNotPoolThreadUnlocked();
+  while ((!queue_.empty()) || (active_threads_ > 0)) {
+    idle_cond_.Wait();
+  }
+}
+
+bool ThreadPool::WaitUntil(const MonoTime& until) {
+  return WaitFor(until - MonoTime::Now());
+}
+
+bool ThreadPool::WaitFor(const MonoDelta& delta) {
+  MutexLock unique_lock(lock_);
+  CheckNotPoolThreadUnlocked();
+  while ((!queue_.empty()) || (active_threads_ > 0)) {
+    if (!idle_cond_.TimedWait(delta)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+
+void ThreadPool::SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist) {
+  queue_length_histogram_ = hist;
+}
+
+void ThreadPool::SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) {
+  queue_time_us_histogram_ = hist;
+}
+
+void ThreadPool::SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) {
+  run_time_us_histogram_ = hist;
+}
+
+void ThreadPool::DispatchThread(bool permanent) {
+  MutexLock unique_lock(lock_);
+  while (true) {
+    // Note: Status::Aborted() is used to indicate normal shutdown.
+    if (!pool_status_.ok()) {
+      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
+      break;
+    }
+
+    if (queue_.empty()) {
+      if (permanent) {
+        not_empty_.Wait();
+      } else {
+        if (!not_empty_.TimedWait(idle_timeout_)) {
+          // After much investigation, it appears that pthread condition variables have
+          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
+          // another thread did in fact signal. Apparently after a timeout there is some
+          // brief period during which another thread may actually grab the internal mutex
+          // protecting the state, signal, and release again before we get the mutex. So,
+          // we'll recheck the empty queue case regardless.
+          if (queue_.empty()) {
+            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
+                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
+            break;
+          }
+        }
+      }
+      continue;
+    }
+
+    // Fetch a pending task
+    QueueEntry entry = std::move(queue_.front());
+    queue_.pop_front();
+    queue_size_--;
+    ++active_threads_;
+
+    unique_lock.Unlock();
+
+    // Release the reference which was held by the queued item.
+    ADOPT_TRACE(entry.trace);
+    if (entry.trace) {
+      entry.trace->Release();
+    }
+
+    // Update metrics
+    MonoTime now(MonoTime::Now());
+    int64_t queue_time_us = (now - entry.submit_time).ToMicroseconds();
+    TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
+    if (queue_time_us_histogram_) {
+      queue_time_us_histogram_->Increment(queue_time_us);
+    }
+
+    // Execute the task
+    {
+      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
+      MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();
+
+      entry.runnable->Run();
+
+      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
+      int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
+
+      if (run_time_us_histogram_) {
+        run_time_us_histogram_->Increment(wall_us);
+      }
+      TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
+      TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
+    }
+    // Destruct the task while we do not hold the lock.
+    //
+    // The task's destructor may be expensive if it has a lot of bound
+    // objects, and we don't want to block submission of the threadpool.
+    // In the worst case, the destructor might even try to do something
+    // with this threadpool, and produce a deadlock.
+    entry.runnable.reset();
+    unique_lock.Lock();
+
+    if (--active_threads_ == 0) {
+      idle_cond_.Broadcast();
+    }
+  }
+
+  // It's important that we hold the lock between exiting the loop and dropping
+  // num_threads_. Otherwise it's possible someone else could come along here
+  // and add a new task just as the last running thread is about to exit.
+  CHECK(unique_lock.OwnsLock());
+
+  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
+  if (--num_threads_ == 0) {
+    no_threads_cond_.Broadcast();
+
+    // Sanity check: if we're the last thread exiting, the queue ought to be
+    // empty. Otherwise it will never get processed.
+    CHECK(queue_.empty());
+    DCHECK_EQ(0, queue_size_);
+  }
+}
+
+Status ThreadPool::CreateThreadUnlocked() {
+  // The first few threads are permanent, and do not time out.
+  bool permanent = (num_threads_ < min_threads_);
+  scoped_refptr<Thread> t;
+  Status s = kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
+                                  &ThreadPool::DispatchThread, this, permanent, &t);
+  if (s.ok()) {
+    InsertOrDie(&threads_, t.get());
+    num_threads_++;
+  }
+  return s;
+}
+
+void ThreadPool::CheckNotPoolThreadUnlocked() {
+  Thread* current = Thread::current_thread();
+  if (ContainsKey(threads_, current)) {
+    LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with "
+        "name '$1' called pool function that would result in deadlock",
+        name_, current->name());
+  }
+}
+
+} // namespace kudu


