impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sail...@apache.org
Subject [2/6] incubator-impala git commit: IMPALA-4548: BlockingJoinNode should wait for async build thread
Date Fri, 21 Apr 2017 00:40:45 GMT
IMPALA-4548: BlockingJoinNode should wait for async build thread

This is a minor cleanup of the handling of the async build thread
in BlockingJoinNode. Previously, the main thread blocked on the
promise passed to the async build thread and would not proceed until
the status was set. The async build thread relied on the fragile
assumption that no other state (e.g. the runtime state object) would be
accessed once the promise was set. This assumption proved fragile, as
shown by the use-after-free bug in IMPALA-4532.

This change removes the reliance on that fragile assumption by making
the main thread join the async build thread before proceeding.

Change-Id: I33b07d60426cde61922b05c969ef09453ac0f342
Reviewed-on: http://gerrit.cloudera.org:8080/6664
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/692c6a55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/692c6a55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/692c6a55

Branch: refs/heads/master
Commit: 692c6a555811a09827dd963d4bf4f1ffd0a3aad4
Parents: 8660c40
Author: Michael Ho <kwho@cloudera.com>
Authored: Mon Apr 17 14:54:53 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Thu Apr 20 21:55:57 2017 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc | 46 +++++++++++++++-------------------
 be/src/exec/blocking-join-node.h  |  3 +--
 2 files changed, 21 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/692c6a55/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index fca2f72..8fb0756 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -144,31 +144,24 @@ void BlockingJoinNode::Close(RuntimeState* state) {
 }
 
 void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink,
-    Promise<Status>* status) {
-  Status s;
-  {
-    SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
-    if  (build_sink == NULL){
-      s = ProcessBuildInput(state);
-    } else {
-      s = SendBuildInputToSink<true>(state, build_sink);
-    }
-    // IMPALA-1863: If the build-side thread failed, then we need to close the right
-    // (build-side) child to avoid a potential deadlock between fragment instances.  This
-    // is safe to do because while the build may have partially completed, it will not be
-    // probed.  BlockJoinNode::Open() will return failure as soon as child(0)->Open()
-    // completes.
-    if (!s.ok()) child(1)->Close(state);
-    // Release the thread token as soon as possible (before the main thread joins
-    // on it).  This way, if we had a chain of 10 joins using 1 additional thread,
-    // we'd keep the additional thread busy the whole time.
-    state->resource_pool()->ReleaseThreadToken(false);
+    Status* status) {
+  DCHECK(status != nullptr);
+  SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
+  if  (build_sink == nullptr){
+    *status = ProcessBuildInput(state);
+  } else {
+    *status = SendBuildInputToSink<true>(state, build_sink);
   }
-  // Please keep this as the last line in this function to avoid use-after-free problem.
-  // Once 'status' is set, ProcessBuildInputAndProbe() will start running and 'states'
-  // may have been freed after this line once the query completes. IMPALA-4532.
-  // TODO: Make this less fragile.
-  status->Set(s);
+  // IMPALA-1863: If the build-side thread failed, then we need to close the right
+  // (build-side) child to avoid a potential deadlock between fragment instances.  This
+  // is safe to do because while the build may have partially completed, it will not be
+  // probed.  BlockJoinNode::Open() will return failure as soon as child(0)->Open()
+  // completes.
+  if (!status->ok()) child(1)->Close(state);
+  // Release the thread token as soon as possible (before the main thread joins
+  // on it).  This way, if we had a chain of 10 joins using 1 additional thread,
+  // we'd keep the additional thread busy the whole time.
+  state->resource_pool()->ReleaseThreadToken(false);
 }
 
 Status BlockingJoinNode::Open(RuntimeState* state) {
@@ -194,7 +187,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
   // build side in a different thread, the overlap stops when the left child Open()
   // returns.
   if (!IsInSubplan() && state->resource_pool()->TryAcquireThreadToken()) {
-    Promise<Status> build_side_status;
+    Status build_side_status;
     runtime_profile()->AppendExecOption("Join Build-Side Prepared Asynchronously");
     Thread build_thread(
         node_name_, "build thread", bind(&BlockingJoinNode::ProcessBuildInputAsync, this,
@@ -209,7 +202,8 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
 
     // Blocks until ProcessBuildInput has returned, after which the build side structures
     // are fully constructed.
-    RETURN_IF_ERROR(build_side_status.Get());
+    build_thread.Join();
+    RETURN_IF_ERROR(build_side_status);
     RETURN_IF_ERROR(open_status);
   } else if (IsInSubplan()) {
     // When inside a subplan, open the first child before doing the build such that

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/692c6a55/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 4000b22..c184857 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -204,8 +204,7 @@ class BlockingJoinNode : public ExecNode {
   /// The main function for the thread that processes the build input asynchronously.
   /// Its status is returned in the 'status' promise. If 'build_sink' is non-NULL, it
   /// is used for the build. Otherwise, ProcessBuildInput() is called on the subclass.
-  void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink,
-      Promise<Status>* status);
+  void ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink, Status* status);
 };
 
 }


Mime
View raw message