drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [8/9] drill git commit: DRILL-2575: FragmentExecutor.cancel() blasts through state transitions regardless of current state
Date Thu, 26 Mar 2015 17:51:14 GMT
DRILL-2575: FragmentExecutor.cancel() blasts through state transitions regardless of current state

FragmentExecutor:
- Changed cancel() to behave asynchronously, and for the cancelation request to
  be checked at an appropriate place in the run() loop.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f7388fb7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f7388fb7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f7388fb7

Branch: refs/heads/master
Commit: f7388fb7bb3243c46ba278bb76079e0bf88af43e
Parents: 910e278
Author: Chris Westin <cwestin@yahoo.com>
Authored: Wed Mar 25 19:05:25 2015 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Thu Mar 26 09:58:36 2015 -0700

----------------------------------------------------------------------
 .../exec/work/fragment/FragmentExecutor.java    | 65 +++++++++++++-------
 1 file changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f7388fb7/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 5592707..a7e6c46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,6 +47,7 @@ public class FragmentExecutor implements Runnable {
   private final FragmentRoot rootOperator;
   private final FragmentContext fragmentContext;
   private final StatusReporter listener;
+  private volatile boolean canceled;
   private volatile boolean closed;
   private RootExec root;
 
@@ -88,15 +89,15 @@ public class FragmentExecutor implements Runnable {
   }
 
   public void cancel() {
-    logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
-
-    // Note this will be called outside of run(), from another thread
-    // Change state checked by main loop to terminate it (if not already done):
-    updateState(FragmentState.CANCELLED);
-
-    fragmentContext.cancel();
-
-    logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+    /*
+     * Note that this can be called from threads *other* than the one running this runnable(), so
+     * we need to be careful about the state transitions that can result. We set the canceled flag,
+     * and this is checked in the run() loop, where action will be taken as soon as possible.
+     *
+     * If the run loop has already exited, because we've already either completed or failed the query,
+     * then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
+     */
+    canceled = true;
   }
 
   public void receivingFragmentFinished(FragmentHandle handle) {
@@ -142,6 +143,23 @@ public class FragmentExecutor implements Runnable {
        * alerting the user--the behavior then is to hang.
        */
       while (state.get() == FragmentState.RUNNING_VALUE) {
+        if (canceled) {
+          logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
+
+          // Change state checked by main loop to terminate it (if not already done):
+          updateState(FragmentState.CANCELLED);
+
+          fragmentContext.cancel();
+
+          logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
+
+          /*
+           * The state will be altered because of the updateState(), which would cause
+           * us to fall out of the enclosing while loop; we just short-circuit that here
+           */
+          break;
+        }
+
         if (!root.next()) {
           if (fragmentContext.isFailed()) {
             internalFail(fragmentContext.getFailureCause());
@@ -180,19 +198,21 @@ public class FragmentExecutor implements Runnable {
      * be safe to call it more than once. We use this flag to bypass the body if it has
      * been called before.
      */
-    if (closed) {
-      return;
-    }
+    synchronized(this) { // synchronize for the state of closed
+      if (closed) {
+        return;
+      }
 
-    final DeferredException deferredException = fragmentContext.getDeferredException();
-    try {
-      root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
-    } catch (RuntimeException e) {
-      logger.warn(CLOSE_FAILURE, e);
-      deferredException.addException(e);
-    }
+      final DeferredException deferredException = fragmentContext.getDeferredException();
+      try {
+        root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+      } catch (RuntimeException e) {
+        logger.warn(CLOSE_FAILURE, e);
+        deferredException.addException(e);
+      }
 
-    closed = true;
+      closed = true;
+    }
 
     /*
      * This must be last, because this may throw deferred exceptions.
@@ -221,7 +241,7 @@ public class FragmentExecutor implements Runnable {
   }
 
   /**
-   * Updates the fragment state only if the current state matches the expected.
+   * Updates the fragment state only iff the current state matches the expected.
    *
    * @param  expected  expected current state
    * @param  to  target state
@@ -258,7 +278,8 @@ public class FragmentExecutor implements Runnable {
   private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
     final boolean updated = checkAndUpdateState(expected, to);
     if (!updated && !isCompleted()) {
-      final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
+      final String msg = "State was different than expected while attempting to update state from %s to %s"
+          + "however current state was %s.";
       internalFail(new StateTransitionException(
           String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
     }


Mime
View raw message