impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbap...@apache.org
Subject [7/8] incubator-impala git commit: IMPALA-5750: Catch exceptions from boost thread creation
Date Thu, 07 Sep 2017 03:50:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 18f3a75..6898517 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -48,10 +48,10 @@ class Webserver;
 /// TODO: Consider allowing fragment IDs as category parameters.
 class Thread {
  public:
-  /// This constructor pattern mimics that in boost::thread. There is
-  /// one constructor for each number of arguments that the thread
+  /// This static Create method pattern mimics that in the boost::thread constructors.
+  /// There is one static Create method for each number of arguments that the thread
   /// function accepts. To extend the set of acceptable signatures, add
-  /// another constructor with <class F, class A1.... class An>.
+  /// another static Create method with <class F, class A1.... class An>.
   //
   /// In general:
   ///  - category: string identifying the thread category to which this thread belongs,
@@ -61,44 +61,56 @@ class Thread {
   ///  - F - a method type that supports operator(), and the instance passed to the
   ///    constructor is executed immediately in a separate thread.
   ///  - A1...An - argument types whose instances are passed to f(...)
+  ///  - thread - unique_ptr<Thread>* to reset with the created Thread.
+  ///  - fault_injection_eligible - If set to true, allow fault injection at this
+  ///    callsite (see thread_creation_fault_injection). If set to false, fault
+  ///    injection is diabled at this callsite. Thread creation sites that crash
+  ///    Impala or abort startup must have this set to false.
   template <class F>
-  Thread(const std::string& category, const std::string& name, const F& f)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(f);
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, std::unique_ptr<Thread>* thread,
+      bool fault_injection_eligible = false) {
+    return StartThread(category, name, f, thread, fault_injection_eligible);
   }
 
   template <class F, class A1>
-  Thread(const std::string& category, const std::string& name, const F& f, const
A1& a1)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, std::unique_ptr<Thread>* thread,
+      bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, std::unique_ptr<Thread>*
thread,
+      bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2, class A3>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2, const A3& a3)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2, a3));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, const A3& a3,
+      std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2, class A3, class A4>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2, const A3& a3, const A4& a4)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2, a3, a4));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, const A3& a3, const A4&
a4,
+      std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2, class A3, class A4, class A5>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5&
a5)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2, a3, a4, a5));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, const A3& a3, const A4&
a4, const A5& a5,
+      std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), thread,
+        fault_injection_eligible);
   }
 
   /// Blocks until this thread finishes execution. Once this method returns, the thread
@@ -117,6 +129,9 @@ class Thread {
   static const int64_t INVALID_THREAD_ID = -1;
 
  private:
+  Thread(const std::string& category, const std::string& name)
+    : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {}
+
   /// To distinguish between a thread ID that can't be determined, and one that hasn't
   /// been assigned. Since tid_ is set in the constructor, this value will never be seen
   /// by clients of this class.
@@ -137,10 +152,13 @@ class Thread {
   /// non-negative integer, or INVALID_THREAD_ID.
   int64_t tid_;
 
-  /// Starts the thread running SuperviseThread(), and returns once that thread has
-  /// initialised and its TID read. Waits for notification from the started thread that
-  /// initialisation is complete before returning.
-  void StartThread(const ThreadFunctor& functor);
+  /// Creates a new thread and starts the thread running SuperviseThread(). It waits
+  /// for notification from the started thread that initialisation is complete and
+  /// the TID read before returning. This will return an error if thread create fails.
+  /// In the event of success, 'thread' will be set to the created Thread.
+  static Status StartThread(const std::string& category, const std::string& name,
+      const ThreadFunctor& functor, std::unique_ptr<Thread>* thread,
+      bool fault_injection_eligible) WARN_UNUSED_RESULT;
 
   /// Wrapper for the user-supplied function. Always invoked from thread_. Executes the
   /// method in functor_, but before doing so registers with the global ThreadMgr and
@@ -175,7 +193,7 @@ class ThreadGroup {
   /// will destroy it when the ThreadGroup is destroyed.  Threads will linger until that
   /// point (even if terminated), however, so callers should be mindful of the cost of
   /// placing very many threads in this set.
-  void AddThread(std::unique_ptr<Thread> thread);
+  void AddThread(std::unique_ptr<Thread>&& thread);
 
   /// Waits for all threads to finish. DO NOT call this from a thread inside this set;
   /// deadlock will predictably ensue.
@@ -196,7 +214,7 @@ void InitThreading();
 /// the "thread-manager." If 'include_jvm_threads' is true, shows information about
 /// live JVM threads in the web UI.
 Status StartThreadInstrumentation(MetricGroup* metrics, Webserver* webserver,
-    bool include_jvm_threads);
+    bool include_jvm_threads) WARN_UNUSED_RESULT;
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 393daba..c08268e 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -331,6 +331,8 @@ error_codes = (
 
   ("ADMISSION_TIMED_OUT", 108, "Admission for query exceeded timeout $0ms in pool $1. "
      "Queued reason: $2"),
+
+  ("THREAD_CREATION_FAILED", 109, "Failed to create thread $0 in category $1: $2"),
 )
 
 import sys


Mime
View raw message