drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prog...@apache.org
Subject [1/2] drill git commit: DRILL-5721: Query with only root fragment and no non-root fragment hangs when Drillbit to Drillbit Control Connection has network issues Note: 1) To resolve the issue all the fragments including root fragment which are
Date Mon, 25 Sep 2017 05:16:50 GMT
Repository: drill
Updated Branches:
  refs/heads/master d77ab3183 -> 6cb626d78


DRILL-5721: Query with only root fragment and no non-root fragment hangs when Drillbit to Drillbit Control Connection has network issues
            Note: 1) To resolve the issue, all fragments (including the root fragment) that are assigned to execute on the Foreman node
                     are scheduled locally and are not sent over the Control Tunnel. The FragmentStatusReporter is also updated to send
                     status updates locally for fragments running on the Foreman node.
                  2) Refactor for FragmentManager, setupRootFragment and startNewFragment
                  3) Update the test added for DRILL-5701, as there is a change in behavior


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b06a7bde
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b06a7bde
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b06a7bde

Branch: refs/heads/master
Commit: b06a7bdecef8bdd52eebf4a2821a09c64b296886
Parents: d77ab31
Author: Sorabh Hamirwasia <shamirwasia@maprtech.com>
Authored: Thu Aug 17 16:48:37 2017 -0700
Committer: Paul Rogers <progers@maprtech.com>
Committed: Sun Sep 24 21:34:14 2017 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentContext.java  |  30 ++-
 .../org/apache/drill/exec/work/WorkManager.java |  51 ++--
 .../exec/work/batch/ControlMessageHandler.java  |  35 ++-
 .../apache/drill/exec/work/foreman/Foreman.java | 238 ++++++++++++-------
 .../work/fragment/AbstractFragmentManager.java  |  99 ++++++++
 .../exec/work/fragment/FragmentExecutor.java    |   5 +-
 .../work/fragment/FragmentStatusReporter.java   |  77 +++---
 .../work/fragment/NonRootFragmentManager.java   |  78 +-----
 .../exec/work/fragment/RootFragmentManager.java |  67 +-----
 .../drill/exec/rpc/data/TestBitBitKerberos.java |  97 ++++++++
 .../security/TestUserBitKerberosEncryption.java |  14 +-
 11 files changed, 501 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index badf70c..19ffca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -17,14 +17,12 @@
  */
 package org.apache.drill.exec.ops;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -47,8 +45,9 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.FragmentOptionManager;
 import org.apache.drill.exec.server.options.OptionList;
@@ -60,10 +59,10 @@ import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Contextual objects required for execution of a particular fragment.
@@ -486,6 +485,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe
     sendingAccountor.waitForSendComplete();
   }
 
