drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [11/12] incubator-drill git commit: DRILL-1517: Update Foreman to improve state management.
Date Thu, 20 Nov 2014 16:44:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 7a0e501..3e1393c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -21,24 +21,28 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import net.hydromatic.optiq.tools.ValidationException;
-
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.coord.DistributedSemaphore;
 import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
-import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.exception.OptimizerException;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
 import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
@@ -47,39 +51,58 @@ import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
 import org.apache.drill.exec.planner.fragment.StatsCollector;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.proto.BitControl.InitializeFragments;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.util.AtomicState;
 import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.ErrorHelper;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.eigenbase.sql.parser.SqlParseException;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.RootFragmentManager;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 
 /**
  * Foreman manages all queries where this is the driving/root node.
+ *
+ * The flow is as follows:
+ *   - Foreman is submitted as a runnable.
+ *   - Runnable does query planning.
+ *   - Status listeners are activated.
+ *   - Runnable sends out starting fragments.
+ *   - State moves PENDING -> RUNNING.
+ *   - Foreman listens for state move messages.
+ *
  */
-public class Foreman implements Runnable, Closeable, Comparable<Object>{
+public class Foreman implements Runnable, Closeable, Comparable<Object> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
 
   private QueryId queryId;
   private RunQuery queryRequest;
   private QueryContext context;
-  private QueryManager fragmentManager;
+  private QueryManager queryManager;
   private WorkerBee bee;
   private UserClientConnection initiatingClient;
-  private final AtomicState<QueryState> state;
+  private volatile QueryState state;
+
   private final DistributedSemaphore smallSemaphore;
   private final DistributedSemaphore largeSemaphore;
   private final long queueThreshold;
@@ -87,11 +110,18 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   private volatile DistributedLease lease;
   private final boolean queuingEnabled;
 
+  private FragmentExecutor rootRunner;
+  private final CountDownLatch acceptExternalEvents = new CountDownLatch(1);
+  private final StateListener stateListener = new StateListener();
+  private final ResponseSendListener responseListener = new ResponseSendListener();
+
   public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId,
       RunQuery queryRequest) {
     this.queryId = queryId;
     this.queryRequest = queryRequest;
     this.context = new QueryContext(connection.getSession(), queryId, dContext);
+
+    // set up queuing
     this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
     if (queuingEnabled) {
       int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
@@ -106,81 +136,32 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       this.queueThreshold = 0;
       this.queueTimeout = 0;
     }
+    // end queuing setup.
 
     this.initiatingClient = connection;
-    this.fragmentManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(), new ForemanManagerListener(), dContext.getController(), this);
+    this.queryManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(),
+        stateListener, this);
     this.bee = bee;
 
-    this.state = new AtomicState<QueryState>(QueryState.PENDING) {
-      @Override
-      protected QueryState getStateFromNumber(int i) {
-        return QueryState.valueOf(i);
-      }
-    };
-    this.fragmentManager.getStatus().updateQueryStateInStore();
+    recordNewState(QueryState.PENDING);
   }
 
   public QueryContext getContext() {
     return context;
   }
 
-  private boolean isFinished() {
-    switch(state.getState()) {
-    case PENDING:
-    case RUNNING:
-      return false;
-    default:
-      return true;
-    }
-
-  }
-
-  private void fail(String message, Throwable t) {
-    if(isFinished()) {
-      logger.error("Received a failure message query finished of: {}", message, t);
-    }
-    if (!state.updateState(QueryState.RUNNING, QueryState.FAILED)) {
-      if (!state.updateState(QueryState.PENDING, QueryState.FAILED)) {
-        logger.warn("Tried to update query state to FAILED, but was not RUNNING");
-      }
-    }
-
-    DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(), message, t, logger);
-    QueryResult result = QueryResult //
-        .newBuilder() //
-        .addError(error) //
-        .setIsLastChunk(true) //
-        .setQueryState(QueryState.FAILED) //
-        .setQueryId(queryId) //
-        .build();
-    cleanupAndSendResult(result);
-  }
-
   public void cancel() {
-    if (isFinished()) {
-      return;
-    }
-    state.updateState(QueryState.RUNNING, QueryState.CANCELED);
-
-    // cancel remote fragments.
-    fragmentManager.cancel();
+    stateListener.moveToState(QueryState.CANCELED, null);
   }
 
-  void cleanupAndSendResult(QueryResult result) {
+  private void cleanup(QueryResult result) {
     bee.retireForeman(this);
-    initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result), true);
-    state.updateState(state.getState(), result.getQueryState());
-
-    this.fragmentManager.getStatus().updateQueryStateInStore();
-  }
-
-  private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
-    @Override
-    public void failed(RpcException ex) {
-      logger.info(
-              "Failure while trying communicate query result to initating client.  This would happen if a client is disconnected before response notice can be sent.",
-              ex);
+    context.getWorkBus().removeFragmentStatusListener(queryId);
+    context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
+    if(result != null){
+      initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
     }
+    releaseLease();
   }
 
   /**
@@ -190,7 +171,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
     final String originalThread = Thread.currentThread().getName();
     Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
-    fragmentManager.getStatus().setStartTime(System.currentTimeMillis());
+    getStatus().markStart();
     // convert a run query request into action
     try {
       switch (queryRequest.getType()) {
@@ -204,16 +185,21 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
         runSQL(queryRequest.getPlan());
         break;
       default:
-        throw new UnsupportedOperationException();
+        throw new IllegalStateException();
       }
+    } catch (ForemanException e) {
+      moveToState(QueryState.FAILED, e);
+
     } catch (AssertionError | Exception ex) {
-      fail("Failure while setting up Foreman.", ex);
+      moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization.", ex));
+
     } catch (OutOfMemoryError e) {
       System.out.println("Out of memory, exiting.");
+      e.printStackTrace();
       System.out.flush();
       System.exit(-1);
+
     } finally {
-      releaseLease();
       Thread.currentThread().setName(originalThread);
     }
   }
@@ -224,45 +210,62 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
         lease.close();
       } catch (Exception e) {
         logger.warn("Failure while releasing lease.", e);
-      };
+      }
+      ;
     }
 
   }
-  private void parseAndRunLogicalPlan(String json) {
 
+  private void parseAndRunLogicalPlan(String json) throws ExecutionSetupException {
+    LogicalPlan logicalPlan;
     try {
-      LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+      logicalPlan = context.getPlanReader().readLogicalPlan(json);
+    } catch (IOException e) {
+      throw new ForemanException("Failure parsing logical plan.", e);
+    }
 
-      if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) {
-        fail("Failure running plan.  You requested a result mode of LOGICAL and submitted a logical plan.  In this case you're output mode must be PHYSICAL or EXEC.", new Exception());
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug("Logical {}", logicalPlan.unparse(context.getConfig()));
-      }
-      PhysicalPlan physicalPlan = convert(logicalPlan);
+    if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) {
+      throw new ForemanException(
+          "Failure running plan.  You requested a result mode of LOGICAL and submitted a logical plan.  In this case you're output mode must be PHYSICAL or EXEC.");
+    }
 
-      if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
-        returnPhysical(physicalPlan);
-        return;
-      }
+    log(logicalPlan);
+
+    PhysicalPlan physicalPlan = convert(logicalPlan);
+
+    if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
+      returnPhysical(physicalPlan);
+      return;
+    }
 
-      if (logger.isDebugEnabled()) {
-        logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan));
+    log(physicalPlan);
+
+    runPhysicalPlan(physicalPlan);
+  }
+
+  private void log(LogicalPlan plan) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Logical {}", plan.unparse(context.getConfig()));
+    }
+  }
+
+  private void log(PhysicalPlan plan) {
+    if (logger.isDebugEnabled()) {
+      try {
+        String planText = context.getConfig().getMapper().writeValueAsString(plan);
+        logger.debug("Physical {}", planText);
+      } catch (IOException e) {
+        logger.warn("Error while attempting to log physical plan.", e);
       }
-      runPhysicalPlan(physicalPlan);
-    } catch (IOException e) {
-      fail("Failure while parsing logical plan.", e);
-    } catch (OptimizerException e) {
-      fail("Failure while converting logical plan to physical plan.", e);
     }
   }
 
-  private void returnPhysical(PhysicalPlan plan) {
+  private void returnPhysical(PhysicalPlan plan) throws ExecutionSetupException {
     String jsonPlan = plan.unparse(context.getConfig().getMapper().writer());
     runPhysicalPlan(DirectPlan.createDirectPlan(context, new PhysicalFromLogicalExplain(jsonPlan)));
   }
 
-  private class PhysicalFromLogicalExplain{
+  public static class PhysicalFromLogicalExplain {
     public String json;
 
     public PhysicalFromLogicalExplain(String json) {
@@ -272,54 +275,53 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   }
 
-  class SingleListener implements RpcOutcomeListener<Ack>{
-
-    final SendingAccountor acct;
-
-    public SingleListener() {
-      acct  = new SendingAccountor();
-      acct.increment();
-      acct.increment();
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      acct.decrement();
-      fail("Failure while sending single result.", ex);
-    }
-
-    @Override
-    public void success(Ack value, ByteBuf buffer) {
-      acct.decrement();
-    }
-
-  }
-
-  private void parseAndRunPhysicalPlan(String json) {
+  private void parseAndRunPhysicalPlan(String json) throws ExecutionSetupException {
     try {
       PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
       runPhysicalPlan(plan);
     } catch (IOException e) {
-      fail("Failure while parsing physical plan.", e);
+      throw new ForemanSetupException("Failure while parsing physical plan.", e);
     }
   }
 
-  private void runPhysicalPlan(PhysicalPlan plan) {
+  private void runPhysicalPlan(PhysicalPlan plan) throws ExecutionSetupException {
 
-    if(plan.getProperties().resultMode != ResultMode.EXEC) {
-      fail(String.format("Failure running plan.  You requested a result mode of %s and a physical plan can only be output as EXEC", plan.getProperties().resultMode), new Exception());
-    }
-    PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+    validatePlan(plan);
+    setupSortMemoryAllocations(plan);
+    acquireQuerySemaphore(plan);
 
-    MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
-    Fragment rootFragment;
-    try {
-      rootFragment = rootOperator.accept(makeFragmentsVisitor, null);
-    } catch (FragmentSetupException e) {
-      fail("Failure while fragmenting query.", e);
-      return;
+    final QueryWorkUnit work = getQueryWorkUnit(plan);
+
+    this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), queryManager);
+    this.context.getClusterCoordinator().addDrillbitStatusListener(queryManager);
+
+    logger.debug("Submitting fragments to run.");
+
+    final PlanFragment rootPlanFragment = work.getRootFragment();
+    assert queryId == rootPlanFragment.getHandle().getQueryId();
+
+    queryManager.setup(rootPlanFragment.getHandle(), context.getCurrentEndpoint(), work.getFragments().size());
+
+    // set up the root fragment first so we'll have incoming buffers available.
+    setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
+
+    setupNonRootFragments(work.getFragments());
+    bee.getContext().getAllocator().resetFragmentLimits();
+
+    moveToState(QueryState.RUNNING, null);
+    logger.debug("Fragments running.");
+
+  }
+
+  private void validatePlan(PhysicalPlan plan) throws ForemanSetupException{
+    if (plan.getProperties().resultMode != ResultMode.EXEC) {
+      throw new ForemanSetupException(String.format(
+          "Failure running plan.  You requested a result mode of %s and a physical plan can only be output as EXEC",
+          plan.getProperties().resultMode));
     }
+  }
 
+  private void setupSortMemoryAllocations(PhysicalPlan plan){
     int sortCount = 0;
     for (PhysicalOperator op : plan.getSortedOperators()) {
       if (op instanceof ExternalSort) {
@@ -329,83 +331,193 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
     if (sortCount > 0) {
       long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
-      long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
-      maxAllocPerNode = Math.min(maxAllocPerNode, context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
+      long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
+          context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
+      maxAllocPerNode = Math.min(maxAllocPerNode,
+          context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
       long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode);
       logger.debug("Max sort alloc: {}", maxSortAlloc);
       for (PhysicalOperator op : plan.getSortedOperators()) {
-        if (op instanceof  ExternalSort) {
-          ((ExternalSort)op).setMaxAllocation(maxSortAlloc);
+        if (op instanceof ExternalSort) {
+          ((ExternalSort) op).setMaxAllocation(maxSortAlloc);
         }
       }
     }
+  }
 
-    PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
-    SimpleParallelizer parallelizer = new SimpleParallelizer(context);
+  private void acquireQuerySemaphore(PhysicalPlan plan) throws ForemanSetupException {
 
-    try {
+    double size = 0;
+    for (PhysicalOperator ops : plan.getSortedOperators()) {
+      size += ops.getCost();
+    }
 
-      double size = 0;
-      for (PhysicalOperator ops : plan.getSortedOperators()) {
-        size += ops.getCost();
-      }
-      if (queuingEnabled) {
+    if (queuingEnabled) {
+      try {
         if (size > this.queueThreshold) {
           this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
         } else {
           this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
         }
+      } catch (Exception e) {
+        throw new ForemanSetupException("Unable to acquire slot for query.", e);
+      }
+    }
+  }
+
+  private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException {
+    PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+    MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
+    Fragment rootFragment = rootOperator.accept(makeFragmentsVisitor, null);
+    PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
+    SimpleParallelizer parallelizer = new SimpleParallelizer(context);
+
+
+    return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
+        queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet,
+        initiatingClient.getSession());
+  }
+
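+  /** Tells the foreman to move to a new state; requests made once the query is terminal are dropped.
+   * @param newState the requested state
+   * @param exception failure cause: asserted non-null for FAILED and null for CANCELED/COMPLETED
+   * @return true if applied, false if already terminal (unsupported transitions throw IllegalStateException)
+   */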
+  private synchronized boolean moveToState(QueryState newState, Exception exception){
+    logger.debug("State change requested.  {} --> {}", state, newState);
+    outside: switch(state) {
+
+    case PENDING:
+      // since we're moving out of pending, we can now start accepting other changes in state.
+      // This guarantees that the first state change is driven by the original thread.
+      acceptExternalEvents.countDown();
+
+      if(newState == QueryState.RUNNING){
+        recordNewState(QueryState.RUNNING);
+        return true;
+      }
+
+      // fall through to running behavior.
+      //
+    case RUNNING: {
+
+      switch(newState){
+
+      case CANCELED: {
+        assert exception == null;
+        recordNewState(QueryState.CANCELED);
+        cancelExecutingFragments();
+        QueryResult result = QueryResult.newBuilder() //
+            .setQueryId(queryId) //
+            .setQueryState(QueryState.CANCELED) //
+            .setIsLastChunk(true) //
+            .build();
+
+        cleanup(result);
+        return true;
       }
 
-      QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
-          queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, initiatingClient.getSession());
+      case COMPLETED: {
+        assert exception == null;
+        recordNewState(QueryState.COMPLETED);
+//        QueryResult result = QueryResult //
+//            .newBuilder() //
+//            .setIsLastChunk(true) //
+//            .setQueryState(QueryState.COMPLETED) //
+//            .setQueryId(queryId) //
+//            .build();
+        cleanup(null);
+        return true;
+      }
 
-      this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager);
-      this.context.getClusterCoordinator().addDrillbitStatusListener(fragmentManager);
 
-      int totalFragments = 1 + work.getFragments().size();;
-      fragmentManager.getStatus().setTotalFragments(totalFragments);
+      case FAILED:
+        assert exception != null;
+        recordNewState(QueryState.FAILED);
+        cancelExecutingFragments();
+        DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(), "Query failed.", exception, logger);
+        QueryResult result = QueryResult //
+          .newBuilder() //
+          .addError(error) //
+          .setIsLastChunk(true) //
+          .setQueryState(QueryState.FAILED) //
+          .setQueryId(queryId) //
+          .build();
+        cleanup(result);
+        return true;
+      default:
+        break outside;
 
-      logger.debug("Submitting fragments to run.");
-      fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, work.getFragments());
+      }
+    }
+
+    case CANCELED:
+    case COMPLETED:
+    case FAILED: {
+      // no op.
+      logger.info("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
+      return false;
+    }
 
-      logger.debug("Fragments running.");
-      state.updateState(QueryState.PENDING, QueryState.RUNNING);
-      fragmentManager.getStatus().updateQueryStateInStore();
-    } catch (Exception e) {
-      fail("Failure while setting up query.", e);
     }
 
+    throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
   }
 
-  private void runSQL(String sql) {
-    try{
-      DrillSqlWorker sqlWorker = new DrillSqlWorker(context);
-      Pointer<String> textPlan = new Pointer<>();
-      PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
-      fragmentManager.getStatus().setPlanText(textPlan.value);
-      runPhysicalPlan(plan);
-    } catch (SqlParseException ex) {
-      fail("Failure while parsing sql : " + ex.getMessage(), ex);
-    } catch (ValidationException ex) {
-      fail("Failure while validating sql : " + ex.getMessage(), ex);
-    } catch(Exception e) {
-      fail("Failure while running sql.", e);
+  private void cancelExecutingFragments(){
+    // Stop all fragments with a currently active status, highest major fragment id first.
+    // Work on a copy so the list held by the query status is never reordered in place.
+    List<FragmentData> fragments = new java.util.ArrayList<>(getStatus().getFragmentData());
+    Collections.sort(fragments, new Comparator<FragmentData>() {
+      @Override
+      public int compare(FragmentData o1, FragmentData o2) {
+        return Integer.compare(o2.getHandle().getMajorFragmentId(), o1.getHandle().getMajorFragmentId());
+      }
+    });
+    for(FragmentData data: fragments){
+      FragmentHandle handle = data.getStatus().getHandle();
+      switch(data.getStatus().getProfile().getState()){
+      case SENDING:
+      case AWAITING_ALLOCATION:
+      case RUNNING:
+        if(data.isLocal()){
+          if(rootRunner != null){ rootRunner.cancel(); } // null until setupRootFragment() assigns it
+        }else{
+          bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
+        }
+        break;
+      default:
+        break;
+      }
     }
+
+  }
+
+  private QueryStatus getStatus(){
+    return queryManager.getStatus();
+  }
+
+  private void recordNewState(QueryState newState){
+    this.state = newState;
+    getStatus().updateQueryStateInStore(newState);
+  }
+
+  private void runSQL(String sql) throws ExecutionSetupException {
+    DrillSqlWorker sqlWorker = new DrillSqlWorker(context);
+    Pointer<String> textPlan = new Pointer<>();
+    PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
+    getStatus().setPlanText(textPlan.value);
+    runPhysicalPlan(plan);
   }
 
   private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
     if (logger.isDebugEnabled()) {
       logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
     }
-    return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(new BasicOptimizer.BasicOptimizationContext(context), plan);
+    return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(
+        new BasicOptimizer.BasicOptimizationContext(context), plan);
   }
 
-  public QueryResult getResult(UserClientConnection connection, RequestResults req) {
-    throw new UnsupportedOperationException();
-  }
-
-
   public QueryId getQueryId() {
     return queryId;
   }
@@ -414,29 +526,163 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
   public void close() throws IOException {
   }
 
-  public QueryState getQueryState() {
-    return this.state.getState();
+  public QueryStatus getQueryStatus() {
+    return this.queryManager.getStatus();
   }
 
-  public QueryStatus getQueryStatus() {
-    return this.fragmentManager.getStatus();
+  private void setupRootFragment(PlanFragment rootFragment, UserClientConnection rootClient, FragmentRoot rootOperator) throws ExecutionSetupException {
+    FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext()
+        .getFunctionImplementationRegistry());
+
+    IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+
+    rootContext.setBuffers(buffers);
+
+    // add fragment to local node.
+    queryManager.addFragmentStatusTracker(rootFragment, true);
+
+    this.rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, queryManager.getRootStatusHandler(rootContext, rootFragment));
+    RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+
+    if (buffers.isDone()) {
+      // if we don't have to wait for any incoming data, start the fragment runner.
+      bee.addFragmentRunner(fragmentManager.getRunnable());
+    } else {
+      // if we do, record the fragment manager in the workBus.
+      bee.getContext().getWorkBus().setFragmentManager(fragmentManager);
+    }
   }
 
+  private void setupNonRootFragments(Collection<PlanFragment> fragments) throws ForemanException{
+    Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
+    Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
+
+    // record all fragments for status purposes.
+    for (PlanFragment f : fragments) {
+//      logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
+      queryManager.addFragmentStatusTracker(f, false);
+      if (f.getLeafFragment()) {
+        leafFragmentMap.put(f.getAssignment(), f);
+      } else {
+        intFragmentMap.put(f.getAssignment(), f);
+      }
+    }
+
+    CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size());
 
-  class ForemanManagerListener{
-    void fail(String message, Throwable t) {
-      ForemanManagerListener.this.fail(message, t);
+    // send remote intermediate fragments
+    for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
+      sendRemoteFragments(ep, intFragmentMap.get(ep), latch);
     }
 
-    void cleanupAndSendResult(QueryResult result) {
-      Foreman.this.cleanupAndSendResult(result);
+    // wait for send complete
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      throw new ForemanException("Interrupted while waiting to complete send of remote fragments.", e);
+    }
+
+    // send remote (leaf) fragments.
+    for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, leafFragmentMap.get(ep), null);
+    }
+  }
+
+  public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){
+    return new FragmentSubmitListener(endpoint, value, latch);
+  }
+
+  private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){
+    Controller controller = bee.getContext().getController();
+    InitializeFragments.Builder fb = InitializeFragments.newBuilder();
+    for(PlanFragment f : fragments){
+      fb.addFragment(f);
+    }
+    InitializeFragments initFrags = fb.build();
+
+    logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
+    FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch);
+    controller.getTunnel(assignment).sendFragments(listener, initFrags);
+  }
+
+  public QueryState getState(){
+    return state;
+  }
+
+  private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{
+    // May be null; counted down on success AND on failure so a thread awaiting it never hangs.
+    private final CountDownLatch latch;
+    public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) {
+      super(endpoint, value);
+      this.latch = latch;
+    }
+
+    @Override
+    public void success(Ack ack, ByteBuf byteBuf) {
+      if (latch != null) { latch.countDown(); }
+    }
+    @Override
+    public void failed(RpcException ex) {
+      logger.debug("Failure while sending fragment.  Stopping query.", ex);
+      try {
+        moveToState(QueryState.FAILED, ex);
+      } finally {
+        if (latch != null) { latch.countDown(); } // else setupNonRootFragments() blocks in await() forever
+      }
     }
 
   }
 
+
+  public class StateListener {
+    public boolean moveToState(QueryState newState, Exception ex){
+      try{
+        acceptExternalEvents.await(); // wait until the foreman has made its first move out of PENDING
+      }catch(InterruptedException e){
+        logger.warn("Interrupted while waiting to move state.", e);
+        Thread.currentThread().interrupt(); // preserve the caller's interrupt status
+        return false;
+      }
+      return Foreman.this.moveToState(newState, ex);
+    }
+  }
+
+
   @Override
   public int compareTo(Object o) {
     return hashCode() - o.hashCode();
   }
 
+  private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
+    @Override
+    public void failed(RpcException ex) {
+      logger
+          .info(
+              "Failure while trying communicate query result to initating client.  This would happen if a client is disconnected before response notice can be sent.",
+              ex);
+      moveToState(QueryState.FAILED, ex);
+    }
+  }
+
+
+  private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
+
+    public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
+      super(endpoint, handle);
+    }
+
+    @Override
+    public void failed(RpcException ex) {
+      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+    }
+
+    @Override
+    public void success(Ack ack, ByteBuf buf) {
+      // `value` is the fragment handle passed to super(); the Ack parameter must not shadow it.
+      if(!ack.getOk()){
+        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+      }
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java
new file mode 100644
index 0000000..32a99ad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+
+/**
+ * Exception type for failures raised by the Foreman while handling a query.
+ */
+public class ForemanException extends ExecutionSetupException {
+  private static final long serialVersionUID = -6943409010231014085L;
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ForemanException.class);
+
+  /**
+   * Converts an arbitrary failure into a ForemanException without double-wrapping.
+   *
+   * @param message message used only when a new ForemanException has to be created
+   * @param cause the failure; an InvocationTargetException is replaced by its target when it has one
+   * @return the (unwrapped) cause itself if it already is a ForemanException, otherwise a new
+   *         ForemanException wrapping it
+   */
+  public static ForemanException fromThrowable(String message, Throwable cause) {
+    Throwable t = cause;
+    if (cause instanceof InvocationTargetException) {
+      Throwable target = ((InvocationTargetException) cause).getTargetException();
+      // An InvocationTargetException may carry no target; keep the wrapper rather than wrapping null.
+      if (target != null) {
+        t = target;
+      }
+    }
+    if (t instanceof ForemanException) {
+      return (ForemanException) t;
+    }
+    return new ForemanException(message, t);
+  }
+
+  public ForemanException() {
+    super();
+  }
+
+  public ForemanException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public ForemanException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ForemanException(String message) {
+    super(message);
+  }
+
+  public ForemanException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java
new file mode 100644
index 0000000..2083753
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+/**
+ * A {@link ForemanException} intended (as its name indicates) for failures while setting up a query.
+ */
+public class ForemanSetupException extends ForemanException {
+  private static final long serialVersionUID = 1L;
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ForemanSetupException.class);
+
+  public ForemanSetupException() {
+    super();
+  }
+
+  public ForemanSetupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public ForemanSetupException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ForemanSetupException(String message) {
+    super(message);
+  }
+
+  public ForemanSetupException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 0f007ee..d4c87d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -17,204 +17,70 @@
  */
 package org.apache.drill.exec.work.foreman;
 
-import io.netty.buffer.ByteBuf;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.common.exceptions.DrillException;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.control.Controller;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RemoteRpcException;
 import org.apache.drill.exec.store.sys.PStoreProvider;
-import org.apache.drill.exec.work.EndpointListener;
-import org.apache.drill.exec.work.ErrorHelper;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
-import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
+import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
-import org.apache.drill.exec.work.fragment.RootFragmentManager;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
 
 /**
- * Each Foreman holds its own fragment manager.  This manages the events associated with execution of a particular query across all fragments.
+ * Each Foreman holds its own QueryManager.  This manages the events associated with execution of a particular query across all fragments.
  */
 public class QueryManager implements FragmentStatusListener, DrillbitStatusListener{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+  private final Set<DrillbitEndpoint> includedBits;
 
   private final QueryStatus status;
-  private final Controller controller;
-  private ForemanManagerListener foremanManagerListener;
-  private AtomicInteger remainingFragmentCount;
-  private AtomicInteger failedFragmentCount;
-  private WorkEventBus workBus;
-  private ClusterCoordinator coord;
-  private QueryId queryId;
-  private FragmentExecutor rootRunner;
-  private RunQuery query;
-  private volatile boolean running = false;
-  private volatile boolean cancelled = false;
-  private volatile boolean stopped = false;
+  private final StateListener stateListener;
+  private final AtomicInteger remainingFragmentCount;
+  private final QueryId queryId;
 
-  public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, ForemanManagerListener foremanManagerListener, Controller controller, Foreman foreman) {
+  public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) {
     super();
-    this.foremanManagerListener = foremanManagerListener;
-    this.query = query;
+    this.stateListener = stateListener;
     this.queryId =  id;
-    this.controller = controller;
     this.remainingFragmentCount = new AtomicInteger(0);
-    this.failedFragmentCount = new AtomicInteger(0);
     this.status = new QueryStatus(query, id, pStoreProvider, foreman);
+    this.includedBits = Sets.newHashSet();
   }
 
   public QueryStatus getStatus(){
     return status;
   }
 
-  public void addTextPlan(String textPlan){
-
-  }
-
-  public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator,
-                           UserClientConnection rootClient, List<PlanFragment> nonRootFragments) throws ExecutionSetupException{
-    logger.debug("Setting up fragment runs.");
-    remainingFragmentCount.set(nonRootFragments.size() + 1);
-    assert queryId == rootFragment.getHandle().getQueryId();
-    workBus = bee.getContext().getWorkBus();
-    coord = bee.getContext().getClusterCoordinator();
-
-    // set up the root fragment first so we'll have incoming buffers available.
-    {
-      logger.debug("Setting up root context.");
-      FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext().getFunctionImplementationRegistry());
-      logger.debug("Setting up incoming buffers");
-      IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
-      logger.debug("Setting buffers on root context.");
-      rootContext.setBuffers(buffers);
-      // add fragment to local node.
-      status.add(new FragmentData(rootFragment.getHandle(), rootFragment.getAssignment(), true));
-      logger.debug("Fragment added to local node.");
-      rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, new RootStatusHandler(rootContext, rootFragment));
-      RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
-
-      if(buffers.isDone()){
-        // if we don't have to wait for any incoming data, start the fragment runner.
-        bee.addFragmentRunner(fragmentManager.getRunnable());
-      }else{
-        // if we do, record the fragment manager in the workBus.
-        workBus.setFragmentManager(fragmentManager);
-      }
-    }
-
-    Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
-    Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
-
-    // record all fragments for status purposes.
-    for (PlanFragment f : nonRootFragments) {
-      logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
-      status.add(new FragmentData(f.getHandle(), f.getAssignment(), false));
-      if (f.getLeafFragment()) {
-        leafFragmentMap.put(f.getAssignment(), f);
-      } else {
-        intFragmentMap.put(f.getAssignment(), f);
-      }
-    }
-
-    CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size());
-
-    // send remote intermediate fragments
-    for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
-      sendRemoteFragments(ep, intFragmentMap.get(ep), latch);
-    }
-
-    // wait for send complete
-    try {
-      latch.await();
-    } catch (InterruptedException e) {
-      throw new ExecutionSetupException(e);
-    }
-
-    // send remote (leaf) fragments.
-    for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
-      sendRemoteFragments(ep, leafFragmentMap.get(ep), null);
-    }
-
-    bee.getContext().getAllocator().resetFragmentLimits();
-
-    logger.debug("Fragment runs setup is complete.");
-    running = true;
-    if (cancelled && !stopped) {
-      stopQuery();
-      QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.CANCELED).setIsLastChunk(true).build();
-      foremanManagerListener.cleanupAndSendResult(result);
-    }
-  }
-
-  private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){
-    InitializeFragments.Builder fb = InitializeFragments.newBuilder();
-    for(PlanFragment f : fragments){
-      fb.addFragment(f);
-    }
-    InitializeFragments initFrags = fb.build();
-
-    logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
-    FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch);
-    controller.getTunnel(assignment).sendFragments(listener, initFrags);
-  }
-
   @Override
   public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
   }
 
   @Override
   public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredDrillbits) {
-    List<FragmentData> fragments = status.getFragmentData();
-
-    for (FragmentData fragment : fragments) {
-      if (unregisteredDrillbits.contains(fragment.getEndpoint())) {
-        logger.warn("Drillbit {} for major{}:minor{} is not responding. Stop query {}",
-            fragment.getEndpoint(),
-            fragment.getHandle().getMajorFragmentId(),
-            fragment.getHandle().getMinorFragmentId(),
-            fragment.getHandle().getQueryId());
-
-        UserBitShared.DrillPBError error = ErrorHelper.logAndConvertError(fragment.getEndpoint(), "Failure while running fragment.",
-            new DrillRuntimeException(String.format("Drillbit %s not responding", fragment.getEndpoint())), logger);
-        failWithError(error);
-        break;
+    for(DrillbitEndpoint ep : unregisteredDrillbits){ // fail the query if a drillbit hosting one of its fragments left
+      if(this.includedBits.contains(ep)){
+        logger.warn("Drillbit {} no longer registered in cluster.  Canceling query {}", ep.getAddress() + ":" + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
+        this.stateListener.moveToState(QueryState.FAILED, new ForemanException("One or more nodes lost connectivity during query.  Identified node was " + ep.getAddress()));
       }
     }
   }
 
+
   @Override
   public void statusUpdate(FragmentStatus status) {
+
     logger.debug("New fragment status was provided to Foreman of {}", status);
     switch(status.getProfile().getState()){
     case AWAITING_ALLOCATION:
@@ -224,7 +90,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
       // we don't care about cancellation messages since we're the only entity that should drive cancellations.
       break;
     case FAILED:
-      fail(status);
+      stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError()));
       break;
     case FINISHED:
       finished(status);
@@ -242,139 +108,47 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
   }
 
   private void finished(FragmentStatus status){
-    int remaining = remainingFragmentCount.decrementAndGet();
-    if(remaining == 0){
-      logger.info("Outcome status: {}", this.status);
-      QueryResult result = QueryResult.newBuilder() //
-              .setQueryState(QueryState.COMPLETED) //
-              .setQueryId(queryId) //
-              .build();
-      this.status.setEndTime(System.currentTimeMillis());
-      foremanManagerListener.cleanupAndSendResult(result);
-      workBus.removeFragmentStatusListener(queryId);
-      coord.removeDrillbitStatusListener(this);
-    }
     this.status.incrementFinishedFragments();
+    int remaining = remainingFragmentCount.decrementAndGet();
     updateFragmentStatus(status);
-  }
 
-  private void fail(FragmentStatus status){
-    updateFragmentStatus(status);
-    int failed = this.failedFragmentCount.incrementAndGet();
-    if (failed == 1) { // only first failed fragment need notify foreman (?)
-      failWithError(status.getProfile().getError());
+    if(remaining == 0){
+      stateListener.moveToState(QueryState.COMPLETED, null);
     }
   }
 
-  private void failWithError(UserBitShared.DrillPBError error) {
-    stopQuery();
-    QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(error).setIsLastChunk(true).build();
-    this.status.setEndTime(System.currentTimeMillis());
-    foremanManagerListener.cleanupAndSendResult(result);
-  }
-
+  public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){
+    remainingFragmentCount.set(countOfNonRootFragments + 1);
+    status.add(new FragmentData(rootFragmentHandle, localIdentity, true));
+    this.status.setTotalFragments(countOfNonRootFragments + 1);
 
-  private void stopQuery(){
-    // Stop all queries with a currently active status.
     List<FragmentData> fragments = status.getFragmentData();
-    Collections.sort(fragments, new Comparator<FragmentData>() {
-      @Override
-      public int compare(FragmentData o1, FragmentData o2) {
-        return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId();
-      }
-    });
-    for(FragmentData data: fragments){
-      FragmentHandle handle = data.getStatus().getHandle();
-      switch(data.getStatus().getProfile().getState()){
-      case SENDING:
-      case AWAITING_ALLOCATION:
-      case RUNNING:
-        if(data.isLocal()){
-          rootRunner.cancel();
-        }else{
-          controller.getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
-        }
-        break;
-      default:
-        break;
-      }
-    }
-
-    workBus.removeFragmentStatusListener(queryId);
-    coord.removeDrillbitStatusListener(this);
-
-    stopped = true;
-  }
-
-  public void cancel(){
-    cancelled = true;
-    if (running) {
-      stopQuery();
-      stopped = true;
-      QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.CANCELED).setIsLastChunk(true).build();
-      foremanManagerListener.cleanupAndSendResult(result);
+    for (FragmentData fragment : fragments) {
+      this.includedBits.add(fragment.getEndpoint());
     }
   }
 
-  private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
-
-    public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
-      super(endpoint, handle);
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
-    }
-
-    @Override
-    public void success(Ack value, ByteBuf buf) {
-      if(!value.getOk()){
-        logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
-      }
-      // do nothing.
-    }
-
+  public void addFragmentStatusTracker(PlanFragment fragment, boolean isRoot){
+    addFragmentStatusTracker(fragment.getHandle(), fragment.getAssignment(), isRoot);
   }
 
-  public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){
-    return new FragmentSubmitListener(endpoint, value, latch);
+  public void addFragmentStatusTracker(FragmentHandle handle, DrillbitEndpoint node, boolean isRoot){
+    status.add(new FragmentData(handle, node, isRoot));
   }
 
-  private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{
-
-    private CountDownLatch latch;
-
-    public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) {
-      super(endpoint, value);
-      this.latch = latch;
-    }
-
-    @Override
-    public void success(Ack ack, ByteBuf byteBuf) {
-      if (latch != null) {
-        latch.countDown();
-      }
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      logger.debug("Failure while sending fragment.  Stopping query.", ex);
-      UserBitShared.DrillPBError error = ErrorHelper.logAndConvertError(endpoint, "Failure while sending fragment.", ex, logger);
-      failWithError(error);
-    }
-
+  public RootStatusReporter getRootStatusHandler(FragmentContext context, PlanFragment fragment){
+    return new RootStatusReporter(context, fragment);
   }
 
-  private class RootStatusHandler extends AbstractStatusReporter{
+  class RootStatusReporter extends AbstractStatusReporter{
 
-    private RootStatusHandler(FragmentContext context, PlanFragment fragment){
+    private RootStatusReporter(FragmentContext context, PlanFragment fragment){
       super(context);
     }
 
     @Override
     protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-      QueryManager.this.statusUpdate(status);
+      statusUpdate(status);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
index 8da0ffb..4e18da6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -89,8 +89,8 @@ public class QueryStatus {
     this.planText = planText;
   }
 
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
+  public void markStart() {
+    this.startTime = System.currentTimeMillis();
   }
 
   public void setEndTime(long endTime) {
@@ -125,8 +125,7 @@ public class QueryStatus {
     fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
   }
 
-  public synchronized void updateQueryStateInStore() {
-    QueryState queryState = foreman.getQueryState();
+  synchronized QueryState updateQueryStateInStore(QueryState queryState) {
     switch (queryState) {
       case PENDING:
       case RUNNING:
@@ -140,12 +139,13 @@ public class QueryStatus {
         }catch(Exception e){
           logger.warn("Failure while trying to delete the estore profile for this query.", e);
         }
-        //profileEStore.put(queryId, getAsProfile(false));  //  Change the state in EStore to complete/cancel/fail.
-        // profileEStore.delete(queryId);  // delete the ephemeral query profile.
+
         profilePStore.put(queryId, getAsProfile());
         break;
       default:
+        throw new IllegalStateException();
     }
+    return queryState;
   }
 
   @Override
@@ -212,7 +212,7 @@ public class QueryStatus {
   public QueryInfo getAsInfo() {
     return QueryInfo.newBuilder() //
       .setQuery(query.getPlan())
-      .setState(foreman.getQueryState())
+      .setState(foreman.getState())
       .setForeman(foreman.getContext().getCurrentEndpoint())
       .setStart(startTime)
       .build();
@@ -243,7 +243,7 @@ public class QueryStatus {
       }
     }
 
-    b.setState(foreman.getQueryState());
+    b.setState(foreman.getState());
     b.setForeman(foreman.getContext().getCurrentEndpoint());
     b.setStart(startTime);
     b.setEnd(endTime);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java
deleted file mode 100644
index 8c98296..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.foreman;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
-
-public class RootStatusReporter extends AbstractStatusReporter{
-
-  QueryManager runningFragmentManager;
-
-  private RootStatusReporter(FragmentContext context){
-    super(context);
-  }
-
-  @Override
-  protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-    runningFragmentManager.statusUpdate(status);
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 9f08e97..f76dfcd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -200,11 +200,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
     @Override
     public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
-      if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanDrillbitEndPoint())) {
-        logger.warn("Forman : {} seems not responding or not work properly. Cancel this fragment {}:{}",
-            FragmentExecutor.this.context.getForemanDrillbitEndPoint(),
-            FragmentExecutor.this.context.getHandle().getMajorFragmentId(),
-            FragmentExecutor.this.context.getHandle().getMinorFragmentId());
+      if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanEndpoint())) { // Foreman's drillbit left
+        logger.warn("Foreman {} no longer active. Cancelling fragment {}.",
+            FragmentExecutor.this.context.getForemanEndpoint().getAddress(),
+            QueryIdHelper.getQueryIdentifier(context.getHandle()));
         FragmentExecutor.this.cancel();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 9798701..312f96a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.rpc.RemoteConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.foreman.ForemanException;
 
 /**
  * This managers determines when to run a non-root fragment node.
@@ -47,7 +48,7 @@ public class NonRootFragmentManager implements FragmentManager {
   private final FragmentContext context;
   private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
 
-  public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws FragmentSetupException{
+  public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws ExecutionSetupException {
     try {
       this.fragment = fragment;
       DrillbitContext context = bee.getContext();
@@ -58,7 +59,7 @@ public class NonRootFragmentManager implements FragmentManager {
       this.context.setBuffers(buffers);
       this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman()));
 
-    } catch (ExecutionSetupException | IOException e) {
+    } catch (ForemanException | IOException e) {
       throw new FragmentSetupException("Failure while decoding fragment.", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 30402b7..854f474 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -17,23 +17,16 @@
  */
 package org.apache.drill.exec.work.user;
 
-import java.util.Random;
-import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.UserProtos.RequestResults;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.SchemaFactory;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.fragment.FragmentExecutor;
 
 public class UserWorker{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
@@ -46,7 +39,7 @@ public class UserWorker{
   }
 
   public QueryId submitWork(UserClientConnection connection, RunQuery query) {
-    Random r = new Random();
+    ThreadLocalRandom r = ThreadLocalRandom.current();
 
     // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence).  Last 12 bytes are random.
     long time = (int) (System.currentTimeMillis()/1000);
@@ -58,14 +51,6 @@ public class UserWorker{
     return id;
   }
 
-  public QueryResult getResult(UserClientConnection connection, RequestResults req) {
-    Foreman foreman = bee.getForemanForQueryId(req.getQueryId());
-    if (foreman == null) {
-      return QueryResult.newBuilder().setQueryState(QueryState.UNKNOWN_QUERY).build();
-    }
-    return foreman.getResult(connection, req);
-  }
-
   public Ack cancelQuery(QueryId query) {
     Foreman foreman = bee.getForemanForQueryId(query);
     if(foreman != null) {
@@ -74,18 +59,6 @@ public class UserWorker{
     return Acks.OK;
   }
 
-  public Ack cancelFragment(FragmentHandle handle) {
-    FragmentExecutor runner = bee.getFragmentRunner(handle);
-    if (runner != null) {
-      runner.cancel();
-    }
-    return Acks.OK;
-  }
-
-  public SchemaFactory getSchemaFactory() {
-    return bee.getContext().getSchemaFactory();
-  }
-
   public OptionManager getSystemOptions() {
     return bee.getContext().getOptionManager();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index ce47578..9a32ff9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.junit.BeforeClass;
 
 import com.google.common.base.Charsets;
@@ -53,7 +54,7 @@ public abstract class PopUnitTestBase  extends ExecTest{
     return i;
   }
 
-  public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException {
+  public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException, ForemanSetupException {
     MakeFragmentsVisitor f = new MakeFragmentsVisitor();
 
     PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java
index ec8bd94..6491df6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.junit.Test;
 
 public class TestFragmenter extends PopUnitTestBase {
@@ -34,7 +35,7 @@ public class TestFragmenter extends PopUnitTestBase {
 
 
   @Test
-  public void ensureOneFragment() throws FragmentSetupException, IOException {
+  public void ensureOneFragment() throws FragmentSetupException, IOException, ForemanSetupException {
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
     Fragment b = getRootFragment(ppr, "/physical_test1.json");
     assertEquals(1, getFragmentCount(b));
@@ -43,7 +44,7 @@ public class TestFragmenter extends PopUnitTestBase {
   }
 
   @Test
-  public void ensureThreeFragments() throws FragmentSetupException, IOException {
+  public void ensureThreeFragments() throws FragmentSetupException, IOException, ForemanSetupException {
     PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
     Fragment b = getRootFragment(ppr, "/physical_double_exchange.json");
     logger.debug("Fragment Node {}", b);


Mime
View raw message