drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [1/2] drill git commit: DRILL-4766: FragmentExecutor should use EventProcessor and avoid blocking rpc threads
Date Tue, 09 Aug 2016 19:05:54 GMT
Repository: drill
Updated Branches:
  refs/heads/master 0bac42dec -> 0a4c21cc1


DRILL-4766: FragmentExecutor should use EventProcessor and avoid blocking rpc threads

closes #561


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0d1158f8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0d1158f8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0d1158f8

Branch: refs/heads/master
Commit: 0d1158f86c18825d0e26ed63887c49e3f38f6344
Parents: 0bac42d
Author: adeneche <adeneche@gmail.com>
Authored: Fri Jul 8 08:08:38 2016 -0700
Committer: Sudheesh Katkam <skatkam@maprtech.com>
Committed: Tue Aug 9 10:10:15 2016 -0700

----------------------------------------------------------------------
 .../exec/work/fragment/FragmentExecutor.java    | 136 ++++++++++---------
 1 file changed, 74 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0d1158f8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 9df128f..b167fa3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -20,14 +20,12 @@ package org.apache.drill.exec.work.fragment;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.common.CatastrophicFailure;
 import org.apache.drill.common.DeferredException;
-import org.apache.drill.common.SerializedExecutor;
-import org.apache.drill.common.concurrent.ExtendedLatch;
+import org.apache.drill.common.EventProcessor;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -65,11 +63,10 @@ public class FragmentExecutor implements Runnable {
   private final DeferredException deferredException = new DeferredException();
   private final PlanFragment fragment;
   private final FragmentRoot rootOperator;
-  private final ReceiverExecutor receiverExecutor;
 
   private volatile RootExec root;
   private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
-  private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
+  private final FragmentEventProcessor eventProcessor = new FragmentEventProcessor();
 
   // Thread that is currently executing the Fragment. Value is null if the fragment hasn't
started running or finished
   private final AtomicReference<Thread> myThreadRef = new AtomicReference<>(null);
@@ -101,7 +98,6 @@ public class FragmentExecutor implements Runnable {
     this.fragment = fragment;
     this.rootOperator = rootOperator;
     this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
-    this.receiverExecutor = new ReceiverExecutor(fragmentName, fragmentContext.getExecutor());
 
     context.setExecutorState(new ExecutorStateImpl());
   }
@@ -146,32 +142,11 @@ public class FragmentExecutor implements Runnable {
   public void cancel() {
     final boolean thisIsOnlyThread = hasCloseoutThread.compareAndSet(false, true);
 
-    if (!thisIsOnlyThread) {
-      acceptExternalEvents.awaitUninterruptibly();
-
-      /*
-       * We set the cancel requested flag but the actual cancellation is managed by the run()
loop, if called.
-       */
-      updateState(FragmentState.CANCELLATION_REQUESTED);
-
-      /*
-       * Interrupt the thread so that it exits from any blocking operation it could be executing
currently. We
-       * synchronize here to ensure we don't accidentally create a race condition where we
interrupt the close out
-       * procedure of the main thread.
-       */
-      synchronized (myThreadRef) {
-        final Thread myThread = myThreadRef.get();
-        if (myThread != null) {
-          logger.debug("Interrupting fragment thread {}", myThread.getName());
-          myThread.interrupt();
-        }
-      }
+    if (thisIsOnlyThread) {
+      eventProcessor.cancelAndFinish();
+      eventProcessor.start(); // start immediately as we are the first thread accessing this
fragment
     } else {
-      // countdown so receiver fragment finished can proceed.
-      acceptExternalEvents.countDown();
-
-      updateState(FragmentState.CANCELLATION_REQUESTED);
-      cleanup(FragmentState.FINISHED);
+      eventProcessor.cancel();
     }
   }
 
@@ -201,7 +176,7 @@ public class FragmentExecutor implements Runnable {
    * @param handle The downstream FragmentHandle of the Fragment that needs no more records
from this Fragment.
    */
   public void receivingFragmentFinished(final FragmentHandle handle) {
-    receiverExecutor.submitReceiverFinished(handle);
+    eventProcessor.receiverFinished(handle);
   }
 
   @Override
@@ -236,7 +211,7 @@ public class FragmentExecutor implements Runnable {
       clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
       updateState(FragmentState.RUNNING);
 
-      acceptExternalEvents.countDown();
+      eventProcessor.start();
       injector.injectPause(fragmentContext.getExecutionControls(), "fragment-running", logger);
 
       final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
@@ -280,8 +255,8 @@ public class FragmentExecutor implements Runnable {
         Thread.interrupted();
       }
 
-      // We need to sure we countDown at least once. We'll do it here to guarantee that.
-      acceptExternalEvents.countDown();
+      // Make sure the event processor is started at least once
+      eventProcessor.start();
 
       // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
       cleanup(FragmentState.FINISHED);
@@ -472,45 +447,82 @@ public class FragmentExecutor implements Runnable {
     }
   }
 
-  private class ReceiverExecutor extends SerializedExecutor {
+  private enum EventType {
+    CANCEL,
+    CANCEL_AND_FINISH,
+    RECEIVER_FINISHED
+  }
 
-    public ReceiverExecutor(String name, Executor underlyingExecutor) {
-      super(name, underlyingExecutor);
-    }
+  private class FragmentEvent {
+    private final EventType type;
+    private final FragmentHandle handle;
 
-    @Override
-    protected void runException(Runnable command, Throwable t) {
-      logger.error("Failure running with exception of command {}", command, t);
+    FragmentEvent(EventType type, FragmentHandle handle) {
+      this.type = type;
+      this.handle = handle;
     }
+  }
+
+  /**
+   * Implementation of EventProcessor to handle fragment cancellation and early terminations
+   * without relying on a latch, thus avoiding to block the rpc control thread.<br>
+   * This is especially important as fragments can take longer to start
+   */
+  private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
 
-    public void submitReceiverFinished(FragmentHandle handle){
-      execute(new ReceiverFinished(handle));
+    void cancel() {
+      sendEvent(new FragmentEvent(EventType.CANCEL, null));
     }
-  }
 
-  private class ReceiverFinished implements Runnable {
-    final FragmentHandle handle;
+    void cancelAndFinish() {
+      sendEvent(new FragmentEvent(EventType.CANCEL_AND_FINISH, null));
+    }
 
-    public ReceiverFinished(FragmentHandle handle) {
-      super();
-      this.handle = handle;
+    void receiverFinished(FragmentHandle handle) {
+      sendEvent(new FragmentEvent(EventType.RECEIVER_FINISHED, handle));
     }
 
     @Override
-    public void run() {
-      acceptExternalEvents.awaitUninterruptibly();
+    protected void processEvent(FragmentEvent event) {
+      switch (event.type) {
+        case CANCEL:
+          /*
+           * We set the cancel requested flag but the actual cancellation is managed by the
run() loop, if called.
+           */
+          updateState(FragmentState.CANCELLATION_REQUESTED);
 
-      if (root != null) {
-        logger.info("Applying request for early sender termination for {} -> {}.",
-            QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
-        root.receivingFragmentFinished(handle);
-      } else {
-        logger.warn("Dropping request for early fragment termination for path {} -> {}
as no root exec exists.",
-            QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
+          /*
+           * Interrupt the thread so that it exits from any blocking operation it could be
executing currently. We
+           * synchronize here to ensure we don't accidentally create a race condition where
we interrupt the close out
+           * procedure of the main thread.
+          */
+          synchronized (myThreadRef) {
+            final Thread myThread = myThreadRef.get();
+            if (myThread != null) {
+              logger.debug("Interrupting fragment thread {}", myThread.getName());
+              myThread.interrupt();
+            }
+          }
+          break;
+
+        case CANCEL_AND_FINISH:
+          updateState(FragmentState.CANCELLATION_REQUESTED);
+          cleanup(FragmentState.FINISHED);
+          break;
+
+        case RECEIVER_FINISHED:
+          assert event.handle != null : "RECEIVER_FINISHED event must have a handle";
+          if (root != null) {
+            logger.info("Applying request for early sender termination for {} -> {}.",
+              QueryIdHelper.getQueryIdentifier(getContext().getHandle()),
+              QueryIdHelper.getFragmentId(event.handle));
+            root.receivingFragmentFinished(event.handle);
+          } else {
+            logger.warn("Dropping request for early fragment termination for path {} ->
{} as no root exec exists.",
+              QueryIdHelper.getFragmentId(getContext().getHandle()), QueryIdHelper.getFragmentId(event.handle));
+          }
+          break;
       }
-
     }
-
   }
-
 }


Mime
View raw message