+  public WorkEventBus getWorkEventbus() {
+    return context.getWorkBus();
+  }
+
+  public boolean isBuffersDone() {
+    Preconditions.checkState(this.buffers != null, "Incoming Buffers is not set in this fragment context");
+    return buffers.isDone();
+  }
+
   public interface ExecutorState {
     /**
      * Whether execution should continue.

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 2d37b8c..800d3a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -17,13 +17,10 @@
  */
 package org.apache.drill.exec.work;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-
-import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.drill.common.SelfCleaningRunnable;
 import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -49,10 +46,11 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.user.UserWorker;
 
-import com.codahale.metrics.Gauge;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 
 /**
  * Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments
@@ -294,6 +292,9 @@ public class WorkManager implements AutoCloseable {
    * about RUNNING queries, such as current memory consumption, number of rows processed, and so on.
    * The FragmentStatusListener only tracks changes to state, so the statistics kept there will be
    * stale; this thread probes for current values.
+   *
+   * For each running fragment if the Foreman is the local Drillbit then status is updated locally bypassing the Control
+   * Tunnel, whereas for remote Foreman it is sent over the Control Tunnel.
    */
   private class StatusThread extends Thread {
     public StatusThread() {
@@ -303,30 +304,42 @@ public class WorkManager implements AutoCloseable {
 
     @Override
     public void run() {
-      while(true) {
-        final Controller controller = dContext.getController();
+
+      // Get the controller and localBitEndPoint outside the loop since these will not change once a Drillbit and
+      // StatusThread is started
+      final Controller controller = dContext.getController();
+      final DrillbitEndpoint localBitEndPoint = dContext.getEndpoint();
+
+      while (true) {
         final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
-        for(final FragmentExecutor fragmentExecutor : runningFragments.values()) {
+        for (final FragmentExecutor fragmentExecutor : runningFragments.values()) {
           final FragmentStatus status = fragmentExecutor.getStatus();
           if (status == null) {
             continue;
           }
 
-          final DrillbitEndpoint ep = fragmentExecutor.getContext().getForemanEndpoint();
-          futures.add(controller.getTunnel(ep).sendFragmentStatus(status));
+          final DrillbitEndpoint foremanEndpoint = fragmentExecutor.getContext().getForemanEndpoint();
+
+          // If local endpoint is the Foreman for this running fragment, then submit the status locally bypassing the
+          // Control Tunnel
+          if (localBitEndPoint.equals(foremanEndpoint)) {
+            workBus.statusUpdate(status);
+          } else { // else send the status to remote Foreman over Control Tunnel
+            futures.add(controller.getTunnel(foremanEndpoint).sendFragmentStatus(status));
+          }
         }
 
-        for(final DrillRpcFuture<Ack> future : futures) {
+        for (final DrillRpcFuture<Ack> future : futures) {
           try {
             future.checkedGet();
-          } catch(final RpcException ex) {
+          } catch (final RpcException ex) {
             logger.info("Failure while sending intermediate fragment status to Foreman", ex);
           }
         }
 
         try {
           Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
-        } catch(final InterruptedException e) {
+        } catch (final InterruptedException e) {
           // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
           // interruption and respond to it if it wants to.
           Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 58c1df5..2bbaf1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -17,10 +17,9 @@
  */
 package org.apache.drill.exec.work.batch;
 
-import static org.apache.drill.exec.rpc.RpcBus.get;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
-
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.CustomMessage;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -42,7 +41,6 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.UserRpcException;
 import org.apache.drill.exec.rpc.control.ControlConnection;
 import org.apache.drill.exec.rpc.control.ControlRpcConfig;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
 import org.apache.drill.exec.rpc.control.CustomHandlerRegistry;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
@@ -52,6 +50,8 @@ import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
 import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 
+import static org.apache.drill.exec.rpc.RpcBus.get;
+
 public class ControlMessageHandler implements RequestHandler<ControlConnection> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlMessageHandler.class);
   private final WorkerBee bee;
@@ -110,8 +110,9 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
 
     case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
       final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+      final DrillbitContext drillbitContext = bee.getContext();
       for(int i = 0; i < fragments.getFragmentCount(); i++) {
-        startNewRemoteFragment(fragments.getFragment(i));
+        startNewFragment(fragments.getFragment(i), drillbitContext);
       }
       sender.send(ControlRpcConfig.OK);
       break;
@@ -140,25 +141,33 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
     }
   }
 
-  private void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
+  /**
+   * Start a new fragment on this node. These fragments can be leaf or intermediate fragments
+   * which are scheduled by remote or local Foreman node.
+   * @param fragment
+   * @throws UserRpcException
+   */
+  private void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext)
+      throws UserRpcException {
     logger.debug("Received remote fragment start instruction", fragment);
 
-    final DrillbitContext drillbitContext = bee.getContext();
     try {
+      final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment,
+          drillbitContext.getFunctionImplementationRegistry());
+      final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
+      final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter);
+
       // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
       if (fragment.getLeafFragment()) {
-        final FragmentContext context = new FragmentContext(drillbitContext, fragment,
-            drillbitContext.getFunctionImplementationRegistry());
-        final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
-        final FragmentStatusReporter statusReporter = new FragmentStatusReporter(context, tunnel);
-        final FragmentExecutor fr = new FragmentExecutor(context, fragment, statusReporter);
-        bee.addFragmentRunner(fr);
+        bee.addFragmentRunner(fragmentExecutor);
       } else {
         // isIntermediate, store for incoming data.
-        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
         drillbitContext.getWorkBus().addFragmentManager(manager);
       }
 
+    } catch (final ExecutionSetupException ex) {
+      throw new UserRpcException(drillbitContext.getEndpoint(), "Failed to create fragment context", ex);
     } catch (final Exception e) {
         throw new UserRpcException(drillbitContext.getEndpoint(),
             "Failure while trying to start remote fragment", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 8144da1..d01d8fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -18,20 +18,16 @@
 package org.apache.drill.exec.work.foreman;
 
 import com.codahale.metrics.Counter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.drill.common.CatastrophicFailure;
 import org.apache.drill.common.EventProcessor;
 import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -71,9 +67,8 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.UserClientConnection;
+import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.testing.ControlsInjector;
@@ -83,18 +78,21 @@ import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
+import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query where this
@@ -1073,24 +1071,18 @@ public class Foreman implements Runnable {
    */
   private void setupRootFragment(final PlanFragment rootFragment, final FragmentRoot rootOperator)
       throws ExecutionSetupException {
-    @SuppressWarnings("resource")
     final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
         initiatingClient, drillbitContext.getFunctionImplementationRegistry());
-    @SuppressWarnings("resource")
-    final IncomingBuffers buffers = new IncomingBuffers(rootFragment, rootContext);
-    rootContext.setBuffers(buffers);
+    final FragmentStatusReporter statusReporter = new FragmentStatusReporter(rootContext);
+    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment, statusReporter, rootOperator);
+    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment, rootRunner, statusReporter);
 
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
-    final ControlTunnel tunnel = drillbitContext.getController().getTunnel(queryContext.getCurrentEndpoint());
-    final FragmentExecutor rootRunner = new FragmentExecutor(rootContext, rootFragment,
-        new FragmentStatusReporter(rootContext, tunnel),
-        rootOperator);
-    final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
-
-    if (buffers.isDone()) {
+    // FragmentManager is setting buffer for FragmentContext
+    if (rootContext.isBuffersDone()) {
       // if we don't have to wait for any incoming data, start the fragment runner.
-      bee.addFragmentRunner(fragmentManager.getRunnable());
+      bee.addFragmentRunner(rootRunner);
     } else {
       // if we do, record the fragment manager in the workBus.
       drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
@@ -1098,69 +1090,54 @@ public class Foreman implements Runnable {
   }
 
   /**
-   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
-   * Messages are sent immediately, so they may start returning data even before we complete this.
-   *
-   * @param fragments the fragments
-   * @throws ForemanException
+   * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node
+   * and the local Drillbit Endpoint.
+   * @param planFragment
+   * @param localEndPoint
+   * @param localFragmentList
+   * @param remoteFragmentMap
    */
-  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
-    if (fragments.isEmpty()) {
-      // nothing to do here
-      return;
-    }
-    /*
-     * We will send a single message to each endpoint, regardless of how many fragments will be
-     * executed there. We need to start up the intermediate fragments first so that they will be
-     * ready once the leaf fragments start producing data. To satisfy both of these, we will
-     * make a pass through the fragments and put them into these two maps according to their
-     * leaf/intermediate state, as well as their target drillbit.
-     */
-    final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
-    final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
+  private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint,
+                                        final List<PlanFragment> localFragmentList,
+                                        final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+    final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
 
-    // record all fragments for status purposes.
-    for (final PlanFragment planFragment : fragments) {
-      logger.trace("Tracking intermediate remote node {} with data {}",
-                   planFragment.getAssignment(), planFragment.getFragmentJson());
-      queryManager.addFragmentStatusTracker(planFragment, false);
-      if (planFragment.getLeafFragment()) {
-        leafFragmentMap.put(planFragment.getAssignment(), planFragment);
-      } else {
-        intFragmentMap.put(planFragment.getAssignment(), planFragment);
-      }
+    if (assignedDrillbit.equals(localEndPoint)) {
+      localFragmentList.add(planFragment);
+    } else {
+      remoteFragmentMap.put(assignedDrillbit, planFragment);
     }
+  }
 
-    /*
-     * We need to wait for the intermediates to be sent so that they'll be set up by the time
-     * the leaves start producing data. We'll use this latch to wait for the responses.
-     *
-     * However, in order not to hang the process if any of the RPC requests fails, we always
-     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
-     * know if any submissions did fail.
-     */
-    final int numIntFragments = intFragmentMap.keySet().size();
+  /**
+   * Send remote intermediate fragment to the assigned Drillbit node. Throw exception in case of failure to send the
+   * fragment.
+   * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's
+   */
+  private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
+
+    final int numIntFragments = remoteFragmentMap.keySet().size();
     final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
     final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
 
     // send remote intermediate fragments
-    for (final DrillbitEndpoint ep : intFragmentMap.keySet()) {
-      sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
+    for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
     }
 
     final long timeout = RPC_WAIT_IN_MSECS_PER_FRAGMENT * numIntFragments;
-    if(numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)){
+    if (numIntFragments > 0 && !endpointLatch.awaitUninterruptibly(timeout)) {
       long numberRemaining = endpointLatch.getCount();
       throw UserException.connectionError()
-          .message(
-              "Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
-                  "Sent %d and only heard response back from %d nodes.",
-              timeout, numIntFragments, numIntFragments - numberRemaining)
-          .build(logger);
+          .message("Exceeded timeout (%d) while waiting send intermediate work fragments to remote nodes. " +
+              "Sent %d and only heard response back from %d nodes.",
+              timeout, numIntFragments, numIntFragments - numberRemaining).build(logger);
     }
 
     // if any of the intermediate fragment submissions failed, fail the query
-    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
+    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
+        fragmentSubmitFailures.submissionExceptions;
+
     if (submissionExceptions.size() > 0) {
       Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
       StringBuilder sb = new StringBuilder();
@@ -1179,8 +1156,100 @@ public class Foreman implements Runnable {
       }
       throw UserException.connectionError(submissionExceptions.get(0).rpcException)
           .message("Error setting up remote intermediate fragment execution")
-          .addContext("Nodes with failures", sb.toString())
-          .build(logger);
+          .addContext("Nodes with failures", sb.toString()).build(logger);
+    }
+  }
+
+
+  /**
+   * Start the locally assigned leaf or intermediate fragment
+   * @param fragment
+   * @throws ForemanException
+   */
+  private void startLocalFragment(final PlanFragment fragment) throws ForemanException {
+
+    logger.debug("Received local fragment start instruction", fragment);
+
+    try {
+      final FragmentContext fragmentContext = new FragmentContext(drillbitContext, fragment,
+          drillbitContext.getFunctionImplementationRegistry());
+      final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
+      final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter);
+
+      // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
+      if (fragment.getLeafFragment()) {
+        bee.addFragmentRunner(fragmentExecutor);
+      } else {
+        // isIntermediate, store for incoming data.
+        final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
+        drillbitContext.getWorkBus().addFragmentManager(manager);
+      }
+
+    } catch (final ExecutionSetupException ex) {
+      throw new ForemanException("Failed to create fragment context", ex);
+    } catch (final Exception ex) {
+      throw new ForemanException("Failed while trying to start local fragment", ex);
+    }
+  }
+
+  /**
+   * Set up the non-root fragments for execution. Some may be local, and some may be remote.
+   * Messages are sent immediately, so they may start returning data even before we complete this.
+   *
+   * @param fragments the fragments
+   * @throws ForemanException
+   */
+  private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
+    if (fragments.isEmpty()) {
+      // nothing to do here
+      return;
+    }
+    /*
+     * We will send a single message to each endpoint, regardless of how many fragments will be
+     * executed there. We need to start up the intermediate fragments first so that they will be
+     * ready once the leaf fragments start producing data. To satisfy both of these, we will
+     * make a pass through the fragments and put them into the remote maps according to their
+     * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate
+     * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists.
+     *
+     * This will help to schedule local
+     */
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
+    final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
+    final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+
+    final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
+    // record all fragments for status purposes.
+    for (final PlanFragment planFragment : fragments) {
+
+      if (logger.isTraceEnabled()) {
+        logger.trace("Tracking intermediate remote node {} with data {}", planFragment.getAssignment(),
+            planFragment.getFragmentJson());
+      }
+
+      queryManager.addFragmentStatusTracker(planFragment, false);
+
+      if (planFragment.getLeafFragment()) {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
+      } else {
+        updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
+      }
+    }
+
+    /*
+     * We need to wait for the intermediates to be sent so that they'll be set up by the time
+     * the leaves start producing data. We'll use this latch to wait for the responses.
+     *
+     * However, in order not to hang the process if any of the RPC requests fails, we always
+     * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
+     * know if any submissions did fail.
+     */
+    scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
+
+    // Setup local intermediate fragments
+    for (final PlanFragment fragment : localIntFragmentList) {
+      startLocalFragment(fragment);
     }
 
     injector.injectChecked(queryContext.getExecutionControls(), "send-fragments", ForemanException.class);
@@ -1188,8 +1257,13 @@ public class Foreman implements Runnable {
      * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
      * the regular sendListener event delivery.
      */
-    for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
+    for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
+    }
+
+    // Setup local leaf fragments
+    for (final PlanFragment fragment : localLeafFragmentList) {
+      startLocalFragment(fragment);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java
new file mode 100644
index 0000000..f427a84
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractFragmentManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+import java.io.IOException;
+
+public abstract class AbstractFragmentManager implements FragmentManager {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFragmentManager.class);
+
+
+  protected final IncomingBuffers buffers;
+
+  protected final FragmentExecutor fragmentExecutor;
+
+  protected final FragmentHandle fragmentHandle;
+
+  protected final FragmentContext fragmentContext;
+
+  protected volatile boolean cancel = false;
+
+
+  public AbstractFragmentManager(final PlanFragment fragment, final FragmentExecutor executor, final FragmentStatusReporter statusReporter, final FragmentRoot rootOperator) {
+    this.fragmentHandle = fragment.getHandle();
+    this.fragmentContext = executor.getContext();
+    this.buffers = new IncomingBuffers(fragment, fragmentContext);
+    this.fragmentContext.setBuffers(buffers);
+    this.fragmentExecutor = executor;
+  }
+
+  public AbstractFragmentManager(final PlanFragment fragment, final FragmentExecutor executor, final FragmentStatusReporter statusReporter) {
+    this(fragment, executor, statusReporter, null);
+  }
+
+  @Override
+  public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
+    return buffers.batchArrived(batch);
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return cancel;
+  }
+
+  @Override
+  public void unpause() {
+    fragmentExecutor.unpause();
+  }
+
+  @Override
+  public FragmentHandle getHandle() {
+    return fragmentHandle;
+  }
+
+  @Override
+  public boolean isWaiting() {
+    return !buffers.isDone() && !cancel;
+  }
+
+  @Override
+  public FragmentContext getFragmentContext() {
+    return fragmentContext;
+  }
+
+  @Override
+  public FragmentExecutor getRunnable() {
+    return fragmentExecutor;
+  }
+
+  public abstract void receivingFragmentFinished(final FragmentHandle handle);
+
+  @Override
+  public void cancel() {
+    cancel = true;
+    fragmentExecutor.cancel();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 258e485..e97a382 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -92,7 +92,7 @@ public class FragmentExecutor implements Runnable {
    * @param rootOperator
    */
   public FragmentExecutor(final FragmentContext context, final PlanFragment fragment,
-      final FragmentStatusReporter statusReporter, final FragmentRoot rootOperator) {
+                          final FragmentStatusReporter statusReporter, final FragmentRoot rootOperator) {
     this.fragmentContext = context;
     this.statusReporter = statusReporter;
     this.fragment = fragment;
@@ -261,6 +261,9 @@ public class FragmentExecutor implements Runnable {
       eventProcessor.start();
 
       // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
+      // FAILED state will be because of any Exception in execution loop root.next()
+      // CANCELLATION_REQUESTED because of a CANCEL request received by Foreman.
+      // ELSE will be in FINISHED state.
       cleanup(FragmentState.FINISHED);
 
       clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index e37435c..c0ecb07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -21,27 +21,32 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.control.ControlTunnel;
+import org.apache.drill.exec.server.Drillbit;
 
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * The status reporter is responsible for receiving changes in fragment state and propagating the status back to the
- * Foreman through a control tunnel.
+ * Foreman either through a control tunnel or locally.
  */
 public class FragmentStatusReporter implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class);
 
-  private final FragmentContext context;
-  private final AtomicReference<ControlTunnel> tunnel;
+  protected final FragmentContext context;
 
-  public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) {
+  protected final AtomicReference<DrillbitEndpoint> foremanDrillbit;
+
+  protected final DrillbitEndpoint localDrillbit;
+
+  public FragmentStatusReporter(final FragmentContext context) {
     this.context = context;
-    this.tunnel = new AtomicReference<>(tunnel);
+    this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint());
+    this.localDrillbit = context.getIdentity();
   }
 
   /**
@@ -82,29 +87,47 @@ public class FragmentStatusReporter implements AutoCloseable {
     final FragmentStatus status = getStatus(newState, null);
     logger.info("{}: State to report: {}", QueryIdHelper.getQueryIdentifier(context.getHandle()), newState);
     switch (newState) {
-    case AWAITING_ALLOCATION:
-    case CANCELLATION_REQUESTED:
-    case CANCELLED:
-    case FINISHED:
-    case RUNNING:
-      sendStatus(status);
-      break;
-    case SENDING:
-      // no op.
-      break;
-    case FAILED:
-      // shouldn't get here since fail() should be called.
-    default:
-      throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
+      case AWAITING_ALLOCATION:
+      case CANCELLATION_REQUESTED:
+      case CANCELLED:
+      case FINISHED:
+      case RUNNING:
+        sendStatus(status);
+        break;
+      case SENDING:
+        // no op.
+        break;
+      case FAILED:
+        // shouldn't get here since fail() should be called.
+      default:
+        throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.",
+            newState));
     }
   }
 
-  private void sendStatus(final FragmentStatus status) {
-    final ControlTunnel tunnel = this.tunnel.get();
-    if (tunnel != null) {
-      tunnel.sendFragmentStatus(status);
+
+  /**
+   * Sends status to remote Foreman node using Control Tunnel or to Local Foreman bypassing
+   * Control Tunnel and using WorkEventBus.
+   * @param status
+   */
+  void sendStatus(final FragmentStatus status) {
+
+    DrillbitEndpoint foremanNode = foremanDrillbit.get();
+
+    if (foremanNode == null) {
+      logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()),
+          status.getProfile().getState(), this);
+      return;
+    }
+
+    if (localDrillbit.equals(foremanNode)) {
+      // Update the status locally
+      context.getWorkEventbus().statusUpdate(status);
     } else {
-      logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()), status.getProfile().getState(), this);
+      // Send the status via Control Tunnel to remote foreman node
+      final ControlTunnel tunnel = context.getControlTunnel(foremanNode);
+      tunnel.sendFragmentStatus(status);
     }
   }
 
@@ -123,8 +146,8 @@ public class FragmentStatusReporter implements AutoCloseable {
   @Override
   public void close()
   {
-    final ControlTunnel tunnel = this.tunnel.getAndSet(null);
-    if (tunnel != null) {
+    final DrillbitEndpoint foremanNode = foremanDrillbit.getAndSet(null);
+    if (foremanNode != null) {
       logger.debug("Closing {}", this);
     } else {
       logger.warn("{} was already closed", this);

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 7cffa0a..7d1585b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -17,56 +17,21 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import java.io.IOException;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.data.IncomingDataBatch;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.exec.work.foreman.ForemanException;
-
-import com.google.common.base.Preconditions;
 
 /**
  * This managers determines when to run a non-root fragment node.
  */
-// TODO a lot of this is the same as RootFragmentManager
-public class NonRootFragmentManager implements FragmentManager {
+public class NonRootFragmentManager extends AbstractFragmentManager {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
 
-  private final IncomingBuffers buffers;
-  private final FragmentExecutor runner;
-  private final FragmentHandle handle;
-  private volatile boolean cancel = false;
-  private final FragmentContext context;
   private volatile boolean runnerRetrieved = false;
 
-  public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
-      throws ExecutionSetupException {
-    try {
-      this.handle = fragment.getHandle();
-      this.context = new FragmentContext(context, fragment, context.getFunctionImplementationRegistry());
-      this.buffers = new IncomingBuffers(fragment, this.context);
-      final FragmentStatusReporter reporter = new FragmentStatusReporter(this.context,
-          context.getController().getTunnel(fragment.getForeman()));
-      this.runner = new FragmentExecutor(this.context, fragment, reporter);
-      this.context.setBuffers(buffers);
-
-    } catch (ForemanException e) {
-      throw new FragmentSetupException("Failure while decoding fragment.", e);
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.AbstractRemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
-   */
-  @Override
-  public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
-    return buffers.batchArrived(batch);
+  public NonRootFragmentManager(final PlanFragment fragment, final FragmentExecutor fragmentExecutor,
+                                final FragmentStatusReporter statusReporter) {
+    super(fragment, fragmentExecutor, statusReporter);
   }
 
   /* (non-Javadoc)
@@ -84,44 +49,17 @@ public class NonRootFragmentManager implements FragmentManager {
         return null;
       }
       runnerRetrieved = true;
-      return runner;
+      return super.getRunnable();
     }
   }
 
   @Override
   public void receivingFragmentFinished(final FragmentHandle handle) {
-    runner.receivingFragmentFinished(handle);
+    fragmentExecutor.receivingFragmentFinished(handle);
   }
 
   @Override
   public synchronized void cancel() {
-    cancel = true;
-    runner.cancel();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return cancel;
+    super.cancel();
   }
-
-  @Override
-  public void unpause() {
-    runner.unpause();
-  }
-
-  @Override
-  public FragmentHandle getHandle() {
-    return handle;
-  }
-
-  @Override
-  public boolean isWaiting() {
-    return !buffers.isDone() && !cancel;
-  }
-
-  @Override
-  public FragmentContext getFragmentContext() {
-    return context;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index af81d17..4cbadc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -17,73 +17,22 @@
  */
 package org.apache.drill.exec.work.fragment;
 
-import java.io.IOException;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.rpc.data.IncomingDataBatch;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
 
-// TODO a lot of this is the same as NonRootFragmentManager
-public class RootFragmentManager implements FragmentManager {
+/**
+ * This manager determines when to run a root fragment node.
+ */
+public class RootFragmentManager extends AbstractFragmentManager {
   // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
 
-  private final IncomingBuffers buffers;
-  private final FragmentExecutor runner;
-  private final FragmentHandle handle;
-  private volatile boolean cancel = false;
-
-  public RootFragmentManager(final FragmentHandle handle, final IncomingBuffers buffers, final FragmentExecutor runner) {
-    super();
-    this.handle = handle;
-    this.buffers = buffers;
-    this.runner = runner;
-  }
-
-  @Override
-  public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
-    return buffers.batchArrived(batch);
+  public RootFragmentManager(final PlanFragment fragment, final FragmentExecutor fragmentExecutor,
+                             final FragmentStatusReporter statusReporter) {
+    super(fragment, fragmentExecutor, statusReporter);
   }
 
   @Override
   public void receivingFragmentFinished(final FragmentHandle handle) {
     throw new IllegalStateException("The root fragment should not be sending any messages to receiver.");
   }
-
-  @Override
-  public FragmentExecutor getRunnable() {
-    return runner;
-  }
-
-  public FragmentHandle getHandle() {
-    return handle;
-  }
-
-  @Override
-  public void cancel() {
-    cancel = true;
-    runner.cancel();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return cancel;
-  }
-
-  @Override
-  public void unpause() {
-    runner.unpause();
-  }
-
-  @Override
-  public boolean isWaiting() {
-    return !buffers.isDone() && !cancel;
-  }
-
-  @Override
-  public FragmentContext getFragmentContext() {
-    return runner.getContext();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
index 8165b0d..3d6f507 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java
@@ -28,6 +28,8 @@ import mockit.MockUp;
 import mockit.NonStrictExpectations;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -41,6 +43,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.FragmentWritableBatch;
 import org.apache.drill.exec.record.MaterializedField;
@@ -50,6 +53,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.security.KerberosHelper;
+import org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.vector.Float8Vector;
@@ -66,6 +70,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -364,6 +369,98 @@ public class TestBitBitKerberos extends BaseTestQuery {
     }
   }
 
+  /**
+   * Test to validate that a query which is running only on local Foreman node runs fine even if the Bit-Bit
+   * Auth config is wrong. With DRILL-5721, local fragment setup and status updates
+   * do not happen over the Control tunnel but instead happen locally. Without the fix in DRILL-5721 these queries
+   * would hang.
+   *
+   * This test only starts up 1 Drillbit so that all fragments are scheduled on Foreman Drillbit node
+   * @throws Exception
+   */
+  @Test
+  public void localQuerySuccessWithWrongBitAuthConfig() throws Exception {
+
+    final Properties connectionProps = new Properties();
+    connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+    connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+    newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+        .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+            ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+        .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+        .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+            ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+        .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+            ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+            ConfigValueFactory.fromAnyRef(true))
+        .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+            ConfigValueFactory.fromAnyRef("kerberos"))
+        .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+            ConfigValueFactory.fromAnyRef(false))
+        ,false);
+
+    updateTestCluster(1, newConfig, connectionProps);
+
+    // Run a query using the new client
+    final String query = getFile("queries/tpch/01.sql");
+    test(query);
+  }
+
+  /**
+   * Test to validate that query setup fails while scheduling remote fragments when multiple Drillbits are running with
+   * wrong Bit-to-Bit Authentication configuration.
+   *
+   * This test starts up 2 Drillbits so that there is a combination of local and remote fragments for query
+   * execution. Note: When the test runs with the wrong config, each Drillbit uses the wrong service principal
+   * on the control connection to talk to another Drillbit, and due to this the Kerby server also fails with a
+   * NullPointerException. But for unit testing this should be fine.
+   * @throws Exception
+   */
+  @Test
+  public void queryFailureWithWrongBitAuthConfig() throws Exception {
+    try{
+      final Properties connectionProps = new Properties();
+      connectionProps.setProperty(DrillProperties.SERVICE_PRINCIPAL, krbHelper.SERVER_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.USER, krbHelper.CLIENT_PRINCIPAL);
+      connectionProps.setProperty(DrillProperties.KEYTAB, krbHelper.clientKeytab.getAbsolutePath());
+
+      newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+          .withValue(ExecConstants.USER_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.USER_AUTHENTICATOR_IMPL,
+              ConfigValueFactory.fromAnyRef(UserAuthenticatorTestImpl.TYPE))
+          .withValue(BootStrapContext.SERVICE_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(krbHelper.SERVER_PRINCIPAL))
+          .withValue(BootStrapContext.SERVICE_KEYTAB_LOCATION,
+              ConfigValueFactory.fromAnyRef(krbHelper.serverKeytab.toString()))
+          .withValue(ExecConstants.AUTHENTICATION_MECHANISMS,
+              ConfigValueFactory.fromIterable(Lists.newArrayList("plain", "kerberos")))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_ENABLED,
+              ConfigValueFactory.fromAnyRef(true))
+          .withValue(ExecConstants.BIT_AUTHENTICATION_MECHANISM,
+              ConfigValueFactory.fromAnyRef("kerberos"))
+          .withValue(ExecConstants.USE_LOGIN_PRINCIPAL,
+              ConfigValueFactory.fromAnyRef(false))
+          ,false);
+
+      updateTestCluster(2, newConfig, connectionProps);
+
+      test("alter session set `planner.slice_target` = 10");
+      final String query = getFile("queries/tpch/01.sql");
+      test(query);
+      fail();
+    } catch(Exception ex) {
+      assertTrue(ex instanceof UserRemoteException);
+      assertTrue(((UserRemoteException)ex).getErrorType() == UserBitShared.DrillPBError.ErrorType.CONNECTION);
+    }
+  }
+
   @AfterClass
   public static void cleanTest() throws Exception {
     krbHelper.stopKdc();

http://git-wip-us.apache.org/repos/asf/drill/blob/b06a7bde/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
index b4d56ba..b9c0de2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitKerberosEncryption.java
@@ -132,9 +132,8 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
    * For example: There is only 1 DrillClient so encrypted connection count of UserRpcMetrics will be 1. Before
    * running any query there should not be any connection (control or data) between Drillbits, hence those counters
    * are 0. After running a simple query since there is only 1 fragment which is root fragment the Control Connection
-   * count is 2 (for unencrypted counter) based on connection for status update of fragment to Foreman. It is 2 because
-   * for Control and Data Server we count total number of client and server connections on a node. There is no
-   * Data Connection because there is no data exchange between multiple fragments.
+   * count is 0 (for unencrypted counter) since with DRILL-5721 status update of fragment to Foreman happens locally.
+   * There is no Data Connection because there is no data exchange between multiple fragments.
    *
    * @throws Exception
    */
@@ -164,7 +163,7 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
 
     // Check unencrypted counters value
     assertTrue(0 == UserRpcMetrics.getInstance().getUnEncryptedConnectionCount());
-    assertTrue(2 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
+    assertTrue(0 == ControlRpcMetrics.getInstance().getUnEncryptedConnectionCount());
     assertTrue(0 == DataRpcMetrics.getInstance().getUnEncryptedConnectionCount());
   }
 
@@ -393,9 +392,8 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
    * For example: There is only 1 DrillClient so encrypted connection count of UserRpcMetrics
    * will be 1. Before running any query there should not be any connection (control or data) between Drillbits,
    * hence those counters are 0. After running a simple query since there is only 1 fragment which is root fragment
-   * the Control Connection count is 2 (for encrypted counter) based on connection for status update of fragment to
-   * Foreman. It is 2 because for Control and Data Server we count total number of client and server connections on a
-   * node. There is no Data Connection because there is no data exchange between multiple fragments.
+   * the Control Connection count is 0 (for encrypted counter), since with DRILL-5721 status update of fragment to
+   * Foreman happens locally. There is no Data Connection because there is no data exchange between multiple fragments.
    */
 
   @Test
@@ -440,7 +438,7 @@ public class TestUserBitKerberosEncryption extends BaseTestQuery {
 
     // Check encrypted counters value
     assertTrue(1 == UserRpcMetrics.getInstance().getEncryptedConnectionCount());
-    assertTrue(2 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
+    assertTrue(0 == ControlRpcMetrics.getInstance().getEncryptedConnectionCount());
     assertTrue(0 == DataRpcMetrics.getInstance().getEncryptedConnectionCount());
 
     // Check unencrypted counters value


Mime
View raw message