impala-commits mailing list archives

From jbap...@apache.org
Subject [8/8] incubator-impala git commit: IMPALA-5750: Catch exceptions from boost thread creation
Date Thu, 07 Sep 2017 03:50:26 GMT
IMPALA-5750: Catch exceptions from boost thread creation

The Boost thread constructor throws boost::thread_resource_error
if it cannot spawn a thread on the system
(e.g. because a ulimit has been reached). Nothing catches this
exception, so it crashes Impala. Systems with a large number
of nodes and threads are hitting this limit.
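The failure mode is not specific to Boost. The standard library's std::thread constructor likewise throws std::system_error (error code std::errc::resource_unavailable_try_again) when the system cannot start a thread. The sketch below shows the conversion this commit describes, with std::system_error standing in for boost::thread_resource_error and a hypothetical SpawnStatus type standing in for Impala's Status. The exception is caught at the creation site instead of being allowed to escape:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <system_error>

// Hypothetical stand-in for impala::Status: an empty message means OK.
struct SpawnStatus {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Runs 'spawn', which may throw std::system_error when the OS refuses to
// create a thread (for example, because a process/thread ulimit was reached).
// The exception is converted into an error status so that it cannot
// propagate up the stack and terminate the process.
SpawnStatus CatchSpawnError(const std::function<void()>& spawn) {
  // An empty callable is a programming error, not a resource failure.
  assert(spawn);
  try {
    spawn();
  } catch (const std::system_error& e) {
    return SpawnStatus{std::string("Could not create thread: ") + e.what()};
  }
  return SpawnStatus{};
}
```

A caller would wrap the actual construction, e.g. `CatchSpawnError([&] { t = std::make_unique<std::thread>(fn); })`, and then propagate the returned status instead of crashing.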

This change catches the exception from the thread
constructor and converts it to a Status. This requires
several changes:
1. util/thread.h's Thread constructor is now private
and all Threads are constructed via a new Create()
static factory method.
2. util/thread-pool.h's ThreadPool requires that Init()
be called after the ThreadPool is constructed.
3. A constructor cannot return a Status, so Threads are no
longer created in constructors. Thread creation moves to
initialization methods that can return Status.
4. Threads are now always managed through unique_ptrs.
They can no longer be stack-allocated local variables
or direct members of classes.
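Items 1 and 4 together describe a factory that is the only path to construction and that hands ownership back through a unique_ptr. The sketch below shows that shape using a hypothetical ManagedThread class over std::thread and a stand-in Status. It is not Impala's util/thread.h, whose Create() also takes a thread category, as the diffs below show:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <system_error>
#include <thread>
#include <utility>

// Hypothetical stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
  static Status OK() { return Status{}; }
};

// Thread wrapper that can only be built through Create(), so a creation
// failure is reported as a Status rather than thrown from a constructor.
class ManagedThread {
 public:
  static Status Create(const std::string& name, std::function<void()> fn,
                       std::unique_ptr<ManagedThread>* out) {
    assert(out != nullptr);
    // The constructor is private, so std::make_unique cannot be used here.
    std::unique_ptr<ManagedThread> t(new ManagedThread(name));
    try {
      t->thread_ = std::thread(std::move(fn));
    } catch (const std::system_error& e) {
      // 't' is destroyed here; its thread_ was never started.
      return Status{"Could not create thread " + name + ": " + e.what()};
    }
    *out = std::move(t);
    return Status::OK();
  }

  void Join() {
    if (thread_.joinable()) thread_.join();
  }
  const std::string& name() const { return name_; }

  // Joining here keeps a still-joinable std::thread from calling
  // std::terminate on destruction (a choice made for this sketch).
  ~ManagedThread() { Join(); }

 private:
  explicit ManagedThread(std::string name) : name_(std::move(name)) {}

  std::string name_;
  std::thread thread_;
};
```

Because the constructor is private, every caller must go through Create() and check its Status. This is what rules out stack-allocated Threads and direct class members.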

Query execution code paths now handle the error:
1. If a scan node fails to spawn a scanner thread,
it aborts the query.
2. If the query state fails to spawn a fragment instance
in StartFInstances(), it reports the error to the
coordinator and tears down the query.
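The scan-node control flow can be modeled without real threads. The sketch below is hypothetical: ScanNodeSketch and create_worker are illustrative names, and it omits the thread-token release that the HdfsScanNode and KuduScanNode diffs perform. On the first failed creation, it records the error and marks the node done while still holding the lock. It then stops spawning, because later attempts are likely to fail as well. The SetDoneLocked()/SetDone() split mirrors the SetDoneInternal()/SetDone() pair that the diffs introduce:

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>

// Hypothetical stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

class ScanNodeSketch {
 public:
  // 'create_worker' stands in for Thread::Create(); 'tokens' stands in for
  // the thread tokens handed out by the resource pool.
  void StartWorkers(int tokens,
                    const std::function<Status(int)>& create_worker) {
    std::lock_guard<std::mutex> l(lock_);
    for (int i = 0; i < tokens && !done_; ++i) {
      Status s = create_worker(i);
      if (!s.ok()) {
        // Still holding lock_ and done_ is false, so no error was recorded yet.
        assert(status_.ok());
        status_ = s;
        SetDoneLocked();  // abort: stop spawning and wind down
        break;
      }
      ++started_;  // only count workers that actually started
    }
  }

  // Public variant: acquires lock_ itself.
  void SetDone() {
    std::lock_guard<std::mutex> l(lock_);
    SetDoneLocked();
  }

  Status status() { std::lock_guard<std::mutex> l(lock_); return status_; }
  int started() { std::lock_guard<std::mutex> l(lock_); return started_; }
  bool done() { std::lock_guard<std::mutex> l(lock_); return done_; }

 private:
  // Caller must hold lock_. Idempotent: repeated calls are no-ops.
  void SetDoneLocked() {
    if (done_) return;
    done_ = true;
    // A real scan node would also cancel I/O and shut down its batch queue.
  }

  std::mutex lock_;
  bool done_ = false;
  Status status_;
  int started_ = 0;
};
```

Having the spawn loop call the lock-held variant avoids re-acquiring a mutex the loop already holds. Callers outside the lock use the public SetDone().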

Testing:
This introduces the debug-only flag thread_creation_fault_injection,
which makes Thread::Create() calls at eligible call sites
fail randomly about 1% of the time. Many Thread::Create()
and ThreadPool::Init() call sites are required for startup
and therefore cannot be eligible. However, all call sites
used for query execution are marked as eligible and are
governed by this flag. The code was tested by setting this
flag to true and running queries to verify that each query
either runs to completion with the correct result or fails
with an appropriate status.
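The sketch below shows one way the fault-injection check could work. It assumes a hypothetical helper, MaybeInjectThreadCreationFault(), and uses a plain global in place of the gflag. It is not Impala's implementation. According to the flag's description, the real option only takes effect in debug builds; the sketch leaves out that guard:

```cpp
#include <cassert>
#include <random>
#include <string>

// Hypothetical stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Stand-in for the --thread_creation_fault_injection flag.
bool thread_creation_fault_injection = false;

// Fails roughly 1% of the time, but only when injection is enabled AND the
// call site has opted in via 'fault_injection_eligible'.
Status MaybeInjectThreadCreationFault(bool fault_injection_eligible,
                                      std::mt19937& rng) {
  if (!thread_creation_fault_injection || !fault_injection_eligible) {
    return Status{};
  }
  std::uniform_int_distribution<int> percent(0, 99);
  if (percent(rng) == 0) {
    return Status{"Fake thread creation failure (fault injection)"};
  }
  return Status{};
}
```

Startup-critical call sites would pass fault_injection_eligible = false, so enabling the flag cannot stop the daemon from starting. Query-execution call sites would pass true.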

Change-Id: I15a2f278dc71892b7fec09593f81b1a57ab725c0
Reviewed-on: http://gerrit.cloudera.org:8080/7730
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e993b971
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e993b971
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e993b971

Branch: refs/heads/master
Commit: e993b9712c81dfb66fdf65bb5269cdc38a8eef18
Parents: 897f025
Author: Joe McDonnell <joemcdonnell@cloudera.com>
Authored: Wed Aug 16 17:16:45 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Sep 7 03:25:30 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/thread-create-benchmark.cc    | 10 ++-
 be/src/catalog/catalog-server.cc                | 12 ++-
 be/src/catalog/catalog-server.h                 |  3 +-
 be/src/catalog/catalogd-main.cc                 |  2 +-
 be/src/common/global-flags.cc                   |  3 +
 be/src/common/init.cc                           | 21 ++---
 be/src/common/init.h                            |  3 +-
 be/src/exec/blocking-join-node.cc               | 13 +++-
 be/src/exec/hdfs-scan-node.cc                   | 64 ++++++++++------
 be/src/exec/hdfs-scan-node.h                    |  7 +-
 be/src/exec/kudu-scan-node.cc                   | 52 +++++++++----
 be/src/exec/kudu-scan-node.h                    |  7 ++
 be/src/rpc/TAcceptQueueServer.cpp               |  9 +++
 be/src/rpc/auth-provider.h                      |  3 +-
 be/src/rpc/authentication.cc                    |  4 +-
 be/src/rpc/thrift-server-test.cc                |  1 +
 be/src/rpc/thrift-server.cc                     |  7 +-
 be/src/rpc/thrift-server.h                      |  2 +-
 be/src/rpc/thrift-thread.cc                     | 14 +++-
 be/src/rpc/thrift-thread.h                      |  2 +-
 be/src/runtime/data-stream-sender.cc            |  3 +-
 be/src/runtime/disk-io-mgr-handle-cache.h       |  3 +-
 .../runtime/disk-io-mgr-handle-cache.inline.h   |  6 +-
 be/src/runtime/disk-io-mgr.cc                   |  8 +-
 be/src/runtime/exec-env.cc                      |  5 ++
 be/src/runtime/fragment-instance-state.cc       |  5 +-
 be/src/runtime/fragment-instance-state.h        |  2 +-
 be/src/runtime/parallel-executor.cc             | 11 ++-
 be/src/runtime/query-exec-mgr.cc                | 14 +++-
 be/src/runtime/query-state.cc                   | 37 ++++++---
 be/src/runtime/query-state.h                    |  6 +-
 be/src/runtime/thread-resource-mgr.h            | 11 ++-
 be/src/scheduling/admission-controller.cc       | 11 ++-
 be/src/scheduling/admission-controller.h        |  2 +-
 be/src/service/child-query.cc                   |  9 ++-
 be/src/service/child-query.h                    |  6 +-
 be/src/service/client-request-state.cc          | 10 ++-
 be/src/service/client-request-state.h           |  4 +-
 be/src/service/impala-beeswax-server.cc         |  8 +-
 be/src/service/impala-hs2-server.cc             | 18 ++---
 be/src/service/impala-server.cc                 | 22 +++---
 be/src/service/impala-server.h                  | 10 +--
 be/src/service/impalad-main.cc                  |  3 +-
 be/src/statestore/statestore-subscriber.cc      |  4 +-
 be/src/statestore/statestore-subscriber.h       |  2 +-
 be/src/statestore/statestore.cc                 |  6 ++
 be/src/statestore/statestore.h                  |  5 ++
 be/src/statestore/statestored-main.cc           |  3 +-
 be/src/testutil/in-process-servers.cc           |  5 +-
 be/src/testutil/in-process-servers.h            |  2 +-
 be/src/util/thread-pool-test.cc                 |  1 +
 be/src/util/thread-pool.h                       | 70 +++++++++++++----
 be/src/util/thread.cc                           | 39 ++++++++--
 be/src/util/thread.h                            | 80 ++++++++++++--------
 common/thrift/generate_error_codes.py           |  2 +
 55 files changed, 456 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/benchmarks/thread-create-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/thread-create-benchmark.cc b/be/src/benchmarks/thread-create-benchmark.cc
index cac1f1c..7b57242 100644
--- a/be/src/benchmarks/thread-create-benchmark.cc
+++ b/be/src/benchmarks/thread-create-benchmark.cc
@@ -87,14 +87,16 @@ void NativeThreadStarter(int num_threads, const function<void ()>& f) {
 
 // Runs N Impala Threads, each executing 'f'
 void ImpalaThreadStarter(int num_threads, const function<void ()>& f) {
-  vector<Thread*> threads;
+  vector<unique_ptr<Thread>> threads;
   threads.reserve(num_threads);
   for (int i=0; i < num_threads; ++i) {
-    threads.push_back(new Thread("mythreadgroup", "thread", f));
+    unique_ptr<Thread> thread;
+    Status s = Thread::Create("mythreadgroup", "thread", f, &thread);
+    DCHECK(s.ok());
+    threads.push_back(move(thread));
   }
-  for (Thread* thread: threads) {
+  for (unique_ptr<Thread>& thread: threads) {
     thread->Join();
-    delete thread;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 0e0cca0..38a64d1 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -170,9 +170,13 @@ Status CatalogServer::Start() {
 
   // This will trigger a full Catalog metadata load.
   catalog_.reset(new Catalog());
-  catalog_update_gathering_thread_.reset(new Thread("catalog-server",
-      "catalog-update-gathering-thread",
-      &CatalogServer::GatherCatalogUpdatesThread, this));
+  Status status = Thread::Create("catalog-server", "catalog-update-gathering-thread",
+      &CatalogServer::GatherCatalogUpdatesThread, this,
+      &catalog_update_gathering_thread_);
+  if (!status.ok()) {
+    status.AddDetail("CatalogService failed to start");
+    return status;
+  }
 
   statestore_subscriber_.reset(new StatestoreSubscriber(
      Substitute("catalog-server@$0", TNetworkAddressToString(server_address)),
@@ -180,7 +184,7 @@ Status CatalogServer::Start() {
 
   StatestoreSubscriber::UpdateCallback cb =
       bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
-  Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
+  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
   if (!status.ok()) {
     status.AddDetail("CatalogService failed to start");
     return status;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 17b8732..9d33591 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -36,7 +36,6 @@ namespace impala {
 
 class StatestoreSubscriber;
 class Catalog;
-class TGetAllCatalogObjectsResponse;
 
 /// The Impala CatalogServer manages the caching and persistence of cluster-wide metadata.
 /// The CatalogServer aggregates the metadata from the Hive Metastore, the NameNode,
@@ -86,7 +85,7 @@ class CatalogServer {
   StatsMetric<double>* topic_processing_time_metric_;
 
   /// Thread that polls the catalog for any updates.
-  boost::scoped_ptr<Thread> catalog_update_gathering_thread_;
+  std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
   /// Tracks the set of catalog objects that exist via their topic entry key.
   /// During each IMPALA_CATALOG_TOPIC heartbeat, stores the set of known catalog objects

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/catalog/catalogd-main.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index a3a0edb..f98a406 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -75,7 +75,7 @@ int CatalogdMain(int argc, char** argv) {
 
   ABORT_IF_ERROR(metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr));
   ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), true, nullptr, nullptr));
-  StartMemoryMaintenanceThread();
+  ABORT_IF_ERROR(StartMemoryMaintenanceThread());
   ABORT_IF_ERROR(StartThreadInstrumentation(metrics.get(), webserver.get(), true));
 
   InitRpcEventTracing(webserver.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ccffcb3..e5420a2 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -137,6 +137,9 @@ DEFINE_int32(fault_injection_rpc_exception_type, 0, "A fault injection option th
     "in debug builds only");
 DEFINE_int32(stress_scratch_write_delay_ms, 0, "A stress option which causes writes to "
     "scratch files to be to be delayed to simulate slow writes.");
+DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option that "
+    " causes calls to Thread::Create() to fail randomly 1% of the time on eligible "
+    " codepaths. Effective in debug builds only.");
 #endif
 
 // Used for testing the path where the Kudu client is stubbed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index ce55067..003ebf9 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -95,18 +95,18 @@ using std::string;
 // glog only automatically flushes the log file if logbufsecs has passed since the
 // previous flush when a new log is written. That means that on a quiet system, logs
 // will be buffered indefinitely. It also rotates log files.
-static scoped_ptr<impala::Thread> log_maintenance_thread;
+static unique_ptr<impala::Thread> log_maintenance_thread;
 
 // Memory Maintenance thread that runs periodically to free up memory. It does the
 // following things every memory_maintenance_sleep_time_ms secs:
 // 1) Releases BufferPool memory that is not currently in use.
 // 2) Frees excess memory that TCMalloc has left in its pageheap.
-static scoped_ptr<impala::Thread> memory_maintenance_thread;
+static unique_ptr<impala::Thread> memory_maintenance_thread;
 
 // A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps
 // for a short interval of time (THREAD_SLEEP_TIME_MS), wakes up and calculates the actual
 // time slept. If that exceeds PAUSE_WARN_THRESHOLD_MS, a warning is logged.
-static scoped_ptr<impala::Thread> pause_monitor;
+static unique_ptr<impala::Thread> pause_monitor;
 
 [[noreturn]] static void LogMaintenanceThread() {
   while (true) {
@@ -205,10 +205,13 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   ABORT_IF_ERROR(impala::InitAuth(argv[0]));
 
   // Initialize maintenance_thread after InitGoogleLoggingSafe and InitThreading.
-  log_maintenance_thread.reset(
-      new Thread("common", "log-maintenance-thread", &LogMaintenanceThread));
+  Status thread_spawn_status = Thread::Create("common", "log-maintenance-thread",
+      &LogMaintenanceThread, &log_maintenance_thread);
+  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
 
-  pause_monitor.reset(new Thread("common", "pause-monitor", &PauseMonitorLoop));
+  thread_spawn_status = Thread::Create("common", "pause-monitor",
+      &PauseMonitorLoop, &pause_monitor);
+  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
 
   PeriodicCounterUpdater::Init();
 
@@ -251,8 +254,8 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
 #endif
 }
 
-void impala::StartMemoryMaintenanceThread() {
+Status impala::StartMemoryMaintenanceThread() {
   DCHECK(AggregateMemoryMetrics::NUM_MAPS != nullptr) << "Mem metrics not registered.";
-  memory_maintenance_thread.reset(
-      new Thread("common", "memory-maintenance-thread", &MemoryMaintenanceThread));
+  return Thread::Create("common", "memory-maintenance-thread",
+      &MemoryMaintenanceThread, &memory_maintenance_thread);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/common/init.h
----------------------------------------------------------------------
diff --git a/be/src/common/init.h b/be/src/common/init.h
index b1de6d8..fe8a063 100644
--- a/be/src/common/init.h
+++ b/be/src/common/init.h
@@ -19,6 +19,7 @@
 #define IMPALA_COMMON_INIT_H
 
 #include "util/test-info.h"
+#include "common/status.h"
 
 namespace impala {
 
@@ -33,7 +34,7 @@ void InitCommonRuntime(int argc, char** argv, bool init_jvm,
 /// Starts background memory maintenance thread. Must be called after
 /// RegisterMemoryMetrics(). This thread is needed for daemons to free memory and
 /// refresh metrics but is not needed for standalone tests.
-void StartMemoryMaintenanceThread();
+Status StartMemoryMaintenanceThread() WARN_UNUSED_RESULT;
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 7ccddc0..477e934 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -193,10 +193,15 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     runtime_profile()->AppendExecOption("Join Build-Side Prepared Asynchronously");
     string thread_name = Substitute("join-build-thread (finst:$0, plan-node-id:$1)",
         PrintId(state->fragment_instance_id()), id());
-    Thread build_thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-        [this, state, build_sink, status=&build_side_status]() {
+    unique_ptr<Thread> build_thread;
+    Status thread_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
+        thread_name, [this, state, build_sink, status=&build_side_status]() {
           ProcessBuildInputAsync(state, build_sink, status);
-        });
+        }, &build_thread, true);
+    if (!thread_status.ok()) {
+      state->resource_pool()->ReleaseThreadToken(false);
+      return thread_status;
+    }
     // Open the left child so that it may perform any initialisation in parallel.
     // Don't exit even if we see an error, we still need to wait for the build thread
     // to finish.
@@ -207,7 +212,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
 
     // Blocks until ProcessBuildInput has returned, after which the build side structures
     // are fully constructed.
-    build_thread.Join();
+    build_thread->Join();
     RETURN_IF_ERROR(build_side_status);
     RETURN_IF_ERROR(open_status);
   } else if (IsInSubplan()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 2dc8d7b..528e290 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -320,6 +320,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   // TODO: It would be good to have a test case for that.
   if (!initial_ranges_issued_) return;
 
+  Status status = Status::OK();
   while (true) {
     // The lock must be given up between loops in order to give writers to done_,
     // all_ranges_started_ etc. a chance to grab the lock.
@@ -347,14 +348,32 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
     }
 
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
-    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
 
     auto fn = [this]() { this->ScannerThread(); };
-    scanner_threads_.AddThread(
-        make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
+    std::unique_ptr<Thread> t;
+    status =
+      Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
+    if (!status.ok()) {
+      COUNTER_ADD(&active_scanner_thread_counter_, -1);
+      // Release the token and skip running callbacks to find a replacement. Skipping
+      // serves two purposes. First, it prevents a mutual recursion between this function
+      // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
+      // is likely to continue failing for future callbacks.
+      pool->ReleaseThreadToken(false, true);
+
+      // Abort the query. This is still holding the lock_, so done_ is known to be
+      // false and status_ must be ok.
+      DCHECK(status_.ok());
+      status_ = status;
+      SetDoneInternal();
+      break;
+    }
+    // Thread successfully started
+    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
+    scanner_threads_.AddThread(move(t));
   }
 }
 
@@ -420,21 +439,18 @@ void HdfsScanNode::ScannerThread() {
     }
 
     if (!status.ok()) {
-      {
-        unique_lock<mutex> l(lock_);
-        // If there was already an error, the main thread will do the cleanup
-        if (!status_.ok()) break;
-
-        if (status.IsCancelled() && done_) {
-          // Scan node initiated scanner thread cancellation.  No need to do anything.
-          break;
-        }
-        // Set status_ before calling SetDone() (which shuts down the RowBatchQueue),
-        // to ensure that GetNextInternal() notices the error status.
-        status_ = status;
-      }
+      unique_lock<mutex> l(lock_);
+      // If there was already an error, the main thread will do the cleanup
+      if (!status_.ok()) break;
 
-      SetDone();
+      if (status.IsCancelled() && done_) {
+        // Scan node initiated scanner thread cancellation.  No need to do anything.
+        break;
+      }
+      // Set status_ before calling SetDone() (which shuts down the RowBatchQueue),
+      // to ensure that GetNextInternal() notices the error status.
+      status_ = status;
+      SetDoneInternal();
       break;
     }
 
@@ -542,14 +558,16 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   return status;
 }
 
-void HdfsScanNode::SetDone() {
-  {
-    unique_lock<mutex> l(lock_);
-    if (done_) return;
-    done_ = true;
-  }
+void HdfsScanNode::SetDoneInternal() {
+  if (done_) return;
+  done_ = true;
   if (reader_context_ != NULL) {
     runtime_state_->io_mgr()->CancelContext(reader_context_);
   }
   materialized_row_batches_->Shutdown();
 }
+
+void HdfsScanNode::SetDone() {
+  unique_lock<mutex> l(lock_);
+  SetDoneInternal();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index b056e2e..18a74ad 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -180,8 +180,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
       WARN_UNUSED_RESULT;
 
-  /// sets done_ to true and triggers threads to cleanup. Cannot be called with
-  /// any locks taken. Calling it repeatedly ignores subsequent calls.
+  /// Sets done_ to true and triggers threads to cleanup. Must be called with lock_
+  /// taken. Calling it repeatedly ignores subsequent calls.
+  void SetDoneInternal();
+
+  /// Gets lock_ and calls SetDoneInternal()
   void SetDone();
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index e192a86..7f18710 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -110,9 +110,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
       COUNTER_SET(rows_returned_counter_, num_rows_returned_);
       *eos = true;
 
-      unique_lock<mutex> l(lock_);
-      done_ = true;
-      materialized_row_batches_->Shutdown();
+      SetDone();
     }
     materialized_batch.reset();
   } else {
@@ -130,11 +128,7 @@ void KuduScanNode::Close(RuntimeState* state) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
 
-  if (!done_) {
-    unique_lock<mutex> l(lock_);
-    done_ = true;
-    materialized_row_batches_->Shutdown();
-  }
+  SetDone();
 
   scanner_threads_.JoinAll();
   DCHECK_EQ(num_active_scanners_, 0);
@@ -151,9 +145,6 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     // Check if we can get a token.
     if (!pool->TryAcquireThreadToken()) break;
 
-    ++num_active_scanners_;
-    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
-
     string name = Substitute(
         "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
@@ -162,9 +153,28 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     // Reserve the first token so no other thread picks it up.
     const string* token = GetNextScanToken();
     auto fn = [this, token, name]() { this->RunScannerThread(name, token); };
+    std::unique_ptr<Thread> t;
+    Status status =
+      Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
+    if (!status.ok()) {
+      // Release the token and skip running callbacks to find a replacement. Skipping
+      // serves two purposes. First, it prevents a mutual recursion between this function
+      // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
+      // is likely to continue failing for future callbacks.
+      pool->ReleaseThreadToken(false, true);
+
+      // Abort the query. This is still holding the lock_, so done_ is known to be
+      // false and status_ must be ok.
+      DCHECK(status_.ok());
+      status_ = status;
+      SetDoneInternal();
+      break;
+    }
+    // Thread successfully started
+    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
+    ++num_active_scanners_;
     VLOG_RPC << "Thread started: " << name;
-    scanner_threads_.AddThread(
-        make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
+    scanner_threads_.AddThread(move(t));
   }
 }
 
@@ -231,14 +241,13 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to
     unique_lock<mutex> l(lock_);
     if (!status.ok() && status_.ok()) {
       status_ = status;
-      done_ = true;
+      SetDoneInternal();
     }
     // Decrement num_active_scanners_ unless handling the case of an early exit when
     // optional threads have been exceeded, in which case it already was decremented.
     if (!optional_thread_exiting) --num_active_scanners_;
     if (num_active_scanners_ == 0) {
-      done_ = true;
-      materialized_row_batches_->Shutdown();
+      SetDoneInternal();
     }
   }
 
@@ -248,4 +257,15 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 
+void KuduScanNode::SetDoneInternal() {
+  if (done_) return;
+  done_ = true;
+  materialized_row_batches_->Shutdown();
+}
+
+void KuduScanNode::SetDone() {
+  unique_lock<mutex> l(lock_);
+  SetDoneInternal();
+}
+
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 6341cb6..4759f0a 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -96,6 +96,13 @@ class KuduScanNode : public KuduScanNodeBase {
   /// in 'materialized_row_batches_' until the scanner reports eos, an error occurs, or
   /// the limit is reached.
   Status ProcessScanToken(KuduScanner* scanner, const std::string& scan_token);
+
+  /// Sets done_ to true and triggers threads to cleanup. Must be called with lock_
+  /// taken. Calling it repeatedly ignores subsequent calls.
+  void SetDoneInternal();
+
+  /// Gets lock_ and calls SetDoneInternal()
+  void SetDone();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 65fdc46..030d714 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -31,6 +31,7 @@
 #include <unistd.h>
 #endif
 
+#include "common/status.h"
 #include "util/thread-pool.h"
 
 DEFINE_int32(accepted_cnxn_queue_depth, 10000,
@@ -217,6 +218,14 @@ void TAcceptQueueServer::serve() {
       [this](int tid, const shared_ptr<TTransport>& item) {
         this->SetupConnection(item);
       });
+  // Initialize the thread pool
+  Status status = connection_setup_pool.Init();
+  if (!status.ok()) {
+    status.AddDetail("TAcceptQueueServer: thread pool could not start.");
+    string errStr = status.GetDetail();
+    GlobalOutput(errStr.c_str());
+    stop_ = true;
+  }
 
   while (!stop_) {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/auth-provider.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 0021dc7..9286154 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -19,7 +19,6 @@
 #define IMPALA_RPC_AUTH_PROVIDER_H
 
 #include <string>
-#include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 #include <sasl/sasl.h>
 
@@ -142,7 +141,7 @@ class SaslAuthProvider : public AuthProvider {
   bool needs_kinit_;
 
   /// Runs "RunKinit" below if needs_kinit_ is true.
-  boost::scoped_ptr<Thread> kinit_thread_;
+  std::unique_ptr<Thread> kinit_thread_;
 
   /// Periodically (roughly once every FLAGS_kerberos_reinit_interval minutes) calls kinit
   /// to get a ticket granting ticket from the kerberos server for principal_, which is

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 1a56f7c..275963f 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -835,8 +835,8 @@ Status SaslAuthProvider::Start() {
     Promise<Status> first_kinit;
     stringstream thread_name;
     thread_name << "kinit-" << principal_;
-    kinit_thread_.reset(new Thread("authentication", thread_name.str(),
-        &SaslAuthProvider::RunKinit, this, &first_kinit));
+    RETURN_IF_ERROR(Thread::Create("authentication", thread_name.str(),
+        &SaslAuthProvider::RunKinit, this, &first_kinit, &kinit_thread_));
     LOG(INFO) << "Waiting for Kerberos ticket for principal: " << principal_;
     RETURN_IF_ERROR(first_kinit.Get());
     LOG(INFO) << "Kerberos ticket granted to " << principal_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 6ceaefd..ef50160 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -415,6 +415,7 @@ TEST(ConcurrencyTest, DISABLED_ManyConcurrentConnections) {
         Status status = client->Open();
         ASSERT_OK(status);
       });
+  ASSERT_OK(pool.Init());
   for (int i = 0; i < 1024 * 16; ++i) pool.Offer(i);
   pool.DrainAndShutdown();
 }
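The TAcceptQueueServer and thrift-server-test hunks show the caller side of the ThreadPool change: the pool is constructed first, and Init() then reports whether its worker threads could be started. The sketch below illustrates that two-phase pattern using a hypothetical TwoPhasePool over std::thread rather than Impala's util/thread-pool.h:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// The constructor only records configuration and cannot fail. Init() starts
// the workers and reports a creation failure as a Status.
template <typename T>
class TwoPhasePool {
 public:
  using WorkFn = std::function<void(int, const T&)>;

  TwoPhasePool(int num_threads, WorkFn fn)
      : num_threads_(num_threads), fn_(std::move(fn)) {
    assert(num_threads_ > 0);
  }
  ~TwoPhasePool() { DrainAndShutdown(); }

  Status Init() {
    for (int i = 0; i < num_threads_; ++i) {
      try {
        threads_.emplace_back(&TwoPhasePool::WorkerLoop, this, i);
      } catch (const std::system_error& e) {
        DrainAndShutdown();  // stop the workers that did start
        return Status{std::string("Could not start pool thread: ") + e.what()};
      }
    }
    return Status{};
  }

  void Offer(T item) {
    {
      std::lock_guard<std::mutex> l(mu_);
      queue_.push_back(std::move(item));
    }
    cv_.notify_one();
  }

  // Lets workers finish queued items, then joins them. Safe to call twice.
  void DrainAndShutdown() {
    {
      std::lock_guard<std::mutex> l(mu_);
      shutdown_ = true;
    }
    cv_.notify_all();
    for (std::thread& t : threads_) {
      if (t.joinable()) t.join();
    }
    threads_.clear();
  }

 private:
  void WorkerLoop(int tid) {
    std::unique_lock<std::mutex> l(mu_);
    while (true) {
      cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
      if (queue_.empty()) return;  // shutting down and nothing left to do
      T item = std::move(queue_.front());
      queue_.pop_front();
      l.unlock();
      fn_(tid, item);  // run the work without holding the lock
      l.lock();
    }
  }

  const int num_threads_;
  WorkFn fn_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  std::vector<std::thread> threads_;
  bool shutdown_ = false;
};
```

Because the constructor cannot fail, the pool can still be a plain local or member, and only Init() needs its Status checked. That is the same shape as ASSERT_OK(pool.Init()) in the test hunk.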

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 467c004..c385a66 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -172,9 +172,9 @@ Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
 
   stringstream name;
   name << "supervise-" << thrift_server_->name_;
-  thrift_server_->server_thread_.reset(
-      new Thread("thrift-server", name.str(),
-                 &ThriftServer::ThriftServerEventProcessor::Supervise, this));
+  RETURN_IF_ERROR(Thread::Create("thrift-server", name.str(),
+      &ThriftServer::ThriftServerEventProcessor::Supervise, this,
+      &thrift_server_->server_thread_));
 
   system_time deadline = get_system_time() +
       posix_time::milliseconds(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS);
@@ -335,7 +335,6 @@ ThriftServer::ThriftServer(const string& name,
     num_worker_threads_(num_worker_threads),
     server_type_(server_type),
     name_(name),
-    server_thread_(NULL),
     server_(NULL),
     processor_(processor),
     connection_handler_(NULL),
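The thrift-server.cc hunk shows the general pattern of this change. The constructor call is replaced by a static factory that returns a Status and hands the new thread back through a unique_ptr out-parameter. The following is a sketch of such a factory under these assumptions:

- It uses std::thread, whose constructor throws std::system_error on failure, analogous to boost::thread_resource_error.
- Status is a hypothetical struct.
- The fault_injection_eligible parameter and the g_thread_creation_fault_injection global are illustrative stand-ins. They presumably correspond to the trailing `true` argument passed at query-execution call sites and to the thread_creation_fault_injection flag described in the commit message. They are not Impala's actual declarations.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <random>
#include <string>
#include <system_error>
#include <thread>
#include <utility>

// Hypothetical stand-in for impala::Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Illustrative global standing in for a fault-injection flag.
bool g_thread_creation_fault_injection = false;

class Thread {
 public:
  // The only way to obtain a Thread. On success '*thread' owns the running thread;
  // on failure it is left untouched and the error is returned as a Status.
  static Status Create(const std::string& category, const std::string& name,
      std::function<void()> f, std::unique_ptr<Thread>* thread,
      bool fault_injection_eligible = false) {
    assert(thread != nullptr);
    if (fault_injection_eligible && g_thread_creation_fault_injection) {
      // Fail roughly 1% of eligible calls to exercise callers' error paths.
      static thread_local std::mt19937 rng(std::random_device{}());
      if (std::uniform_int_distribution<int>(0, 99)(rng) == 0) {
        return Status{"fake thread creation failure: " + category + "/" + name};
      }
    }
    std::unique_ptr<Thread> t(new Thread(category, name));
    try {
      t->thread_ = std::thread(std::move(f));
    } catch (const std::system_error& e) {
      return Status{"could not create thread " + category + "/" + name + ": " +
          e.what()};
    }
    *thread = std::move(t);
    return Status{};
  }

  // Unlike boost::thread, std::thread terminates the process if destroyed while
  // joinable, so this sketch joins in the destructor unless Detach() was called.
  ~Thread() { Join(); }

  void Join() {
    if (thread_.joinable()) thread_.join();
  }
  void Detach() { thread_.detach(); }
  const std::string& name() const { return name_; }

 private:
  // Private so that callers cannot bypass Create() and its error handling.
  Thread(std::string category, std::string name)
    : category_(std::move(category)), name_(std::move(name)) {}

  std::string category_;
  std::string name_;
  std::thread thread_;
};
```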

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 2002f7f..f889a4e 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -209,7 +209,7 @@ class ThriftServer {
   const std::string name_;
 
   /// Thread that runs ThriftServerEventProcessor::Supervise() in a separate loop
-  boost::scoped_ptr<Thread> server_thread_;
+  std::unique_ptr<Thread> server_thread_;
 
   /// Thrift housekeeping
   boost::scoped_ptr<apache::thrift::server::TServer> server_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-thread.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-thread.cc b/be/src/rpc/thrift-thread.cc
index cdda36b..ef2144d 100644
--- a/be/src/rpc/thrift-thread.cc
+++ b/be/src/rpc/thrift-thread.cc
@@ -20,7 +20,10 @@
 #include <boost/bind.hpp>
 #include <sstream>
 
+#include <thrift/concurrency/Exception.h>
+
 #include "common/names.h"
+#include "common/status.h"
 
 using namespace impala;
 
@@ -29,8 +32,15 @@ namespace atc = apache::thrift::concurrency;
 
 void ThriftThread::start() {
   Promise<atc::Thread::id_t> promise;
-  impala_thread_.reset(new impala::Thread(group_, name_,
-      bind(&ThriftThread::RunRunnable, this, runnable(), &promise)));
+  Status status = impala::Thread::Create(group_, name_,
+      bind(&ThriftThread::RunRunnable, this, runnable(), &promise), &impala_thread_);
+
+  // Thread creation failed. Thrift expects an exception in this case. See
+  // the implementation of atc::PosixThreadFactory.cpp or atc::BoostThreadFactory.cpp.
+  if (!status.ok()) {
+    throw atc::SystemResourceException(
+        Substitute("Thread::Create() failed: $0", status.GetDetail()));
+  }
 
   // Blocks until the thread id has been set
   tid_ = promise.Get();
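ThriftThread::start() goes the other way. Thrift's thread factories signal failure by throwing, so the Status from Thread::Create() is turned back into an exception at that boundary. The sketch below shows that adapter. It uses a local SystemResourceException class as a stand-in for atc::SystemResourceException, so the example does not depend on the Thrift headers, and a hypothetical Status struct.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for impala::Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Local stand-in for apache::thrift::concurrency::SystemResourceException.
class SystemResourceException : public std::runtime_error {
 public:
  explicit SystemResourceException(const std::string& msg) : std::runtime_error(msg) {}
};

// Code below this boundary reports errors as Status; the caller above it
// (a Thrift thread factory in the real code) expects an exception instead.
void ThrowIfError(const Status& status, const std::string& operation) {
  if (!status.ok()) {
    throw SystemResourceException(operation + " failed: " + status.msg);
  }
}
```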

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-thread.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-thread.h b/be/src/rpc/thrift-thread.h
index 397703f..14943b2 100644
--- a/be/src/rpc/thrift-thread.h
+++ b/be/src/rpc/thrift-thread.h
@@ -93,7 +93,7 @@ class ThriftThread : public apache::thrift::concurrency::Thread {
 
   /// Impala thread that runs the runnable and registers itself with the global
   /// ThreadManager.
-  boost::scoped_ptr<impala::Thread> impala_thread_;
+  std::unique_ptr<impala::Thread> impala_thread_;
 
   /// Thrift thread ID, set by RunRunnable.
   apache::thrift::concurrency::Thread::id_t tid_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 752e3ce..f39a595 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -77,7 +77,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
       dest_node_id_(dest_node_id),
       num_data_bytes_sent_(0),
       rpc_thread_("DataStreamSender", "SenderThread", 1, 1,
-          bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2)),
+          bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2), true),
       rpc_in_flight_(false) {}
 
   // Initialize channel.
@@ -156,6 +156,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
 };
 
 Status DataStreamSender::Channel::Init(RuntimeState* state) {
+  RETURN_IF_ERROR(rpc_thread_.Init());
   runtime_state_ = state;
   // TODO: figure out how to size batch_
   int capacity = max(1, buffer_size_ / max(row_desc_->GetRowSize(), 1));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/disk-io-mgr-handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h b/be/src/runtime/disk-io-mgr-handle-cache.h
index 96add9f..4ba2342 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.h
@@ -26,6 +26,7 @@
 #include <boost/thread/mutex.hpp>
 
 #include "common/hdfs.h"
+#include "common/status.h"
 #include "util/aligned-new.h"
 #include "util/impalad-metrics.h"
 #include "util/spinlock.h"
@@ -96,7 +97,7 @@ class FileHandleCache {
 
   /// Starts up a thread that monitors the age of file handles and evicts any that
   /// exceed the limit.
-  void Init();
+  Status Init() WARN_UNUSED_RESULT;
 
   /// Get a file handle from the cache for the specified filename (fname) and
   /// last modification time (mtime). This will hash the filename to determine

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/disk-io-mgr-handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
index 76bef95..3068971 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.inline.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
@@ -69,9 +69,9 @@ FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
 }
 
 template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::Init() {
-  eviction_thread_.reset(new Thread("disk-io-mgr-handle-cache", "File Handle Timeout",
-      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this));
+Status FileHandleCache<NUM_PARTITIONS>::Init() {
+  return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout",
+      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_);
 }
 
 template <size_t NUM_PARTITIONS>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index dff6ec5..0fe16d2 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -406,12 +406,14 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
     for (int j = 0; j < num_threads_per_disk; ++j) {
       stringstream ss;
       ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
-      disk_thread_group_.AddThread(make_unique<Thread>("disk-io-mgr", ss.str(),
-          &DiskIoMgr::WorkLoop, this, disk_queues_[i]));
+      std::unique_ptr<Thread> t;
+      RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskIoMgr::WorkLoop,
+          this, disk_queues_[i], &t));
+      disk_thread_group_.AddThread(move(t));
     }
   }
   request_context_cache_.reset(new RequestContextCache(this));
-  file_handle_cache_.Init();
+  RETURN_IF_ERROR(file_handle_cache_.Init());
 
   cached_read_options_ = hadoopRzOptionsAlloc();
   DCHECK(cached_read_options_ != nullptr);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index fe4825d..3652280 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -216,6 +216,11 @@ Status ExecEnv::InitForFeTests() {
 Status ExecEnv::StartServices() {
   LOG(INFO) << "Starting global services";
 
+  // Initialize thread pools
+  RETURN_IF_ERROR(exec_rpc_thread_pool_->Init());
+  RETURN_IF_ERROR(async_rpc_pool_->Init());
+  RETURN_IF_ERROR(hdfs_op_thread_pool_->Init());
+
   // Initialize global memory limit.
   // Depending on the system configuration, we will have to calculate the process
   // memory limit either based on the available physical memory, or if overcommitting

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 3f5a72f..74f5495 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -226,9 +226,8 @@ Status FragmentInstanceState::Prepare() {
   if (FLAGS_status_report_interval > 0) {
     string thread_name = Substitute("profile-report (finst:$0)", PrintId(instance_id()));
     unique_lock<mutex> l(report_thread_lock_);
-    report_thread_.reset(
-        new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-            [this]() { this->ReportProfileThread(); }));
+    RETURN_IF_ERROR(Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
+        thread_name, [this]() { this->ReportProfileThread(); }, &report_thread_, true));
     // Make sure the thread started up, otherwise ReportProfileThread() might get into
     // a race with StopReportThread().
     while (!report_thread_active_) report_thread_started_cv_.wait(l);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 750f983..fa35c6b 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -133,7 +133,7 @@ class FragmentInstanceState {
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
 
   /// profile reporting-related
-  boost::scoped_ptr<Thread> report_thread_;
+  std::unique_ptr<Thread> report_thread_;
   boost::mutex report_thread_lock_;
 
   /// Indicates that profile reporting thread should stop.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/parallel-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor.cc b/be/src/runtime/parallel-executor.cc
index b7b3cc2..f3dd708 100644
--- a/be/src/runtime/parallel-executor.cc
+++ b/be/src/runtime/parallel-executor.cc
@@ -35,8 +35,15 @@ Status ParallelExecutor::Exec(Function function, void** args, int num_args,
   for (int i = 0; i < num_args; ++i) {
     stringstream ss;
     ss << "worker-thread(" << i << ")";
-    worker_threads.AddThread(make_unique<Thread>("parallel-executor", ss.str(),
-        &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies));
+    std::unique_ptr<Thread> t;
+    Status thread_status = Thread::Create("parallel-executor", ss.str(),
+        &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies, &t);
+    if (!thread_status.ok()) {
+      unique_lock<mutex> l(lock);
+      status = thread_status;
+      break;
+    }
+    worker_threads.AddThread(move(t));
   }
   worker_threads.JoinAll();
 
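In ParallelExecutor::Exec() a failed spawn cannot simply return. Threads started in earlier iterations still reference the stack-local lock and status. The loop therefore records the error under the lock, stops spawning, and still joins every thread that did start. The sketch below shows that shape with std::thread. ExecAll, its max_threads parameter, and the Status struct are hypothetical. The max_threads parameter simulates the resource limit by throwing the same std::system_error that a failing std::thread constructor would.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>
#include <vector>

// Hypothetical stand-in for impala::Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Runs fn(0) .. fn(num_args - 1), one thread each, and returns the first error.
// 'max_threads' simulates a process thread limit for demonstration purposes.
Status ExecAll(const std::function<Status(int)>& fn, int num_args, int max_threads) {
  std::mutex lock;  // protects 'status', which the workers also write
  Status status;
  std::vector<std::thread> workers;
  for (int i = 0; i < num_args; ++i) {
    try {
      if (i >= max_threads) {
        throw std::system_error(
            std::make_error_code(std::errc::resource_unavailable_try_again));
      }
      workers.emplace_back([&fn, &lock, &status, i] {
        Status s = fn(i);
        std::lock_guard<std::mutex> l(lock);
        if (status.ok() && !s.ok()) status = s;
      });
    } catch (const std::system_error& e) {
      std::lock_guard<std::mutex> l(lock);
      status = Status{std::string("thread creation failed: ") + e.what()};
      break;  // stop spawning, but fall through to join what already started
    }
  }
  for (std::thread& t : workers) t.join();
  return status;
}
```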

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 901ddbd..071c5dd 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -55,10 +55,18 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
   }
   // avoid blocking the rpc handler thread for too long by starting a new thread for
   // query startup (which takes ownership of the QueryState reference)
-  Thread t("query-exec-mgr",
+  unique_ptr<Thread> t;
+  status = Thread::Create("query-exec-mgr",
       Substitute("start-query-finstances-$0", PrintId(query_id)),
-      &QueryExecMgr::StartQueryHelper, this, qs);
-  t.Detach();
+          &QueryExecMgr::StartQueryHelper, this, qs, &t, true);
+  if (!status.ok()) {
+    // decrement refcount taken in QueryState::Init()
+    qs->ReleaseInitialReservationRefcount();
+    // decrement refcount taken in GetOrCreateQueryState()
+    ReleaseQueryState(qs);
+    return status;
+  }
+  t->Detach();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4311e27..4c5eb17 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -291,6 +291,7 @@ void QueryState::StartFInstances() {
   VLOG_QUERY << "descriptor table for query=" << PrintId(query_id())
              << "\n" << desc_tbl_->DebugString();
 
+  Status thread_create_status;
   DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
   TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
   int fragment_ctx_idx = 0;
@@ -305,25 +306,35 @@ void QueryState::StartFInstances() {
     }
     FragmentInstanceState* fis = obj_pool_.Add(
         new FragmentInstanceState(this, *fragment_ctx, instance_ctx));
-    fis_map_.emplace(fis->instance_id(), fis);
-
-    // update fragment_map_
-    vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
-    fis_list.push_back(fis);
 
     // start new thread to execute instance
     refcnt_.Add(1);  // decremented in ExecFInstance()
     initial_reservation_refcnt_.Add(1);  // decremented in ExecFInstance()
     string thread_name = Substitute(
         "exec-finstance (finst:$0)", PrintId(instance_ctx.fragment_instance_id));
-    Thread t(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-        [this, fis]() { this->ExecFInstance(fis); });
-    t.Detach();
+    unique_ptr<Thread> t;
+    thread_create_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
+        thread_name, [this, fis]() { this->ExecFInstance(fis); }, &t, true);
+    if (!thread_create_status.ok()) {
+      // Undo refcnt increments done immediately prior to Thread::Create(). The
+      // reference counts were both greater than zero before the increments, so
+      // neither of these decrements will free any structures.
+      ReleaseInitialReservationRefcount();
+      ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
+      break;
+    }
+    // Fragment instance successfully started
+    fis_map_.emplace(fis->instance_id(), fis);
+    // update fragment_map_
+    vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
+    fis_list.push_back(fis);
+    t->Detach();
   }
 
   // don't return until every instance is prepared and record the first non-OK
-  // (non-CANCELLED if available) status
-  Status prepare_status;
+  // (non-CANCELLED if available) status (including any error from thread creation
+  // above).
+  Status prepare_status = thread_create_status;
   for (auto entry: fis_map_) {
     Status instance_status = entry.second->WaitForPrepare();
     // don't wipe out an error in one instance with the resulting CANCELLED from
@@ -333,6 +344,12 @@ void QueryState::StartFInstances() {
     }
   }
   instances_prepared_promise_.Set(prepare_status);
+  // If this is aborting due to failure in thread creation, report status to the
+  // coordinator to start query cancellation. (Other errors are reported by the
+  // fragment instance itself.)
+  if (!thread_create_status.ok()) {
+    ReportExecStatusAux(true, thread_create_status, nullptr, true);
+  }
 }
 
 void QueryState::ReleaseInitialReservationRefcount() {
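The reordering in StartFInstances() follows two rules:

- Reference counts taken for an instance are handed back if its thread cannot be created.
- An instance is registered (fis_map_, fragment_map_) only once its thread exists. This ensures the later WaitForPrepare() loop never waits on an instance that will never run.

The following is a simplified, hypothetical model of that bookkeeping. QueryModel, spawn_limit (a simulated thread limit), and the final join are for illustration only. The real code detaches its threads and reports the creation error to the coordinator.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <system_error>
#include <thread>
#include <vector>

// Hypothetical stand-in for impala::Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

class QueryModel {
 public:
  explicit QueryModel(int spawn_limit) : spawn_limit_(spawn_limit) {}

  // Starts n instances; stops at the first thread-creation failure and returns it.
  Status StartInstances(int n) {
    Status create_status;
    for (int id = 0; id < n; ++id) {
      refcnt_.fetch_add(1);  // released by the instance thread when it finishes
      create_status = Spawn(id);
      if (!create_status.ok()) {
        refcnt_.fetch_sub(1);  // undo the increment: no thread will release it
        break;
      }
      registered_.push_back(id);  // register only after the thread exists
    }
    for (std::thread& t : threads_) t.join();
    return create_status;
  }

  int refcnt() const { return refcnt_.load(); }
  const std::vector<int>& registered() const { return registered_; }

 private:
  Status Spawn(int id) {
    if (static_cast<int>(threads_.size()) >= spawn_limit_) {
      return Status{"could not start instance " + std::to_string(id)};
    }
    try {
      threads_.emplace_back([this] { refcnt_.fetch_sub(1); });
    } catch (const std::system_error& e) {
      return Status{e.what()};
    }
    return Status{};
  }

  const int spawn_limit_;
  std::atomic<int> refcnt_{0};
  std::vector<int> registered_;
  std::vector<std::thread> threads_;
};
```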

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index fc71772..d9606be 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -155,8 +155,10 @@ class QueryState {
   void ReleaseResources();
 
   /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
-  /// status must be an error. If fis is given, expects that fis finished its Prepare
-  /// phase; it then sends a report for that instance, including its profile.
+  /// status must be an error. If fis is given, the content will depend on whether
+  /// the fis has finished its Prepare phase. It sends a report for the instance,
+  /// and it will include the profile if the fis is prepared. If the fis is not
+  /// prepared, the status must be an error.
   /// If there is an error during the rpc, initiates cancellation.
   void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index 8b86dcc..bf601bd 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -111,8 +111,10 @@ class ThreadResourceMgr {
     /// each call to AcquireThreadToken and each successful call to TryAcquireThreadToken
     /// If the thread token is from AcquireThreadToken, required must be true; false
     /// if from TryAcquireThreadToken.
-    /// Must not be called from from ThreadAvailableCb.
-    void ReleaseThreadToken(bool required);
+    /// If 'skip_callbacks' is true, ReleaseThreadToken() will not run callbacks to find
+    /// a replacement for this thread. This is dangerous and can lead to underutilization
+    /// of the system.
+    void ReleaseThreadToken(bool required, bool skip_callbacks = false);
 
     /// Register a callback to be notified when a thread is available.
     /// Returns a unique id to be used when removing the callback.
@@ -266,7 +268,8 @@ inline bool ThreadResourceMgr::ResourcePool::TryAcquireThreadToken(bool* is_rese
   }
 }
 
-inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(bool required) {
+inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(
+    bool required, bool skip_callbacks) {
   if (required) {
     DCHECK_GT(num_required_threads(), 0);
     __sync_fetch_and_add(&num_threads_, -1);
@@ -282,7 +285,7 @@ inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(bool required) {
       }
     }
   }
-  InvokeCallbacks();
+  if (!skip_callbacks) InvokeCallbacks();
 }
 
 } // namespace impala
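ReleaseThreadToken() gains a defaulted skip_callbacks parameter, so existing callers keep the old behavior. The new header comment only says that skipping is dangerous because it can leave the system underutilized. The single-threaded, hypothetical TokenPool below just models the effect of the flag. The real ResourcePool also distinguishes required from optional tokens and updates its counters atomically.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

class TokenPool {
 public:
  // Callbacks run whenever a token is released, e.g. to start a replacement thread.
  void AddCallback(std::function<void()> cb) { callbacks_.push_back(std::move(cb)); }

  void AcquireThreadToken() { ++num_threads_; }

  // 'required' is accepted for signature parity only; this sketch does not use it.
  void ReleaseThreadToken(bool required, bool skip_callbacks = false) {
    (void)required;
    assert(num_threads_ > 0);
    --num_threads_;
    if (!skip_callbacks) InvokeCallbacks();
  }

  int num_threads() const { return num_threads_; }

 private:
  void InvokeCallbacks() {
    for (const std::function<void()>& cb : callbacks_) cb();
  }

  int num_threads_ = 0;
  std::vector<std::function<void()>> callbacks_;
};
```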

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 1aadf22..c23f4be 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -218,12 +218,13 @@ AdmissionController::AdmissionController(StatestoreSubscriber* subscriber,
       metrics_group_(metrics),
       host_id_(TNetworkAddressToString(host_addr)),
       thrift_serializer_(false),
-      done_(false) {
-  dequeue_thread_.reset(new Thread("scheduling", "admission-thread",
-        &AdmissionController::DequeueLoop, this));
-}
+      done_(false) {}
 
 AdmissionController::~AdmissionController() {
+  // If the dequeue thread is not running (e.g. if Init() fails), then there is
+  // nothing to do.
+  if (dequeue_thread_ == nullptr) return;
+
   // The AdmissionController should live for the lifetime of the impalad, but
   // for unit tests we need to ensure that no thread is waiting on the
   // condition variable. This notifies the dequeue thread to stop and waits
@@ -238,6 +239,8 @@ AdmissionController::~AdmissionController() {
 }
 
 Status AdmissionController::Init() {
+  RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread",
+      &AdmissionController::DequeueLoop, this, &dequeue_thread_));
   StatestoreSubscriber::UpdateCallback cb =
     bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
   Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
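The AdmissionController change shows the constructor/Init()/destructor split that this commit applies throughout. The constructor no longer starts the thread. Init() does, and it can return an error. The destructor must therefore cope with a thread that was never created. The sketch below illustrates this with std::thread and a hypothetical BackgroundService class and Status struct.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>

// Hypothetical stand-in for impala::Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

class BackgroundService {
 public:
  // No thread is started here: a constructor has no way to return a Status.
  BackgroundService() = default;

  ~BackgroundService() {
    // If Init() was never called or failed, there is no thread to stop.
    if (thread_ == nullptr) return;
    {
      std::lock_guard<std::mutex> l(lock_);
      done_ = true;
    }
    cv_.notify_one();
    thread_->join();
  }

  Status Init() {
    assert(thread_ == nullptr);  // Init() is expected to be called once
    try {
      thread_.reset(new std::thread(&BackgroundService::Loop, this));
    } catch (const std::system_error& e) {
      return Status{std::string("could not create thread: ") + e.what()};
    }
    return Status{};
  }

 private:
  // A real loop would do periodic work; this one only waits until asked to stop.
  void Loop() {
    std::unique_lock<std::mutex> l(lock_);
    cv_.wait(l, [this] { return done_; });
  }

  std::mutex lock_;
  std::condition_variable cv_;
  bool done_ = false;
  std::unique_ptr<std::thread> thread_;
};
```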

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 3e49cfb..81b2968 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -219,7 +219,7 @@ class AdmissionController {
   MetricGroup* metrics_group_;
 
   /// Thread dequeuing and admitting queries.
-  boost::scoped_ptr<Thread> dequeue_thread_;
+  std::unique_ptr<Thread> dequeue_thread_;
 
   // The local impalad's host/port id, used to construct topic keys.
   const std::string host_id_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/child-query.cc
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index f58aacc..21a2b07 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -152,16 +152,17 @@ ChildQueryExecutor::~ChildQueryExecutor() {
   DCHECK(!is_running_);
 }
 
-void ChildQueryExecutor::ExecAsync(vector<ChildQuery>&& child_queries) {
+Status ChildQueryExecutor::ExecAsync(vector<ChildQuery>&& child_queries) {
   DCHECK(!child_queries.empty());
   lock_guard<SpinLock> lock(lock_);
   DCHECK(child_queries_.empty());
   DCHECK(child_queries_thread_.get() == NULL);
-  if (is_cancelled_) return;
+  if (is_cancelled_) return Status::OK();
   child_queries_ = move(child_queries);
-  child_queries_thread_.reset(new Thread("query-exec-state", "async child queries",
-      bind(&ChildQueryExecutor::ExecChildQueries, this)));
+  RETURN_IF_ERROR(Thread::Create("query-exec-state", "async child queries",
+      bind(&ChildQueryExecutor::ExecChildQueries, this), &child_queries_thread_));
   is_running_ = true;
+  return Status::OK();
 }
 
 void ChildQueryExecutor::ExecChildQueries() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/child-query.h
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.h b/be/src/service/child-query.h
index 36f6197..83bcb58 100644
--- a/be/src/service/child-query.h
+++ b/be/src/service/child-query.h
@@ -19,7 +19,7 @@
 #define IMPALA_SERVICE_CHILD_QUERY_H
 
 #include <string>
-#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
 
 #include "common/status.h"
 #include "impala-server.h"
@@ -156,7 +156,7 @@ class ChildQueryExecutor {
   /// Asynchronously executes 'child_queries' one by one in a new thread. 'child_queries'
   /// must be non-empty. May clear or modify the 'child_queries' arg. Can only be called
   /// once. Does nothing if Cancel() was already called.
-  void ExecAsync(std::vector<ChildQuery>&& child_queries);
+  Status ExecAsync(std::vector<ChildQuery>&& child_queries) WARN_UNUSED_RESULT;
 
   /// Waits for all child queries to complete successfully or with an error. Returns a
   /// non-OK status if a child query fails. Returns OK if ExecAsync() was not called,
@@ -200,7 +200,7 @@ class ChildQueryExecutor {
 
   /// Thread to execute 'child_queries_' in. Immutable after the first time it is set or
   /// after 'is_cancelled_' is true.
-  boost::scoped_ptr<Thread> child_queries_thread_;
+  std::unique_ptr<Thread> child_queries_thread_;
 
   /// The status of the child queries. The status is OK iff all child queries complete
   /// successfully. Otherwise, status contains the error of the first child query that

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 60f799c..2a5b379 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -495,7 +495,9 @@ Status ClientRequestState::ExecDdlRequest() {
           ChildQuery(compute_stats_params.col_stats_query, this, parent_server_));
     }
 
-    if (child_queries.size() > 0) child_query_executor_->ExecAsync(move(child_queries));
+    if (child_queries.size() > 0) {
+      RETURN_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
+    }
     return Status::OK();
   }
 
@@ -592,9 +594,9 @@ Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
   return Status::OK();
 }
 
-void ClientRequestState::WaitAsync() {
-  wait_thread_.reset(new Thread(
-      "query-exec-state", "wait-thread", &ClientRequestState::Wait, this));
+Status ClientRequestState::WaitAsync() {
+  return Thread::Create("query-exec-state", "wait-thread",
+      &ClientRequestState::Wait, this, &wait_thread_, true);
 }
 
 void ClientRequestState::BlockOnWait() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 66e206d..6846165 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -78,7 +78,7 @@ class ClientRequestState {
   void Wait();
 
   /// Calls Wait() asynchronously in a thread and returns immediately.
-  void WaitAsync();
+  Status WaitAsync();
 
   /// BlockOnWait() may be called after WaitAsync() has been called in order to wait
   /// for the asynchronous thread (wait_thread_) to complete. It is safe to call this
@@ -250,7 +250,7 @@ class ClientRequestState {
   ExecEnv* exec_env_;
 
   /// Thread for asynchronously running Wait().
-  boost::scoped_ptr<Thread> wait_thread_;
+  std::unique_ptr<Thread> wait_thread_;
 
   /// Condition variable to make BlockOnWait() thread-safe. One thread joins
   /// wait_thread_, and all other threads block on this cv. Used with lock_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index a170ea1..bcf76b6 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -68,10 +68,14 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // start thread to wait for results to become available, which will allow
   // us to advance query state to FINISHED or EXCEPTION
-  request_state->WaitAsync();
+  Status status = request_state->WaitAsync();
+  if (!status.ok()) {
+    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
+    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
+  }
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
-  Status status = SetQueryInflight(session, request_state);
+  status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
     discard_result(UnregisterQuery(request_state->query_id(), false, &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index f32d04e..da8d606 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -464,21 +464,16 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
         QueryResultSet::CreateHS2ResultSet(
             session->hs2_version, *request_state->result_metadata(), nullptr),
         cache_num_rows);
-    if (!status.ok()) {
-      discard_result(UnregisterQuery(request_state->query_id(), false, &status));
-      HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
-    }
+    if (!status.ok()) goto return_error;
   }
   request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // Start thread to wait for results to become available.
-  request_state->WaitAsync();
+  status = request_state->WaitAsync();
+  if (!status.ok()) goto return_error;
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
   status = SetQueryInflight(session, request_state);
-  if (!status.ok()) {
-    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
-    HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
-  }
+  if (!status.ok()) goto return_error;
   return_val.__isset.operationHandle = true;
   return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT);
   return_val.operationHandle.__set_hasResultSet(request_state->returns_result_set());
@@ -489,6 +484,11 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
       apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS);
 
   VLOG_QUERY << "ExecuteStatement(): return_val=" << ThriftDebugString(return_val);
+  return;
+
+ return_error:
+  discard_result(UnregisterQuery(request_state->query_id(), false, &status));
+  HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
 }
 
 void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 18582da..7eae5b1 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -383,15 +383,16 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
           "impala-server", "cancellation-worker",
       FLAGS_cancellation_thread_pool_size, MAX_CANCELLATION_QUEUE_SIZE,
       bind<void>(&ImpalaServer::CancelFromThreadPool, this, _1, _2)));
+  ABORT_IF_ERROR(cancellation_thread_pool_->Init());
 
   // Initialize a session expiry thread which blocks indefinitely until the first session
   // with non-zero timeout value is opened. Note that a session which doesn't specify any
   // idle session timeout value will use the default value FLAGS_idle_session_timeout.
-  session_timeout_thread_.reset(new Thread("impala-server", "session-expirer",
-      bind<void>(&ImpalaServer::ExpireSessions, this)));
+  ABORT_IF_ERROR(Thread::Create("impala-server", "session-expirer",
+      bind<void>(&ImpalaServer::ExpireSessions, this), &session_timeout_thread_));
 
-  query_expiration_thread_.reset(new Thread("impala-server", "query-expirer",
-      bind<void>(&ImpalaServer::ExpireQueries, this)));
+  ABORT_IF_ERROR(Thread::Create("impala-server", "query-expirer",
+      bind<void>(&ImpalaServer::ExpireQueries, this), &query_expiration_thread_));
 
   is_coordinator_ = FLAGS_is_coordinator;
   is_executor_ = FLAGS_is_executor;
@@ -448,8 +449,8 @@ Status ImpalaServer::InitLineageLogging() {
   lineage_logger_.reset(new SimpleLogger(FLAGS_lineage_event_log_dir,
       LINEAGE_LOG_FILE_PREFIX, FLAGS_max_lineage_log_file_size));
   RETURN_IF_ERROR(lineage_logger_->Init());
-  lineage_logger_flush_thread_.reset(new Thread("impala-server",
-        "lineage-log-flush", &ImpalaServer::LineageLoggerFlushThread, this));
+  RETURN_IF_ERROR(Thread::Create("impala-server", "lineage-log-flush",
+      &ImpalaServer::LineageLoggerFlushThread, this, &lineage_logger_flush_thread_));
   return Status::OK();
 }
 
@@ -540,8 +541,9 @@ Status ImpalaServer::InitAuditEventLogging() {
   audit_event_logger_.reset(new SimpleLogger(FLAGS_audit_event_log_dir,
      AUDIT_EVENT_LOG_FILE_PREFIX, FLAGS_max_audit_event_log_file_size));
   RETURN_IF_ERROR(audit_event_logger_->Init());
-  audit_event_logger_flush_thread_.reset(new Thread("impala-server",
-        "audit-event-log-flush", &ImpalaServer::AuditEventLoggerFlushThread, this));
+  RETURN_IF_ERROR(Thread::Create("impala-server", "audit-event-log-flush",
+      &ImpalaServer::AuditEventLoggerFlushThread, this,
+      &audit_event_logger_flush_thread_));
   return Status::OK();
 }
 
@@ -600,8 +602,8 @@ Status ImpalaServer::InitProfileLogging() {
       PROFILE_LOG_FILE_PREFIX, FLAGS_max_profile_log_file_size,
       FLAGS_max_profile_log_files));
   RETURN_IF_ERROR(profile_logger_->Init());
-  profile_log_file_flush_thread_.reset(new Thread("impala-server", "log-flush-thread",
-      &ImpalaServer::LogFileFlushThread, this));
+  RETURN_IF_ERROR(Thread::Create("impala-server", "log-flush-thread",
+      &ImpalaServer::LogFileFlushThread, this, &profile_log_file_flush_thread_));
 
   return Status::OK();
 }

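The impala-server.cc hunks above use two error-handling macros for two situations: the ImpalaServer constructor cannot return a Status, so a failure there must abort the process (ABORT_IF_ERROR), while the Init*Logging() methods can hand the failure back to their caller (RETURN_IF_ERROR). The sketch below illustrates that difference with a hypothetical, minimal Status class and simplified versions of both macros; they are stand-ins written for this illustration, not Impala's actual definitions, and StartFlushThread()/InitLogging() are hypothetical helpers.

```cpp
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <utility>

// Hypothetical, minimal stand-in for a Status type: either OK or an error message.
class Status {
 public:
  static Status OK() { return Status(); }
  explicit Status(std::string msg) : ok_(false), msg_(std::move(msg)) {}
  bool ok() const { return ok_; }
  const std::string& msg() const { return msg_; }

 private:
  Status() = default;
  bool ok_ = true;
  std::string msg_;
};

// Simplified stand-in: hand a failure back to the caller, who decides what to do.
#define RETURN_IF_ERROR(stmt)                       \
  do {                                              \
    Status macro_status_ = (stmt);                  \
    if (!macro_status_.ok()) return macro_status_;  \
  } while (false)

// Simplified stand-in: no caller can handle the failure (e.g. in a constructor or
// during daemon startup), so print the error and exit the process.
#define ABORT_IF_ERROR(stmt)                                               \
  do {                                                                     \
    Status macro_status_ = (stmt);                                         \
    if (!macro_status_.ok()) {                                             \
      std::fprintf(stderr, "Aborting: %s\n", macro_status_.msg().c_str()); \
      std::exit(1);                                                        \
    }                                                                      \
  } while (false)

// Hypothetical helper standing in for Thread::Create().
Status StartFlushThread(bool simulate_failure) {
  if (simulate_failure) return Status("thread creation failed");
  return Status::OK();
}

// Shaped like InitLineageLogging(): the failure propagates instead of crashing.
Status InitLogging(bool simulate_failure) {
  RETURN_IF_ERROR(StartFlushThread(simulate_failure));
  return Status::OK();
}
```

This is why the commit moves thread creation out of constructors where it can: only code that returns a Status can let a caller (for example, query execution) recover from the failure.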
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 1e83de5..eb3251c 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -745,13 +745,13 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   boost::scoped_ptr<SimpleLogger> lineage_logger_;
 
   /// If profile logging is enabled, wakes once every 5s to flush query profiles to disk
-  boost::scoped_ptr<Thread> profile_log_file_flush_thread_;
+  std::unique_ptr<Thread> profile_log_file_flush_thread_;
 
   /// If audit event logging is enabled, wakes once every 5s to flush audit events to disk
-  boost::scoped_ptr<Thread> audit_event_logger_flush_thread_;
+  std::unique_ptr<Thread> audit_event_logger_flush_thread_;
 
   /// If lineage logging is enabled, wakes once every 5s to flush lineage events to disk
-  boost::scoped_ptr<Thread> lineage_logger_flush_thread_;
+  std::unique_ptr<Thread> lineage_logger_flush_thread_;
 
   /// global, per-server state
   ExecEnv* exec_env_;  // not owned
@@ -762,7 +762,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 
   /// Thread that runs ExpireSessions. It will wake up periodically to check for sessions
   /// which are idle for more their timeout values.
-  boost::scoped_ptr<Thread> session_timeout_thread_;
+  std::unique_ptr<Thread> session_timeout_thread_;
 
   /// Contains all the non-zero idle session timeout values.
   std::multiset<int32_t> session_timeout_set_;
@@ -966,7 +966,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   ExpirationQueue queries_by_timestamp_;
 
   /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
-  boost::scoped_ptr<Thread> query_expiration_thread_;
+  std::unique_ptr<Thread> query_expiration_thread_;
 
   /// Serializes TBackendDescriptors when creating topic updates
   ThriftSerializer thrift_serializer_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 8a7961c..53b7d3e 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -97,7 +97,8 @@ int ImpaladMain(int argc, char** argv) {
     ShutdownLogging();
     exit(1);
   }
-  StartMemoryMaintenanceThread(); // Memory metrics are created in StartServices().
+  // Memory metrics are created in StartServices().
+  ABORT_IF_ERROR(StartMemoryMaintenanceThread());
 
   DCHECK(exec_env.process_mem_tracker() != nullptr)
       << "ExecEnv::StartServices() must be called before starting RPC services";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 7cfcaf7..fa839aa 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -224,8 +224,8 @@ Status StatestoreSubscriber::Start() {
   }
 
   // Registration is finished at this point, so it's fine to release the lock.
-  recovery_mode_thread_.reset(new Thread("statestore-subscriber", "recovery-mode-thread",
-      &StatestoreSubscriber::RecoveryModeChecker, this));
+  RETURN_IF_ERROR(Thread::Create("statestore-subscriber", "recovery-mode-thread",
+      &StatestoreSubscriber::RecoveryModeChecker, this, &recovery_mode_thread_));
 
   return status;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 65dcac9..49db5d0 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -143,7 +143,7 @@ class StatestoreSubscriber {
   boost::scoped_ptr<impala::TimeoutFailureDetector> failure_detector_;
 
   /// Thread in which RecoveryModeChecker runs.
-  boost::scoped_ptr<Thread> recovery_mode_thread_;
+  std::unique_ptr<Thread> recovery_mode_thread_;
 
   /// Class-wide lock. Protects all subsequent members. Most private methods must
   /// be called holding this lock; this is noted in the method comments.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 5d7738c..75ba5c7 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -256,6 +256,12 @@ Statestore::Statestore(MetricGroup* metrics)
   heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
 }
 
+Status Statestore::Init() {
+  RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
+  RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
+  return Status::OK();
+}
+
 void Statestore::RegisterWebpages(Webserver* webserver) {
   Webserver::UrlCallback topics_callback =
       bind<void>(mem_fn(&Statestore::TopicsHandler), this, _1, _2);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index b3ba315..05feac3 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -28,6 +28,7 @@
 #include <boost/unordered_map.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 
+#include "common/status.h"
 #include "gen-cpp/StatestoreService.h"
 #include "gen-cpp/StatestoreSubscriber.h"
 #include "gen-cpp/Types_types.h"
@@ -97,6 +98,10 @@ class Statestore : public CacheLineAligned {
   /// The only constructor; initialises member variables only.
   Statestore(MetricGroup* metrics);
 
+  /// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
+  /// ThreadPool initialization fails.
+  Status Init() WARN_UNUSED_RESULT;
+
   /// Registers a new subscriber with the given unique subscriber ID, running a subscriber
   /// service at the given location, with the provided list of topic subscriptions.
   /// The registration_id output parameter is the unique ID for this registration, used to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 1f06f04..1a11237 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -70,7 +70,7 @@ int StatestoredMain(int argc, char** argv) {
   ABORT_IF_ERROR(
       metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr));
   ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), false, nullptr, nullptr));
-  StartMemoryMaintenanceThread();
+  ABORT_IF_ERROR(StartMemoryMaintenanceThread());
   ABORT_IF_ERROR(
     StartThreadInstrumentation(metrics.get(), webserver.get(), false));
   InitRpcEventTracing(webserver.get());
@@ -81,6 +81,7 @@ int StatestoredMain(int argc, char** argv) {
   CommonMetrics::InitCommonMetrics(metrics.get());
 
   Statestore statestore(metrics.get());
+  ABORT_IF_ERROR(statestore.Init());
   statestore.RegisterWebpages(webserver.get());
   boost::shared_ptr<TProcessor> processor(
       new StatestoreServiceProcessor(statestore.thrift_iface()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 5b2a4e5..d0036af 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -165,6 +165,7 @@ InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port
 }
 
 Status InProcessStatestore::Start() {
+  RETURN_IF_ERROR(statestore_->Init());
   RETURN_IF_ERROR(webserver_->Start());
   boost::shared_ptr<TProcessor> processor(
       new StatestoreServiceProcessor(statestore_->thrift_iface()));
@@ -177,8 +178,8 @@ Status InProcessStatestore::Start() {
   ThriftServer* server;
   ABORT_IF_ERROR(builder.metrics(metrics_.get()).Build(&server));
   statestore_server_.reset(server);
-  statestore_main_loop_.reset(
-      new Thread("statestore", "main-loop", &Statestore::MainLoop, statestore_.get()));
+  RETURN_IF_ERROR(Thread::Create("statestore", "main-loop",
+      &Statestore::MainLoop, statestore_.get(), &statestore_main_loop_));
 
   RETURN_IF_ERROR(statestore_server_->Start());
   return WaitForServer("localhost", statestore_port_, 10, 100);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index d22c441..9e3b2f5 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -141,7 +141,7 @@ class InProcessStatestore {
   /// Statestore Thrift server
   boost::scoped_ptr<ThriftServer> statestore_server_;
 
-  boost::scoped_ptr<Thread> statestore_main_loop_;
+  std::unique_ptr<Thread> statestore_main_loop_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool-test.cc b/be/src/util/thread-pool-test.cc
index edfe881..a7e4df7 100644
--- a/be/src/util/thread-pool-test.cc
+++ b/be/src/util/thread-pool-test.cc
@@ -46,6 +46,7 @@ TEST(ThreadPoolTest, BasicTest) {
   }
 
   ThreadPool<int> thread_pool("thread-pool", "worker", 5, 250, Count);
+  ASSERT_OK(thread_pool.Init());
   for (int i = 0; i <= OFFERED_RANGE; ++i) {
     ASSERT_TRUE(thread_pool.Offer(i));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index 800f690..4b5dbf0 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -38,24 +38,23 @@ class ThreadPool : public CacheLineAligned {
   /// process.
   typedef boost::function<void (int thread_id, const T& workitem)> WorkFunction;
 
-  /// Creates a new thread pool and start num_threads threads.
+  /// Creates a new thread pool without starting any threads. Code must call
+  /// Init() on this thread pool before any calls to Offer().
   ///  -- num_threads: how many threads are part of this pool
   ///  -- queue_size: the maximum size of the queue on which work items are offered. If the
   ///     queue exceeds this size, subsequent calls to Offer will block until there is
   ///     capacity available.
   ///  -- work_function: the function to run every time an item is consumed from the queue
+  ///  -- fault_injection_eligible: if true, allow fault injection at this callsite
+  ///     (see thread_creation_fault_injection). If false, fault injection is
+  ///     disabled at this callsite. Thread creation sites whose failure crashes
+  ///     Impala or aborts startup must set this to false.
   ThreadPool(const std::string& group, const std::string& thread_prefix,
-      uint32_t num_threads, uint32_t queue_size, const WorkFunction& work_function)
-    : work_function_(work_function),
-      work_queue_(queue_size),
-      shutdown_(false) {
-    for (int i = 0; i < num_threads; ++i) {
-      std::stringstream threadname;
-      threadname << thread_prefix << "(" << i + 1 << ":" << num_threads << ")";
-      threads_.AddThread(std::make_unique<Thread>(group, threadname.str(),
-          boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i)));
-    }
-  }
+      uint32_t num_threads, uint32_t queue_size, const WorkFunction& work_function,
+      bool fault_injection_eligible = false)
+    : group_(group), thread_prefix_(thread_prefix), num_threads_(num_threads),
+      work_function_(work_function), work_queue_(queue_size),
+      fault_injection_eligible_(fault_injection_eligible) {}
 
   /// Destructor ensures that all threads are terminated before this object is freed
   /// (otherwise they may continue to run and reference member variables)
@@ -64,8 +63,32 @@ class ThreadPool : public CacheLineAligned {
     Join();
   }
 
+  /// Create the threads needed for this ThreadPool. Returns an error if any thread
+  /// fails to spawn, after shutting down and joining the threads already started.
+  Status Init() {
+    for (int i = 0; i < num_threads_; ++i) {
+      std::stringstream threadname;
+      threadname << thread_prefix_ << "(" << i + 1 << ":" << num_threads_ << ")";
+      std::unique_ptr<Thread> t;
+      Status status = Thread::Create(group_, threadname.str(),
+          boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i), &t,
+          fault_injection_eligible_);
+      if (!status.ok()) {
+        // The thread pool initialization failed. Shutdown any threads that were
+        // spawned. Note: Shutdown() and Join() are safe to call multiple times.
+        Shutdown();
+        Join();
+        return status;
+      }
+      threads_.AddThread(std::move(t));
+    }
+    initialized_ = true;
+    return Status::OK();
+  }
+
   /// Blocking operation that puts a work item on the queue. If the queue is full, blocks
-  /// until there is capacity available.
+  /// until there is capacity available. The ThreadPool must be initialized before
+  /// calling this method.
   //
   /// 'work' is copied into the work queue, but may be referenced at any time in the
   /// future. Therefore the caller needs to ensure that any data referenced by work (if T
@@ -77,6 +100,7 @@ class ThreadPool : public CacheLineAligned {
   /// (which typically means that the thread pool has already been shut down).
   template <typename V>
   bool Offer(V&& work) {
+    DCHECK(initialized_);
     return work_queue_.BlockingPut(std::forward<V>(work));
   }
 
@@ -108,6 +132,8 @@ class ThreadPool : public CacheLineAligned {
   void DrainAndShutdown() {
     {
       boost::unique_lock<boost::mutex> l(lock_);
+      // If the ThreadPool is not initialized, then the queue must be empty.
+      DCHECK(initialized_ || work_queue_.Size() == 0);
       while (work_queue_.Size() != 0) {
         empty_cv_.wait(l);
       }
@@ -141,6 +167,15 @@ class ThreadPool : public CacheLineAligned {
     return shutdown_;
   }
 
+  /// Group string to tag threads for this pool
+  const std::string group_;
+
+  /// Thread name prefix
+  const std::string thread_prefix_;
+
+  /// The number of threads to start in this pool
+  uint32_t num_threads_;
+
   /// User-supplied method to call to process each work item.
   WorkFunction work_function_;
 
@@ -148,14 +183,21 @@ class ThreadPool : public CacheLineAligned {
   /// FIFO order.
   BlockingQueue<T> work_queue_;
 
+  /// Whether a failure in Init() is tolerated (e.g. by aborting the query), which
+  /// makes it safe to inject thread creation errors into Init().
+  bool fault_injection_eligible_;
+
   /// Collection of worker threads that process work from the queue.
   ThreadGroup threads_;
 
   /// Guards shutdown_ and empty_cv_
   boost::mutex lock_;
 
+  /// Set to true when Init() has finished spawning the threads.
+  bool initialized_ = false;
+
   /// Set to true when threads should stop doing work and terminate.
-  bool shutdown_;
+  bool shutdown_ = false;
 
   /// Signalled when the queue becomes empty
   boost::condition_variable empty_cv_;

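The thread-pool.h change splits construction from thread creation: the constructor only stores its arguments, and Init() spawns the workers and, if any spawn fails, shuts down and joins the ones already started before returning the error. The following is a minimal, self-contained sketch of that two-phase pattern using std::thread rather than Impala's Thread and ThreadGroup; SimplePool, its int work items, and the bool-plus-message error reporting are illustrative assumptions, not Impala APIs.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

// Two-phase initialization: the constructor only records configuration, and
// Init() spawns the workers and reports failure as a return value.
class SimplePool {
 public:
  using WorkFn = std::function<void(int)>;

  SimplePool(int num_threads, WorkFn fn)
    : num_threads_(num_threads), fn_(std::move(fn)) {}

  // Stop and join all workers before any member they use is destroyed.
  ~SimplePool() {
    Shutdown();
    Join();
  }

  // Returns false and fills 'error' if any worker fails to spawn. Workers that
  // did start are shut down and joined first, so no thread is left running.
  bool Init(std::string* error) {
    threads_.reserve(num_threads_);
    for (int i = 0; i < num_threads_; ++i) {
      try {
        threads_.emplace_back(&SimplePool::WorkerLoop, this);
      } catch (const std::system_error& e) {
        Shutdown();
        Join();
        *error = e.what();
        return false;
      }
    }
    initialized_ = true;
    return true;
  }

  // Enqueues an item; returns false once Shutdown() has been called.
  bool Offer(int item) {
    assert(initialized_);
    {
      std::lock_guard<std::mutex> l(lock_);
      if (shutdown_) return false;
      queue_.push_back(item);
    }
    cv_.notify_one();
    return true;
  }

  // Safe to call more than once.
  void Shutdown() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    cv_.notify_all();
  }

  // Safe to call more than once: already-joined threads are skipped.
  void Join() {
    for (std::thread& t : threads_) {
      if (t.joinable()) t.join();
    }
  }

  int processed() const { return processed_.load(); }

 private:
  // In this sketch, workers drain the queue before exiting.
  void WorkerLoop() {
    while (true) {
      int item;
      {
        std::unique_lock<std::mutex> l(lock_);
        cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
        if (queue_.empty()) return;  // shut down and nothing left to process
        item = queue_.front();
        queue_.pop_front();
      }
      fn_(item);
      ++processed_;
    }
  }

  const int num_threads_;
  WorkFn fn_;
  std::mutex lock_;
  std::condition_variable cv_;
  std::deque<int> queue_;
  std::vector<std::thread> threads_;
  std::atomic<int> processed_{0};
  bool initialized_ = false;
  bool shutdown_ = false;
};
```

Because workers drain the queue here, calling Shutdown() and then Join() processes every item that was successfully offered; making Shutdown() and Join() idempotent is what lets the failure path in Init() reuse them, as the diff's comment notes for ThreadPool.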
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 0e08ab1..cd38c9a 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -33,6 +33,10 @@
 
 #include "common/names.h"
 
+#ifndef NDEBUG
+DECLARE_bool(thread_creation_fault_injection);
+#endif
+
 namespace this_thread = boost::this_thread;
 using namespace rapidjson;
 
@@ -286,22 +290,41 @@ void ThreadMgr::ThreadGroupUrlCallback(const Webserver::ArgumentMap& args,
   document->AddMember("threads", lst, document->GetAllocator());
 }
 
-void Thread::StartThread(const ThreadFunctor& functor) {
+Status Thread::StartThread(const std::string& category, const std::string& name,
+    const ThreadFunctor& functor, unique_ptr<Thread>* thread,
+    bool fault_injection_eligible) {
   DCHECK(thread_manager.get() != nullptr)
       << "Thread created before InitThreading called";
-  DCHECK(tid_ == UNINITIALISED_THREAD_ID) << "StartThread called twice";
+  DCHECK(thread->get() == nullptr);
+
+#ifndef NDEBUG
+  if (fault_injection_eligible && FLAGS_thread_creation_fault_injection) {
+    // Fail roughly 1% of the time on eligible codepaths.
+    if ((rand() % 100) == 1) {
+      return Status(Substitute("Fake thread creation failure (category: $0, name: $1)",
+          category, name));
+    }
+  }
+#endif
 
+  unique_ptr<Thread> t(new Thread(category, name));
   Promise<int64_t> thread_started;
-  thread_.reset(
-      new thread(&Thread::SuperviseThread, name_, category_, functor, &thread_started));
-
+  try {
+    t->thread_.reset(
+        new boost::thread(&Thread::SuperviseThread, t->name_, t->category_, functor,
+            &thread_started));
+  } catch (boost::thread_resource_error& e) {
+    return Status(TErrorCode::THREAD_CREATION_FAILED, name, category, e.what());
+  }
   // TODO: This slows down thread creation although not enormously. To make this faster,
   // consider delaying thread_started.Get() until the first call to tid(), but bear in
   // mind that some coordination is required between SuperviseThread() and this to make
   // sure that the thread is still available to have its tid set.
-  tid_ = thread_started.Get();
+  t->tid_ = thread_started.Get();
 
-  VLOG(2) << "Started thread " << tid_ << " - " << category_ << ":" << name_;
+  VLOG(2) << "Started thread " << t->tid() << " - " << category << ":" << name;
+  *thread = move(t);
+  return Status::OK();
 }
 
 void Thread::SuperviseThread(const string& name, const string& category,
@@ -330,7 +353,7 @@ void Thread::SuperviseThread(const string& name, const string& category,
   thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy);
 }
 
-void ThreadGroup::AddThread(unique_ptr<Thread> thread) {
+void ThreadGroup::AddThread(unique_ptr<Thread>&& thread) {
   threads_.emplace_back(move(thread));
 }
 

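The core of the thread.cc change is a static factory that catches the thread-creation exception and returns it as a value, plus a debug-only hook that fails roughly 1% of eligible calls. The sketch below reproduces that shape with std::thread, which throws std::system_error when a thread cannot be started; Worker, g_thread_creation_fault_injection, and the bool-plus-message error reporting are hypothetical stand-ins for Impala's Thread, the thread_creation_fault_injection flag, and Status.

```cpp
#include <cassert>
#include <cstdlib>
#include <functional>
#include <memory>
#include <string>
#include <system_error>
#include <thread>
#include <utility>

// Hypothetical stand-in for a debug flag such as thread_creation_fault_injection.
bool g_thread_creation_fault_injection = false;

class Worker {
 public:
  // Factory in the style of Thread::Create(): the constructor is private, so a
  // Worker only exists if its thread actually started. Returns false and fills
  // 'error' instead of letting the exception escape.
  static bool Create(const std::string& name, std::function<void()> fn,
      std::unique_ptr<Worker>* out, std::string* error,
      bool fault_injection_eligible = false) {
    assert(out->get() == nullptr);
#ifndef NDEBUG
    // Debug builds only: fail roughly 1% of eligible calls when the flag is set.
    if (fault_injection_eligible && g_thread_creation_fault_injection &&
        std::rand() % 100 == 1) {
      *error = "Fake thread creation failure (name: " + name + ")";
      return false;
    }
#endif
    // std::make_unique cannot reach the private constructor, hence plain new.
    std::unique_ptr<Worker> w(new Worker(name));
    try {
      // std::thread reports failure to start a thread (e.g. when a process or
      // thread ulimit is reached) by throwing std::system_error.
      w->thread_ = std::thread(std::move(fn));
    } catch (const std::system_error& e) {
      *error = "Failed to create thread " + name + ": " + e.what();
      return false;
    }
    *out = std::move(w);
    return true;
  }

  // Join on destruction so the thread never outlives its owner.
  ~Worker() {
    if (thread_.joinable()) thread_.join();
  }

  const std::string& name() const { return name_; }

 private:
  explicit Worker(std::string name) : name_(std::move(name)) {}

  std::string name_;
  std::thread thread_;
};
```

Call sites that cannot tolerate failure (startup paths) simply leave fault_injection_eligible at its default of false, so enabling the flag only perturbs code that is expected to recover.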
