drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [4/7] drill git commit: DRILL-2762: Update Fragment state reporting and error collection
Date Sun, 19 Apr 2015 01:03:39 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
index 72f4436..e91404f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/TableBuilder.java
@@ -25,23 +25,25 @@ import java.util.Date;
 import java.util.Locale;
 
 class TableBuilder {
-  NumberFormat format = NumberFormat.getInstance(Locale.US);
-  SimpleDateFormat hours = new SimpleDateFormat("HH:mm");
-  SimpleDateFormat shours = new SimpleDateFormat("H:mm");
-  SimpleDateFormat mins = new SimpleDateFormat("mm:ss");
-  SimpleDateFormat smins = new SimpleDateFormat("m:ss");
-
-  SimpleDateFormat secs = new SimpleDateFormat("ss.SSS");
-  SimpleDateFormat ssecs = new SimpleDateFormat("s.SSS");
-  DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
-  DecimalFormat dec = new DecimalFormat("0.00");
-  DecimalFormat intformat = new DecimalFormat("#,###");
-
-  StringBuilder sb;
-  int w = 0;
-  int width;
-
-  public TableBuilder(String[] columns) {
+  private final NumberFormat format = NumberFormat.getInstance(Locale.US);
+  private final SimpleDateFormat days = new SimpleDateFormat("DD'd'hh'h'mm'm'");
+  private final SimpleDateFormat sdays = new SimpleDateFormat("DD'd'hh'h'mm'm'");
+  private final SimpleDateFormat hours = new SimpleDateFormat("HH'h'mm'm'");
+  private final SimpleDateFormat shours = new SimpleDateFormat("H'h'mm'm'");
+  private final SimpleDateFormat mins = new SimpleDateFormat("mm'm'ss's'");
+  private final SimpleDateFormat smins = new SimpleDateFormat("m'm'ss's'");
+
+  private final SimpleDateFormat secs = new SimpleDateFormat("ss.SSS's'");
+  private final SimpleDateFormat ssecs = new SimpleDateFormat("s.SSS's'");
+  private final DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
+  private final DecimalFormat dec = new DecimalFormat("0.00");
+  private final DecimalFormat intformat = new DecimalFormat("#,###");
+
+  private StringBuilder sb;
+  private int w = 0;
+  private int width;
+
+  public TableBuilder(final String[] columns) {
     sb = new StringBuilder();
     width = columns.length;
 
@@ -49,13 +51,13 @@ class TableBuilder {
     format.setMinimumFractionDigits(3);
 
     sb.append("<table class=\"table table-bordered text-right\">\n<tr>");
-    for (String cn : columns) {
+    for (final String cn : columns) {
       sb.append("<th>" + cn + "</th>");
     }
     sb.append("</tr>\n");
   }
 
-  public void appendCell(String s, String link) {
+  public void appendCell(final String s, final String link) {
     if (w == 0) {
       sb.append("<tr>");
     }
@@ -66,22 +68,27 @@ class TableBuilder {
     }
   }
 
-  public void appendRepeated(String s, String link, int n) {
+  public void appendRepeated(final String s, final String link, final int n) {
     for (int i = 0; i < n; i++) {
       appendCell(s, link);
     }
   }
 
-  public void appendTime(long d, String link) {
+  public void appendTime(final long d, final String link) {
     appendCell(dateFormat.format(d), link);
   }
 
-  public void appendMillis(long p, String link) {
-    double secs = p/1000.0;
-    double mins = secs/60;
-    double hours = mins/60;
+  public void appendMillis(final long p, final String link) {
+    final double secs = p/1000.0;
+    final double mins = secs/60;
+    final double hours = mins/60;
+    final double days = hours / 24;
     SimpleDateFormat timeFormat = null;
-    if(hours >= 10){
+    if (days >= 10) {
+      timeFormat = this.days;
+    } else if (days >= 1) {
+      timeFormat = this.sdays;
+    } else if (hours >= 10) {
       timeFormat = this.hours;
     }else if(hours >= 1){
       timeFormat = this.shours;
@@ -97,30 +104,30 @@ class TableBuilder {
     appendCell(timeFormat.format(new Date(p)), null);
   }
 
-  public void appendNanos(long p, String link) {
+  public void appendNanos(final long p, final String link) {
     appendMillis((long) (p / 1000.0 / 1000.0), link);
   }
 
-  public void appendFormattedNumber(Number n, String link) {
+  public void appendFormattedNumber(final Number n, final String link) {
     appendCell(format.format(n), link);
   }
 
-  public void appendFormattedInteger(long n, String link) {
+  public void appendFormattedInteger(final long n, final String link) {
     appendCell(intformat.format(n), link);
   }
 
-  public void appendInteger(long l, String link) {
+  public void appendInteger(final long l, final String link) {
     appendCell(Long.toString(l), link);
   }
 
-  public void appendBytes(long l, String link){
+  public void appendBytes(final long l, final String link){
     appendCell(bytePrint(l), link);
   }
 
-  private String bytePrint(long size){
-    double m = size/Math.pow(1024, 2);
-    double g = size/Math.pow(1024, 3);
-    double t = size/Math.pow(1024, 4);
+  private String bytePrint(final long size){
+    final double m = size/Math.pow(1024, 2);
+    final double g = size/Math.pow(1024, 3);
+    final double t = size/Math.pow(1024, 4);
     if (t > 1) {
       return dec.format(t).concat("TB");
     } else if (g > 1) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e2bcec3..a3ceb8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -23,7 +23,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.SelfCleaningRunnable;
@@ -47,6 +48,7 @@ import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.batch.ControlHandlerImpl;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.QueryManager;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.user.UserWorker;
@@ -101,7 +103,17 @@ public class WorkManager implements AutoCloseable {
      * threads that can be created. Ideally, this might be computed based on the number of cores or
      * some similar metric; ThreadPoolExecutor can impose an upper bound, and might be a better choice.
      */
-    executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
+    executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+        new NamedThreadFactory("WorkManager-")) {
+            @Override
+            protected void afterExecute(final Runnable r, final Throwable t) {
+              if(t != null){
+                logger.error("{}.run() leaked an exception.", r.getClass().getName(), t);
+              }
+              super.afterExecute(r, t);
+            }
+      };
+
 
     // TODO references to this escape here (via WorkerBee) before construction is done
     controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
@@ -125,7 +137,7 @@ public class WorkManager implements AutoCloseable {
                   return runningFragments.size();
                 }
           });
-    } catch (IllegalArgumentException e) {
+    } catch (final IllegalArgumentException e) {
       logger.warn("Exception while registering metrics", e);
     }
   }
@@ -160,7 +172,7 @@ public class WorkManager implements AutoCloseable {
       if (executor != null) {
         executor.awaitTermination(1, TimeUnit.SECONDS);
       }
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Executor interrupted while awaiting termination");
     }
   }
@@ -188,7 +200,7 @@ public class WorkManager implements AutoCloseable {
     while(true) {
       try {
         exitLatch.await(5, TimeUnit.SECONDS);
-      } catch(InterruptedException e) {
+      } catch(final InterruptedException e) {
         // keep waiting
       }
       break;
@@ -263,6 +275,7 @@ public class WorkManager implements AutoCloseable {
         @Override
         protected void cleanup() {
           runningFragments.remove(fragmentHandle);
+          workBus.removeFragmentManager(fragmentHandle);
           indicateIfSafeToExit();
         }
       });
@@ -289,28 +302,29 @@ public class WorkManager implements AutoCloseable {
     @Override
     public void run() {
       while(true) {
+        final Controller controller = dContext.getController();
         final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
-        for(FragmentExecutor fragmentExecutor : runningFragments.values()) {
+        for(final FragmentExecutor fragmentExecutor : runningFragments.values()) {
           final FragmentStatus status = fragmentExecutor.getStatus();
           if (status == null) {
             continue;
           }
 
           final DrillbitEndpoint ep = fragmentExecutor.getContext().getForemanEndpoint();
-          futures.add(dContext.getController().getTunnel(ep).sendFragmentStatus(status));
+          futures.add(controller.getTunnel(ep).sendFragmentStatus(status));
         }
 
-        for(DrillRpcFuture<Ack> future : futures) {
+        for(final DrillRpcFuture<Ack> future : futures) {
           try {
             future.checkedGet();
-          } catch(RpcException ex) {
+          } catch(final RpcException ex) {
             logger.info("Failure while sending intermediate fragment status to Foreman", ex);
           }
         }
 
         try {
           Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
-        } catch(InterruptedException e) {
+        } catch(final InterruptedException e) {
           // exit status thread on interrupt.
           break;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3a7123d..3fb0775 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.Acks;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.RpcConstants;
@@ -137,10 +138,10 @@ public class ControlHandlerImpl implements ControlMessageHandler {
         drillbitContext.getWorkBus().addFragmentManager(manager);
       }
 
-    } catch (Exception e) {
+    } catch (final Exception e) {
         throw new UserRpcException(drillbitContext.getEndpoint(),
             "Failure while trying to start remote fragment", e);
-    } catch (OutOfMemoryError t) {
+    } catch (final OutOfMemoryError t) {
       if (t.getMessage().startsWith("Direct buffer")) {
         throw new UserRpcException(drillbitContext.getEndpoint(),
             "Out of direct memory while trying to start remote fragment", t);
@@ -171,19 +172,24 @@ public class ControlHandlerImpl implements ControlMessageHandler {
   }
 
   private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+
     final FragmentManager manager =
         bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
 
     FragmentExecutor executor;
     if (manager != null) {
-      executor = manager.getRunnable();
+      manager.receivingFragmentFinished(finishedReceiver.getReceiver());
     } else {
       // then try local cancel.
       executor = bee.getFragmentRunner(finishedReceiver.getSender());
-    }
-
-    if (executor != null) {
-      executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+      if (executor != null) {
+        executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+      } else {
+        logger.warn(
+            "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
+            QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
+            QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+      }
     }
 
     return Acks.OK;

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 2430e64..85262de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -47,10 +47,10 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
   private final ResponseSenderQueue readController = new ResponseSenderQueue();
   private int streamCounter;
-  private int fragmentCount;
-  private FragmentContext context;
+  private final int fragmentCount;
+  private final FragmentContext context;
 
-  public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
+  public UnlimitedRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
     bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
 
     this.softlimit = bufferSizePerSocket * fragmentCount;
@@ -63,7 +63,14 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   }
 
   @Override
-  public void enqueue(RawFragmentBatch batch) throws IOException {
+  public void enqueue(final RawFragmentBatch batch) throws IOException {
+
+    // if this fragment is already canceled or failed, we shouldn't need any or more stuff. We do the null check to
+    // ensure that tests run.
+    if (context != null && !context.shouldContinue()) {
+      this.kill(context);
+    }
+
     if (isFinished()) {
       if (state == BufferState.KILLED) {
         // do not even enqueue just release and send ack back
@@ -76,8 +83,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     }
     if (batch.getHeader().getIsOutOfMemory()) {
       logger.trace("Setting autoread false");
-      RawFragmentBatch firstBatch = buffer.peekFirst();
-      FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
+      final RawFragmentBatch firstBatch = buffer.peekFirst();
+      final FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
       if (!outOfMemory.get() && !(header == null) && header.getIsOutOfMemory()) {
         buffer.addFirst(batch);
       }
@@ -96,16 +103,16 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   @Override
   public void cleanup() {
-    if (!isFinished() && !context.isCancelled()) {
-      String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
+    if (!isFinished() && context.shouldContinue()) {
+      final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
       logger.error(msg);
-      IllegalStateException e = new IllegalStateException(msg);
+      final IllegalStateException e = new IllegalStateException(msg);
       context.fail(e);
       throw e;
     }
 
     if (!buffer.isEmpty()) {
-      if (!context.isFailed() && !context.isCancelled()) {
+      if (context.shouldContinue()) {
         context.fail(new IllegalStateException("Batches still in queue during cleanup"));
         logger.error("{} Batches in queue.", buffer.size());
       }
@@ -114,7 +121,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   }
 
   @Override
-  public void kill(FragmentContext context) {
+  public void kill(final FragmentContext context) {
     state = BufferState.KILLED;
     clearBufferWithBody();
   }
@@ -125,7 +132,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
    */
   private void clearBufferWithBody() {
     while (!buffer.isEmpty()) {
-      RawFragmentBatch batch = buffer.poll();
+      final RawFragmentBatch batch = buffer.poll();
       if (batch.getBody() != null) {
         batch.getBody().release();
       }
@@ -160,7 +167,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     if (b == null && (!isFinished() || !buffer.isEmpty())) {
       try {
         b = buffer.take();
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         return null;
         // TODO InterruptedException
       }
@@ -175,9 +182,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     // try to flush the difference between softlimit and queue size, so every flush we are reducing backlog
     // when queue size is lower then softlimit - the bigger the difference the more we can flush
     if (!isFinished() && overlimit.get()) {
-      int flushCount = softlimit - buffer.size();
+      final int flushCount = softlimit - buffer.size();
       if ( flushCount > 0 ) {
-        int flushed = readController.flushResponses(flushCount);
+        final int flushed = readController.flushResponses(flushCount);
         logger.trace("flush {} entries, flushed {} entries ", flushCount, flushed);
         if ( flushed == 0 ) {
           // queue is empty - nothing to do for now

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index f824b53..d94b9f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -23,15 +23,15 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.drill.common.EventProcessor;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.exec.ExecConstants;
@@ -77,8 +77,10 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.RootFragmentManager;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 
 /**
  * Foreman manages all the fragments (local and remote) for a single query where this
@@ -99,6 +101,7 @@ import com.google.common.collect.Multimap;
 public class Foreman implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
   private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+  private static final int RPC_WAIT_IN_SECONDS = 90;
 
   private final QueryId queryId;
   private final RunQuery queryRequest;
@@ -113,7 +116,7 @@ public class Foreman implements Runnable {
 
   private FragmentExecutor rootRunner; // root Fragment
 
-  private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
+  private final ExtendedLatch acceptExternalEvents = new ExtendedLatch(); // gates acceptance of external events
   private final StateListener stateListener = new StateListener(); // source of external events
   private final ResponseSendListener responseListener = new ResponseSendListener();
   private final StateSwitch stateSwitch = new StateSwitch();
@@ -204,12 +207,12 @@ public class Foreman implements Runnable {
         throw new IllegalStateException();
       }
       injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
-    } catch (ForemanException e) {
+    } catch (final ForemanException e) {
       moveToState(QueryState.FAILED, e);
     } catch (AssertionError | Exception ex) {
       moveToState(QueryState.FAILED,
           new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
-    } catch (OutOfMemoryError e) {
+    } catch (final OutOfMemoryError e) {
       /*
        * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
        * So, if we die here, they should get notified about that, and cancel themselves; we don't have to
@@ -255,9 +258,9 @@ public class Foreman implements Runnable {
       try {
         lease.close();
         lease = null;
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         // if we end up here, the while loop will try again
-      } catch (Exception e) {
+      } catch (final Exception e) {
         logger.warn("Failure while releasing lease.", e);
         break;
       }
@@ -268,7 +271,7 @@ public class Foreman implements Runnable {
     LogicalPlan logicalPlan;
     try {
       logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ForemanException("Failure parsing logical plan.", e);
     }
 
@@ -299,9 +302,9 @@ public class Foreman implements Runnable {
   private void log(final PhysicalPlan plan) {
     if (logger.isDebugEnabled()) {
       try {
-        String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
+        final String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
         logger.debug("Physical {}", planText);
-      } catch (IOException e) {
+      } catch (final IOException e) {
         logger.warn("Error while attempting to log physical plan.", e);
       }
     }
@@ -324,7 +327,7 @@ public class Foreman implements Runnable {
     try {
       final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
       runPhysicalPlan(plan);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new ForemanSetupException("Failure while parsing physical plan.", e);
     }
   }
@@ -339,8 +342,8 @@ public class Foreman implements Runnable {
     final PlanFragment rootPlanFragment = work.getRootFragment();
     assert queryId == rootPlanFragment.getHandle().getQueryId();
 
-    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager);
-    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager);
+    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
+    drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
     logger.debug("Submitting fragments to run.");
 
@@ -365,7 +368,7 @@ public class Foreman implements Runnable {
   private void setupSortMemoryAllocations(final PhysicalPlan plan) {
     // look for external sorts
     final List<ExternalSort> sortList = new LinkedList<>();
-    for (PhysicalOperator op : plan.getSortedOperators()) {
+    for (final PhysicalOperator op : plan.getSortedOperators()) {
       if (op instanceof ExternalSort) {
         sortList.add((ExternalSort) op);
       }
@@ -382,7 +385,7 @@ public class Foreman implements Runnable {
       final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
       logger.debug("Max sort alloc: {}", maxSortAlloc);
 
-      for(ExternalSort externalSort : sortList) {
+      for(final ExternalSort externalSort : sortList) {
         externalSort.setMaxAllocation(maxSortAlloc);
       }
     }
@@ -403,7 +406,7 @@ public class Foreman implements Runnable {
     if (queuingEnabled) {
       final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
       double totalCost = 0;
-      for (PhysicalOperator ops : plan.getSortedOperators()) {
+      for (final PhysicalOperator ops : plan.getSortedOperators()) {
         totalCost += ops.getCost();
       }
 
@@ -423,7 +426,7 @@ public class Foreman implements Runnable {
 
         final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
         lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
-      } catch (Exception e) {
+      } catch (final Exception e) {
         throw new ForemanSetupException("Unable to acquire slot for query.", e);
       }
     }
@@ -447,7 +450,7 @@ public class Foreman implements Runnable {
       final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
       final int fragmentCount = planFragments.size();
       int fragmentIndex = 0;
-      for(PlanFragment planFragment : planFragments) {
+      for(final PlanFragment planFragment : planFragments) {
         final FragmentHandle fragmentHandle = planFragment.getHandle();
         sb.append("PlanFragment(");
         sb.append(++fragmentIndex);
@@ -471,7 +474,7 @@ public class Foreman implements Runnable {
         {
           final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
           jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
-        } catch(Exception e) {
+        } catch(final Exception e) {
           // we've already set jsonString to a fallback value
         }
         sb.append(jsonString);
@@ -567,7 +570,7 @@ public class Foreman implements Runnable {
 
       try {
         autoCloseable.close();
-      } catch(Exception e) {
+      } catch(final Exception e) {
         /*
          * Even if the query completed successfully, we'll still report failure if we have
          * problems cleaning up.
@@ -582,11 +585,11 @@ public class Foreman implements Runnable {
       Preconditions.checkState(!isClosed);
       Preconditions.checkState(resultState != null);
 
-      logger.info("foreman cleaning up - status: {}", queryManager.getFragmentStatesAsString());
+      logger.info("foreman cleaning up.");
 
       // These are straight forward removals from maps, so they won't throw.
       drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
-      drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
+      drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());
 
       suppressingClose(queryContext);
 
@@ -612,8 +615,8 @@ public class Foreman implements Runnable {
           .setQueryId(queryId)
           .setQueryState(resultState);
       if (resultException != null) {
-        boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-        UserException uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build();
+        final boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
+        final UserException uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build();
         resultBuilder.addError(uex.getOrCreatePBError(verbose));
       }
 
@@ -626,7 +629,7 @@ public class Foreman implements Runnable {
       try {
         // send whatever result we ended up with
         initiatingClient.sendResult(responseListener, resultBuilder.build(), true);
-      } catch(Exception e) {
+      } catch(final Exception e) {
         addException(e);
         logger.warn("Exception sending result to client", resultException);
       }
@@ -658,7 +661,7 @@ public class Foreman implements Runnable {
     }
 
     @Override
-    protected void processEvent(StateEvent event) {
+    protected void processEvent(final StateEvent event) {
       final QueryState newState = event.newState;
       final Exception exception = event.exception;
 
@@ -803,7 +806,7 @@ public class Foreman implements Runnable {
     queryManager.addFragmentStatusTracker(rootFragment, true);
 
     rootRunner = new FragmentExecutor(rootContext, rootOperator,
-        queryManager.getRootStatusHandler(rootContext));
+        queryManager.newRootStatusHandler(rootContext));
     final RootFragmentManager fragmentManager =
         new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
 
@@ -836,7 +839,7 @@ public class Foreman implements Runnable {
     final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
 
     // record all fragments for status purposes.
-    for (PlanFragment planFragment : fragments) {
+    for (final PlanFragment planFragment : fragments) {
       logger.trace("Tracking intermediate remote node {} with data {}",
                    planFragment.getAssignment(), planFragment.getFragmentJson());
       queryManager.addFragmentStatusTracker(planFragment, false);
@@ -855,40 +858,53 @@ public class Foreman implements Runnable {
      * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
      * know if any submissions did fail.
      */
-    final CountDownLatch endpointLatch = new CountDownLatch(intFragmentMap.keySet().size());
+    final ExtendedLatch endpointLatch = new ExtendedLatch(intFragmentMap.keySet().size());
     final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
 
     // send remote intermediate fragments
-    for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
+    for (final DrillbitEndpoint ep : intFragmentMap.keySet()) {
       sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
     }
 
-    // wait for the status of all requests sent above to be known
-    boolean ready = false;
-    while(!ready) {
-      try {
-        endpointLatch.await();
-        ready = true;
-      } catch (InterruptedException e) {
-        // if we weren't ready, the while loop will continue to wait
-      }
+    if(!endpointLatch.awaitUninterruptibly(RPC_WAIT_IN_SECONDS * 1000)){
+      long numberRemaining = endpointLatch.getCount();
+      throw UserException.connectionError()
+          .message(
+              "Exceeded timeout while waiting send intermediate work fragments to remote nodes.  Sent %d and only heard response back from %d nodes.",
+              intFragmentMap.keySet().size(), intFragmentMap.keySet().size() - numberRemaining)
+          .build();
     }
 
+
     // if any of the intermediate fragment submissions failed, fail the query
-    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
-        fragmentSubmitFailures.submissionExceptions;
+    final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = fragmentSubmitFailures.submissionExceptions;
     if (submissionExceptions.size() > 0) {
-      throw new ForemanSetupException("Error setting up remote intermediate fragment execution",
-          submissionExceptions.get(0).rpcException);
-      // TODO indicate the failing drillbit?
-      // TODO report on all the failures?
+      Set<DrillbitEndpoint> endpoints = Sets.newHashSet();
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+
+      for (FragmentSubmitFailures.SubmissionException e : fragmentSubmitFailures.submissionExceptions) {
+        DrillbitEndpoint endpoint = e.drillbitEndpoint;
+        if (endpoints.add(endpoint)) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(endpoint.getAddress());
+        }
+      }
+      throw UserException.connectionError(submissionExceptions.get(0).rpcException)
+          .message("Error setting up remote intermediate fragment execution")
+          .addContext("Nodes with failures", sb.toString())
+          .build();
     }
 
     /*
      * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
      * the regular sendListener event delivery.
      */
-    for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+    for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
       sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
     }
   }
@@ -906,7 +922,7 @@ public class Foreman implements Runnable {
     @SuppressWarnings("resource")
     final Controller controller = drillbitContext.getController();
     final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
-    for(PlanFragment planFragment : fragments) {
+    for(final PlanFragment planFragment : fragments) {
       fb.addFragment(planFragment);
     }
     final InitializeFragments initFrags = fb.build();
@@ -926,12 +942,12 @@ public class Foreman implements Runnable {
    */
   private static class FragmentSubmitFailures {
     static class SubmissionException {
-//      final DrillbitEndpoint drillbitEndpoint;
+      final DrillbitEndpoint drillbitEndpoint;
       final RpcException rpcException;
 
       SubmissionException(@SuppressWarnings("unused") final DrillbitEndpoint drillbitEndpoint,
           final RpcException rpcException) {
-//        this.drillbitEndpoint = drillbitEndpoint;
+        this.drillbitEndpoint = drillbitEndpoint;
         this.rpcException = rpcException;
       }
     }
@@ -999,16 +1015,7 @@ public class Foreman implements Runnable {
      *   to the user
      */
     public void moveToState(final QueryState newState, final Exception ex) {
-      boolean ready = false;
-      while(!ready) {
-        try {
-          acceptExternalEvents.await();
-          ready = true;
-        } catch(InterruptedException e) {
-          // if we're still not ready, the while loop will cause us to wait again
-          logger.warn("Interrupted while waiting to move state.", e);
-        }
-      }
+      acceptExternalEvents.awaitUninterruptibly();
 
       Foreman.this.moveToState(newState, ex);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index 433ab26..ceb77f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -22,11 +22,16 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 public class FragmentData {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentData.class);
+
   private final boolean isLocal;
   private volatile FragmentStatus status;
-  private volatile long lastStatusUpdate = 0;
+  private volatile long lastStatusUpdate = System.currentTimeMillis();
+  private volatile long lastProgress = System.currentTimeMillis();
   private final DrillbitEndpoint endpoint;
 
   public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint, final boolean isLocal) {
@@ -43,13 +48,45 @@ public class FragmentData {
         .build();
   }
 
-  public void setStatus(final FragmentStatus status) {
-    this.status = status;
-    lastStatusUpdate = System.currentTimeMillis();
+  /**
+   * Update the status for this fragment.  Also records last update and last progress time.
+   * @param newStatus Updated status
+   * @return Whether or not the status update resulted in a FragmentState change.
+   */
+  public boolean setStatus(final FragmentStatus newStatus) {
+    final long time = System.currentTimeMillis();
+    final FragmentState oldState = status.getProfile().getState();
+    final boolean inTerminalState = oldState == FragmentState.FAILED || oldState == FragmentState.FINISHED || oldState == FragmentState.CANCELLED;
+    final FragmentState currentState = newStatus.getProfile().getState();
+    final boolean stateChanged = currentState != oldState;
+
+    if (inTerminalState) {
+      // already in a terminal state. This shouldn't happen.
+      logger.warn(String.format("Received status message for fragment %s after fragment was in state %s. New state was %s",
+          QueryIdHelper.getQueryIdentifier(getHandle()), oldState, currentState));
+      return false;
+    }
+
+    this.lastStatusUpdate = time;
+    if (madeProgress(status, newStatus)) {
+      this.lastProgress = time;
+    }
+    status = newStatus;
+
+    return stateChanged;
+  }
+
+  public FragmentState getState() {
+    return status.getProfile().getState();
   }
 
-  public FragmentStatus getStatus() {
-    return status;
+  public MinorFragmentProfile getProfile() {
+    return status
+        .getProfile()
+        .toBuilder()
+        .setLastUpdate(lastStatusUpdate)
+        .setLastProgress(lastProgress)
+        .build();
   }
 
   public boolean isLocal() {
@@ -64,6 +101,34 @@ public class FragmentData {
     return status.getHandle();
   }
 
+  private boolean madeProgress(final FragmentStatus prev, final FragmentStatus cur) {
+    final MinorFragmentProfile previous = prev.getProfile();
+    final MinorFragmentProfile current = cur.getProfile();
+
+    if (previous.getState() != current.getState()) {
+      return true;
+    }
+
+    if (previous.getOperatorProfileCount() != current.getOperatorProfileCount()) {
+      return true;
+    }
+
+    for(int i =0; i < current.getOperatorProfileCount(); i++){
+      if (madeProgress(previous.getOperatorProfile(i), current.getOperatorProfile(i))) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private boolean madeProgress(final OperatorProfile prev, final OperatorProfile cur) {
+    return prev.getInputProfileCount() != cur.getInputProfileCount()
+        || !prev.getInputProfileList().equals(cur.getInputProfileList())
+        || prev.getMetricCount() != cur.getMetricCount()
+        || !prev.getMetricList().equals(cur.getMetricList());
+  }
+
   @Override
   public String toString() {
     return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 31b1f2b..34fa639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -21,11 +21,12 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -50,15 +51,17 @@ import org.apache.drill.exec.work.EndpointListener;
 import org.apache.drill.exec.work.foreman.Foreman.StateListener;
 import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
 import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import org.apache.drill.exec.work.fragment.StatusReporter;
 
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 
 /**
  * Each Foreman holds its own QueryManager.  This manages the events associated with execution of a particular query across all fragments.
  */
-public class QueryManager implements FragmentStatusListener, DrillbitStatusListener {
+public class QueryManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
 
   public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
@@ -74,7 +77,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
       .ephemeral()
       .build();
 
-  private final Set<DrillbitEndpoint> includedBits;
+  private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
   private final StateListener stateListener;
   private final QueryId queryId;
   private final String stringQueryId;
@@ -94,8 +97,13 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
 
   // the following mutable variables are used to capture ongoing query status
   private String planText;
-  private long startTime;
+  private long startTime = System.currentTimeMillis();
   private long endTime;
+
+  // How many nodes have finished their execution.  Query is complete when all nodes are complete.
+  private final AtomicInteger finishedNodes = new AtomicInteger(0);
+
+  // How many fragments have finished their execution.
   private final AtomicInteger finishedFragments = new AtomicInteger(0);
 
   public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider,
@@ -109,83 +117,27 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
     try {
       profilePStore = pStoreProvider.getStore(QUERY_PROFILE);
       profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-
-    includedBits = Sets.newHashSet();
-  }
-
-  @Override
-  public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
-  }
-
-  @Override
-  public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
-    for(DrillbitEndpoint ep : unregisteredDrillbits) {
-      if (includedBits.contains(ep)) {
-        logger.warn("Drillbit {} no longer registered in cluster.  Canceling query {}",
-            ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
-        stateListener.moveToState(QueryState.FAILED,
-            new ForemanException("One more more nodes lost connectivity during query.  Identified node was "
-                + ep.getAddress()));
-      }
-    }
   }
 
-  @Override
-  public void statusUpdate(final FragmentStatus status) {
-    logger.debug("New fragment status was provided to QueryManager of {}", status);
-    switch(status.getProfile().getState()) {
-    case AWAITING_ALLOCATION:
-    case RUNNING:
-      updateFragmentStatus(status);
-      break;
-
-    case FINISHED:
-      fragmentDone(status);
-      break;
-
-    case CANCELLED:
-      /*
-       * TODO
-       * This doesn't seem right; shouldn't this be similar to FAILED?
-       * and this means once all are cancelled we'll get to COMPLETED, even though some weren't?
-       *
-       * So, we add it to the finishedFragments if we ourselves we receive a statusUpdate (from where),
-       * but not if our cancellation listener gets it?
-       */
-      // TODO(DRILL-2370) we might not get these, so we need to take extra care for cleanup
-      fragmentDone(status);
-      break;
-
-    case FAILED:
-      stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
-      break;
-
-    default:
-      throw new UnsupportedOperationException(String.format("Received status of %s", status));
-    }
-  }
-
-  private void updateFragmentStatus(final FragmentStatus fragmentStatus) {
+  private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
     final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
     final int majorFragmentId = fragmentHandle.getMajorFragmentId();
     final int minorFragmentId = fragmentHandle.getMinorFragmentId();
-    fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
+    final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);
+    return data.setStatus(fragmentStatus);
   }
 
   private void fragmentDone(final FragmentStatus status) {
-    updateFragmentStatus(status);
+    final boolean stateChanged = updateFragmentStatus(status);
 
-    final int finishedFragments = this.finishedFragments.incrementAndGet();
-    final int totalFragments = fragmentDataSet.size();
-    assert finishedFragments <= totalFragments : "The finished fragment count exceeds the total fragment count";
-    final int remaining = totalFragments - finishedFragments;
-    logger.debug("waiting for {} fragments", remaining);
-    if (remaining == 0) {
-      // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
-      stateListener.moveToState(QueryState.COMPLETED, null);
+    if (stateChanged) {
+      // since we're in the fragment done clause and this was a change from previous
+      final NodeTracker node = nodeMap.get(status.getProfile().getEndpoint());
+      node.fragmentComplete();
+      finishedFragments.incrementAndGet();
     }
   }
 
@@ -201,9 +153,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
     }
     minorMap.put(minorFragmentId, fragmentData);
     fragmentDataSet.add(fragmentData);
-
-    // keep track of all the drill bits that are used by this query
-    includedBits.add(fragmentData.getEndpoint());
   }
 
   public String getFragmentStatesAsString() {
@@ -211,7 +160,16 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
   }
 
   void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
-    addFragment(new FragmentData(fragment.getHandle(), fragment.getAssignment(), isRoot));
+    final DrillbitEndpoint assignment = fragment.getAssignment();
+
+    NodeTracker tracker = nodeMap.get(assignment);
+    if (tracker == null) {
+      tracker = new NodeTracker(assignment);
+      nodeMap.put(assignment, tracker);
+    }
+
+    tracker.addFragment();
+    addFragment(new FragmentData(fragment.getHandle(), assignment, isRoot));
   }
 
   /**
@@ -219,23 +177,23 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
    */
   void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
     final Controller controller = drillbitContext.getController();
-    for(FragmentData data : fragmentDataSet) {
-      final FragmentStatus fragmentStatus = data.getStatus();
-      switch(fragmentStatus.getProfile().getState()) {
+    for(final FragmentData data : fragmentDataSet) {
+      switch(data.getState()) {
       case SENDING:
       case AWAITING_ALLOCATION:
       case RUNNING:
-        if (rootRunner != null) {
+        if (rootRunner.getContext().getHandle().equals(data.getHandle())) {
             rootRunner.cancel();
         } else {
           final DrillbitEndpoint endpoint = data.getEndpoint();
-          final FragmentHandle handle = fragmentStatus.getHandle();
+          final FragmentHandle handle = data.getHandle();
           // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
           controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
         }
         break;
 
       case FINISHED:
+      case CANCELLATION_REQUESTED:
       case CANCELLED:
       case FAILED:
         // nothing to do
@@ -267,21 +225,6 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
     }
   }
 
-  public RootStatusReporter getRootStatusHandler(final FragmentContext context) {
-    return new RootStatusReporter(context);
-  }
-
-  class RootStatusReporter extends AbstractStatusReporter {
-    private RootStatusReporter(final FragmentContext context) {
-      super(context);
-    }
-
-    @Override
-    protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
-      statusUpdate(status);
-    }
-  }
-
   QueryState updateQueryStateInStore(final QueryState queryState) {
     switch (queryState) {
       case PENDING:
@@ -295,7 +238,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
       case FAILED:
         try {
           profileEStore.delete(stringQueryId);
-        } catch(Exception e) {
+        } catch(final Exception e) {
           logger.warn("Failure while trying to delete the estore profile for this query.", e);
         }
 
@@ -345,7 +288,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
         for (int v = 0; v < minorMap.allocated.length; v++) {
           if (minorMap.allocated[v]) {
             final FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
-            fb.addMinorFragmentProfile(data.getStatus().getProfile());
+            fb.addMinorFragmentProfile(data.getProfile());
           }
         }
         profileBuilder.addFragmentProfile(fb);
@@ -366,4 +309,159 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe
   void markEndTime() {
     endTime = System.currentTimeMillis();
   }
+
+  /**
+   * Internal class used to track the number of pending completion messages required from a particular node. This allows
+   * us to know, for each node that is part of this query, what portion of fragments are still outstanding. In the case that
+   * there is a node failure, we can then correctly track how many outstanding messages will never arrive.
+   */
+  private class NodeTracker {
+    private final DrillbitEndpoint endpoint;
+    private final AtomicInteger totalFragments = new AtomicInteger(0);
+    private final AtomicInteger completedFragments = new AtomicInteger(0);
+
+    public NodeTracker(final DrillbitEndpoint endpoint) {
+      this.endpoint = endpoint;
+    }
+
+    /**
+     * Increments the number of fragments this node is running.
+     */
+    public void addFragment() {
+      totalFragments.incrementAndGet();
+    }
+
+    /**
+     * Increments the number of fragments completed on this node.  Once the number of fragments completed
+     * equals the number of fragments running, this will be marked as a finished node and result in the finishedNodes being incremented.
+     *
+     * If the number of remaining nodes has been decremented to zero, this will allow the query to move to a completed state.
+     */
+    public void fragmentComplete() {
+      if (totalFragments.get() == completedFragments.incrementAndGet()) {
+        nodeComplete();
+      }
+    }
+
+    /**
+     * Increments the number of fragments completed on this node until we mark this node complete. Note that this uses
+     * the internal fragmentComplete() method so whether we have failure or success, the nodeComplete event will only
+     * occur once. (Two threads could be decrementing the fragment at the same time since this will likely come from an
+     * external event).
+     */
+    public void nodeDead() {
+      while (completedFragments.get() < totalFragments.get()) {
+        fragmentComplete();
+      }
+    }
+
+  }
+
+  /**
+   * Increments the number of currently complete nodes. If there are no more pending nodes, moves the query to a
+   * terminal state.
+   */
+  private void nodeComplete() {
+    final int finishedNodes = this.finishedNodes.incrementAndGet();
+    final int totalNodes = nodeMap.size();
+    Preconditions.checkArgument(finishedNodes <= totalNodes, "The finished node count exceeds the total node count");
+    final int remaining = totalNodes - finishedNodes;
+    if (remaining == 0) {
+      // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
+      stateListener.moveToState(QueryState.COMPLETED, null);
+    } else {
+      logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", remaining,
+          this.fragmentDataSet.size() - finishedFragments.get());
+    }
+  }
+
+  public StatusReporter newRootStatusHandler(final FragmentContext context) {
+    return new RootStatusReporter(context);
+  }
+
+  private class RootStatusReporter extends AbstractStatusReporter {
+    private RootStatusReporter(final FragmentContext context) {
+      super(context);
+    }
+
+    @Override
+    protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
+      fragmentStatusListener.statusUpdate(status);
+    }
+  }
+
+  public FragmentStatusListener getFragmentStatusListener(){
+    return fragmentStatusListener;
+  }
+
+  private final FragmentStatusListener fragmentStatusListener = new FragmentStatusListener() {
+    @Override
+    public void statusUpdate(final FragmentStatus status) {
+      logger.debug("New fragment status was provided to QueryManager of {}", status);
+      switch(status.getProfile().getState()) {
+      case AWAITING_ALLOCATION:
+      case RUNNING:
+      case CANCELLATION_REQUESTED:
+        updateFragmentStatus(status);
+        break;
+
+      case FAILED:
+        stateListener.moveToState(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
+        // fall-through.
+      case FINISHED:
+      case CANCELLED:
+        fragmentDone(status);
+        break;
+
+      default:
+        throw new UnsupportedOperationException(String.format("Received status of %s", status));
+      }
+    }
+  };
+
+
+  public DrillbitStatusListener getDrillbitStatusListener() {
+    return drillbitStatusListener;
+  }
+
+  private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener(){
+
+    @Override
+    public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
+    }
+
+    /**
+     * Marks every unregistered drillbit that is tracked in nodeMap for this query as dead and, if at
+     * least one such drillbit was found, moves the query to FAILED, naming the affected nodes.
+     * @param unregisteredDrillbits endpoints that are no longer registered in the cluster
+     */
+    @Override
+    public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
+      final StringBuilder failedNodeList = new StringBuilder();
+      boolean atLeastOneFailure = false;
+      for(final DrillbitEndpoint ep : unregisteredDrillbits) {
+        final NodeTracker tracker = nodeMap.get(ep);
+        if (tracker != null) {
+          // mark node as dead.
+          tracker.nodeDead();
+          // capture node name for exception or logging message
+          if (atLeastOneFailure) {
+            failedNodeList.append(", ");
+          }else{
+            atLeastOneFailure = true;
+          }
+          failedNodeList.append(ep.getAddress()).append(":").append(ep.getUserPort());
+        }
+      }
+
+      // Fail the query only when a lost drillbit was actually tracked for this query.
+      if (atLeastOneFailure) {
+        logger.warn("Drillbits [{}] no longer registered in cluster.  Canceling query {}",
+            failedNodeList, QueryIdHelper.getQueryId(queryId));
+        stateListener.moveToState(QueryState.FAILED,
+            new ForemanException(String.format("One more more nodes lost connectivity during query.  Identified nodes were [%s].",
+                failedNodeList)));
+      }
+    }
+  };
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
index 4ff28f3..8a40f1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/AbstractStatusReporter.java
@@ -24,29 +24,29 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 
 public abstract class AbstractStatusReporter implements StatusReporter{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStatusReporter.class);
 
-  private FragmentContext context;
-  private volatile long startNanos;
+  private final FragmentContext context;
 
-  public AbstractStatusReporter(FragmentContext context) {
+  public AbstractStatusReporter(final FragmentContext context) {
     super();
     this.context = context;
   }
 
-  private  FragmentStatus.Builder getBuilder(FragmentState state){
+  private  FragmentStatus.Builder getBuilder(final FragmentState state){
     return getBuilder(context, state, null);
   }
 
-  public static FragmentStatus.Builder getBuilder(FragmentContext context, FragmentState state, UserException ex){
-    FragmentStatus.Builder status = FragmentStatus.newBuilder();
-    MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
+  public static FragmentStatus.Builder getBuilder(final FragmentContext context, final FragmentState state, final UserException ex){
+    final FragmentStatus.Builder status = FragmentStatus.newBuilder();
+    final MinorFragmentProfile.Builder b = MinorFragmentProfile.newBuilder();
     context.getStats().addMetricsToStatus(b);
     b.setState(state);
     if(ex != null){
-      boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
+      final boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
       b.setError(ex.getOrCreatePBError(verbose));
     }
     status.setHandle(context.getHandle());
@@ -57,60 +57,36 @@ public abstract class AbstractStatusReporter implements StatusReporter{
   }
 
   @Override
-  public void stateChanged(FragmentHandle handle, FragmentState newState) {
-    FragmentStatus.Builder status = getBuilder(newState);
-
+  public void stateChanged(final FragmentHandle handle, final FragmentState newState) {
+    final FragmentStatus.Builder status = getBuilder(newState);
+    logger.info("State changed for {}. New state: {}", QueryIdHelper.getQueryIdentifier(handle), newState);
     switch(newState){
     case AWAITING_ALLOCATION:
-      awaitingAllocation(handle, status);
-      break;
+    case CANCELLATION_REQUESTED:
     case CANCELLED:
-      cancelled(handle, status);
-      break;
-    case FAILED:
-      // no op since fail should have also been called.
-      break;
     case FINISHED:
-      finished(handle, status);
-      break;
     case RUNNING:
-      this.startNanos = System.nanoTime();
-      running(handle, status);
+      statusChange(handle, status.build());
       break;
     case SENDING:
       // no op.
       break;
+    case FAILED:
+      // shouldn't get here since fail() should be called.
     default:
-      break;
-
+      throw new IllegalStateException(String.format("Received state changed event for unexpected state of %s.", newState));
     }
   }
 
-  protected void awaitingAllocation(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
-  protected void running(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
-  protected void cancelled(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
-  protected void finished(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
-    statusChange(handle, statusBuilder.build());
-  }
-
   protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
 
   @Override
-  public final void fail(FragmentHandle handle, String message, UserException excep) {
-    FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
+  public final void fail(final FragmentHandle handle, final UserException excep) {
+    final FragmentStatus.Builder status = getBuilder(context, FragmentState.FAILED, excep);
     fail(handle, status);
   }
 
-  protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
+  private void fail(final FragmentHandle handle, final FragmentStatus.Builder statusBuilder) {
     statusChange(handle, statusBuilder.build());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a4a97c9..3570ba5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,12 +18,14 @@
 package org.apache.drill.exec.work.fragment;
 
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.drill.common.DeferredException;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContext.ExecutorState;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -42,36 +44,42 @@ import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 public class FragmentExecutor implements Runnable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
 
-  // TODO:  REVIEW:  Can't this be AtomicReference<FragmentState> (so that
-  // debugging and logging don't show just integer values--and for type safety)?
-  private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
+  private final String fragmentName;
   private final FragmentRoot rootOperator;
   private final FragmentContext fragmentContext;
   private final StatusReporter listener;
-  private volatile boolean canceled;
-  private volatile boolean closed;
-  private RootExec root;
+  private final DeferredException deferredException = new DeferredException();
 
+  private volatile RootExec root;
+  private final AtomicReference<FragmentState> fragmentState = new AtomicReference<>(FragmentState.AWAITING_ALLOCATION);
+  private final ExtendedLatch acceptExternalEvents = new ExtendedLatch();
 
   public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
                           final StatusReporter listener) {
     this.fragmentContext = context;
     this.rootOperator = rootOperator;
     this.listener = listener;
+    this.fragmentName = QueryIdHelper.getQueryIdentifier(context.getHandle());
+
+    context.setExecutorState(new ExecutorStateImpl());
   }
 
   @Override
   public String toString() {
-    return
-        super.toString()
-        + "[closed = " + closed
-        + ", state = " + state
-        + ", rootOperator = " + rootOperator
-        + ", fragmentContext = " + fragmentContext
-        + ", listener = " + listener
-        + "]";
+    final StringBuilder builder = new StringBuilder();
+    builder.append("FragmentExecutor [fragmentContext=");
+    builder.append(fragmentContext);
+    builder.append(", fragmentState=");
+    builder.append(fragmentState);
+    builder.append("]");
+    return builder.toString();
   }
 
+  /**
+   * Returns the current fragment status if the fragment is running. Otherwise, returns no status.
+   *
+   * @return FragmentStatus or null.
+   */
   public FragmentStatus getStatus() {
     /*
      * If the query is not in a running state, the operator tree is still being constructed and
@@ -81,29 +89,46 @@ public class FragmentExecutor implements Runnable {
      * before this check. This caused a concurrent modification exception as the list of operator
      * stats is iterated over while collecting info, and added to while building the operator tree.
      */
-    if(state.get() != FragmentState.RUNNING_VALUE) {
+    if (fragmentState.get() != FragmentState.RUNNING) {
       return null;
     }
 
-    return AbstractStatusReporter.getBuilder(fragmentContext, FragmentState.RUNNING, null).build();
+    final FragmentStatus status = AbstractStatusReporter
+        .getBuilder(fragmentContext, FragmentState.RUNNING, null)
+        .build();
+    return status;
   }
 
+  /**
+   * Cancel the execution of this fragment if it is in an appropriate state. Messages come from external sources.
+   */
   public void cancel() {
+    acceptExternalEvents.awaitUninterruptibly();
+
     /*
-     * Note that this can be called from threads *other* than the one running this runnable(), so
-     * we need to be careful about the state transitions that can result. We set the canceled flag,
-     * and this is checked in the run() loop, where action will be taken as soon as possible.
-     *
-     * If the run loop has already exited, because we've already either completed or failed the query,
-     * then the request to cancel is a no-op anyway, so it doesn't matter that we won't see the flag.
+     * Note that this can be called from threads *other* than the one running this runnable(), so we need to be careful
+     * about the state transitions that can result. We set the cancel requested flag but the actual cancellation is
+     * managed by the run() loop.
      */
-    canceled = true;
+    updateState(FragmentState.CANCELLATION_REQUESTED);
   }
 
-  public void receivingFragmentFinished(FragmentHandle handle) {
-    cancel();
+  /**
+   * Inform this fragment that one of its downstream partners no longer needs additional records. This is most commonly
+   * called in the case that a limit query is executed.
+   *
+   * @param handle
+   *          The downstream FragmentHandle of the Fragment that needs no more records from this Fragment.
+   */
+  public void receivingFragmentFinished(final FragmentHandle handle) {
+    acceptExternalEvents.awaitUninterruptibly();
     if (root != null) {
+      logger.info("Applying request for early sender termination for {} -> {}.",
+          QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
       root.receivingFragmentFinished(handle);
+    } else {
+      logger.warn("Dropping request for early fragment termination for path {} -> {} as no root exec exists.",
+          QueryIdHelper.getFragmentId(this.getContext().getHandle()), QueryIdHelper.getFragmentId(handle));
     }
   }
 
@@ -114,184 +139,204 @@ public class FragmentExecutor implements Runnable {
     final FragmentHandle fragmentHandle = fragmentContext.getHandle();
     final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
     final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
+    final String newThreadName = QueryIdHelper.getExecutorThreadName(fragmentHandle);
 
     try {
-      final String newThreadName = String.format("%s:frag:%s:%s",
-          QueryIdHelper.getQueryId(fragmentHandle.getQueryId()),
-          fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+
       myThread.setName(newThreadName);
 
       root = ImplCreator.getExec(fragmentContext, rootOperator);
+
       clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+      updateState(FragmentState.RUNNING);
+
+      acceptExternalEvents.countDown();
 
       logger.debug("Starting fragment runner. {}:{}",
           fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
-      if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
-        logger.warn("Unable to set fragment state to RUNNING.  Cancelled or failed?");
-        return;
-      }
 
       /*
-       * Run the query until root.next returns false OR cancel() changes the
-       * state.
-       * Note that we closeOutResources() here if we're done.  That's because
-       * this can also throw exceptions that we want to treat as failures of the
-       * request, even if the request did fine up until this point.  Any
-       * failures there will be caught in the catch clause below, which will be
-       * reported to the user.  If they were to come from the finally clause,
-       * the uncaught exception there will simply terminate this thread without
-       * alerting the user--the behavior then is to hang.
+       * Run the query until root.next returns false OR we no longer need to continue.
        */
-      while (state.get() == FragmentState.RUNNING_VALUE) {
-        if (canceled) {
-          logger.debug("Cancelling fragment {}", fragmentContext.getHandle());
-
-          // Change state checked by main loop to terminate it (if not already done):
-          updateState(FragmentState.CANCELLED);
-
-          fragmentContext.cancel();
-
-          logger.debug("Cancelled fragment {}", fragmentContext.getHandle());
-
-          /*
-           * The state will be altered because of the updateState(), which would cause
-           * us to fall out of the enclosing while loop; we just short-circuit that here
-           */
-          break;
-        }
-
-        if (!root.next()) {
-          if (fragmentContext.isFailed()) {
-            internalFail(fragmentContext.getFailureCause());
-            closeOutResources();
-          } else {
-            /*
-             * Close out resources before we report success. We do this so that we'll get an
-             * error if there's a problem cleaning up, even though the query execution portion
-             * succeeded.
-             */
-            closeOutResources();
-            updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
-          }
-          break;
-        }
+      while (shouldContinue() && root.next()) {
+        // loop
       }
+
+      updateState(FragmentState.FINISHED);
+
     } catch (AssertionError | Exception e) {
-      logger.warn("Error while initializing or executing fragment", e);
-      fragmentContext.fail(e);
-      internalFail(e);
+      fail(e);
     } finally {
-      clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
 
-      // Final check to make sure RecordBatches are cleaned up.
+      // We need to make sure we countDown at least once. We'll do it here to guarantee that.
+      acceptExternalEvents.countDown();
+
       closeOutResources();
 
+      // send the final state of the fragment. only the main execution thread can send the final state and it can
+      // only be sent once.
+      sendFinalState();
+
+      clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
+
       myThread.setName(originalThreadName);
     }
   }
 
-  private static final String CLOSE_FAILURE = "Failure while closing out resources";
-
-  private void closeOutResources() {
-    /*
-     * Because of the way this method can be called, it needs to be idempotent; it must
-     * be safe to call it more than once. We use this flag to bypass the body if it has
-     * been called before.
-     */
-    synchronized(this) { // synchronize for the state of closed
-      if (closed) {
-        return;
-      }
+  /**
+   * Utility method to check whether we are in a non-terminal state with no cancellation requested.
+   *
+   * @return Whether or not execution should continue.
+   */
+  private boolean shouldContinue() {
+    return !isCompleted() && FragmentState.CANCELLATION_REQUESTED != fragmentState.get();
+  }
 
-      final DeferredException deferredException = fragmentContext.getDeferredException();
-      try {
-        root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
-      } catch (RuntimeException e) {
-        logger.warn(CLOSE_FAILURE, e);
-        deferredException.addException(e);
-      }
+  /**
+   * Returns true if the fragment is in a terminal state
+   *
+   * @return Whether this fragment is in a terminal state.
+   */
+  private boolean isCompleted() {
+    return isTerminal(fragmentState.get());
+  }
 
-      closed = true;
+  private void sendFinalState() {
+    final FragmentState outcome = fragmentState.get();
+    if (outcome == FragmentState.FAILED) {
+      final FragmentHandle handle = getContext().getHandle();
+      final UserException uex = UserException.systemError(deferredException.getAndClear())
+          .addIdentity(getContext().getIdentity())
+          .addContext("Fragment", handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId())
+          .build();
+      listener.fail(fragmentContext.getHandle(), uex);
+    } else {
+      listener.stateChanged(fragmentContext.getHandle(), outcome);
     }
+  }
+
+
+  private void closeOutResources() {
 
-    /*
-     * This must be last, because this may throw deferred exceptions.
-     * We are forced to wrap the checked exception (if any) so that it will be unchecked.
-     */
     try {
-      fragmentContext.close();
-    } catch(Exception e) {
-      throw new RuntimeException("Error closing fragment context.", e);
+      root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+    } catch (final Exception e) {
+      fail(e);
     }
-  }
 
-  private void internalFail(final Throwable excep) {
-    state.set(FragmentState.FAILED_VALUE);
+    fragmentContext.close();
 
-    UserException uex = UserException.systemError(excep).addIdentity(getContext().getIdentity()).build();
-    listener.fail(fragmentContext.getHandle(), "Failure while running fragment.", uex);
   }
 
-  /**
-   * Updates the fragment state with the given state
-   *
-   * @param  to  target state
-   */
-  private void updateState(final FragmentState to) {
-    state.set(to.getNumber());
-    listener.stateChanged(fragmentContext.getHandle(), to);
+  private void warnStateChange(final FragmentState current, final FragmentState target) {
+    logger.warn("Ignoring unexpected state transition {} => {}.", current.name(), target.name());
   }
 
-  /**
-   * Updates the fragment state only iff the current state matches the expected.
-   *
-   * @param  expected  expected current state
-   * @param  to  target state
-   * @return true only if update succeeds
-   */
-  private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) {
-    final boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
-    if (success) {
-      listener.stateChanged(fragmentContext.getHandle(), to);
-    } else {
-      logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
-          expected.name(), to.name(), FragmentState.valueOf(state.get()));
+  private void errorStateChange(final FragmentState current, final FragmentState target) {
+    final String msg = "Invalid state transition %s => %s.";
+    throw new StateTransitionException(String.format(msg, current.name(), target.name()));
+  }
+
+  private synchronized boolean updateState(FragmentState target) {
+    final FragmentHandle handle = fragmentContext.getHandle();
+    final FragmentState current = fragmentState.get();
+    logger.info(fragmentName + ": State change requested from {} --> {} for ", current, target);
+    switch (target) {
+    case CANCELLATION_REQUESTED:
+      switch (current) {
+      case SENDING:
+      case AWAITING_ALLOCATION:
+      case RUNNING:
+        fragmentState.set(target);
+        listener.stateChanged(handle, target);
+        return true;
+
+      default:
+        warnStateChange(current, target);
+        return false;
+      }
+
+    case FINISHED:
+      if(current == FragmentState.CANCELLATION_REQUESTED){
+        target = FragmentState.CANCELLED;
+      }
+      // fall-through
+    case FAILED:
+      if(!isTerminal(current)){
+        fragmentState.set(target);
+        // don't notify listener until we finalize this terminal state.
+        return true;
+      } else if (current == FragmentState.FAILED) {
+        // no warn since we can call fail multiple times.
+        return false;
+      } else if (current == FragmentState.CANCELLED && target == FragmentState.FAILED) {
+        fragmentState.set(FragmentState.FAILED);
+        return true;
+      }else{
+        warnStateChange(current, target);
+        return false;
+      }
+
+    case RUNNING:
+      if(current == FragmentState.AWAITING_ALLOCATION){
+        fragmentState.set(target);
+        listener.stateChanged(handle, target);
+        return true;
+      }else{
+        errorStateChange(current, target);
+      }
+
+    // these should never be requested.
+    case SENDING:
+    case AWAITING_ALLOCATION:
+    case CANCELLED:
+    default:
+      errorStateChange(current, target);
     }
-    return success;
+
+    // errorStateChange() always throws, so this should never be executed
+    throw new IllegalStateException();
   }
 
-  /**
-   * Returns true if the fragment is in a terminal state
-   */
-  private boolean isCompleted() {
-    return state.get() == FragmentState.CANCELLED_VALUE
-        || state.get() == FragmentState.FAILED_VALUE
-        || state.get() == FragmentState.FINISHED_VALUE;
+  private boolean isTerminal(final FragmentState state) {
+    return state == FragmentState.CANCELLED
+        || state == FragmentState.FAILED
+        || state == FragmentState.FINISHED;
   }
 
   /**
-   * Update the state if current state matches expected or fail the fragment if state transition fails even though
-   * fragment is not in a terminal state.
+   * Capture an exception and store it. Update state to failed status (if not already there). Does not immediately
+   * report status back to Foreman. Only the original thread can return status to the Foreman.
    *
-   * @param expected  current expected state
-   * @param to  target state
-   * @return true only if update succeeds
+   * @param excep
+   *          The failure that occurred.
    */
-  private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
-    final boolean updated = checkAndUpdateState(expected, to);
-    if (!updated && !isCompleted()) {
-      final String msg = "State was different than expected while attempting to update state from %s to %s"
-          + "however current state was %s.";
-      internalFail(new StateTransitionException(
-          String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
-    }
-    return updated;
+  private void fail(final Throwable excep) {
+    deferredException.addThrowable(excep);
+    updateState(FragmentState.FAILED);
   }
 
   public FragmentContext getContext() {
     return fragmentContext;
   }
 
+  private class ExecutorStateImpl implements ExecutorState {
+    public boolean shouldContinue() {
+      return FragmentExecutor.this.shouldContinue();
+    }
+
+    public void fail(final Throwable t) {
+      FragmentExecutor.this.fail(t);
+    }
+
+    public boolean isFailed() {
+      return fragmentState.get() == FragmentState.FAILED;
+    }
+    public Throwable getFailureCause(){
+      return deferredException.getException();
+    }
+  }
+
   private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
     @Override
     public void drillbitRegistered(final Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 7a819c4..0ba91b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.io.IOException;
+
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
 
-import java.io.IOException;
-
 /**
  * The Fragment Manager is responsible managing incoming data and executing a fragment. Once enough data and resources
  * are avialable, a fragment manager will start a fragment executor to run the associated fragment.
@@ -57,6 +57,8 @@ public interface FragmentManager {
 
   public abstract void addConnection(RemoteConnection connection);
 
+  public void receivingFragmentFinished(final FragmentHandle handle);
+
   /**
    *  Sets autoRead property on all connections
    * @param autoRead

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 41e87cd..a5b928b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -33,19 +33,23 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.foreman.ForemanException;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This managers determines when to run a non-root fragment node.
  */
 // TODO a lot of this is the same as RootFragmentManager
 public class NonRootFragmentManager implements FragmentManager {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NonRootFragmentManager.class);
+
   private final PlanFragment fragment;
   private FragmentRoot root;
   private final IncomingBuffers buffers;
-  private final StatusReporter runnerListener;
-  private volatile FragmentExecutor runner;
+  private final FragmentExecutor runner;
   private volatile boolean cancel = false;
   private final FragmentContext context;
-  private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+  private final List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
+  private volatile boolean runnerRetrieved = false;
 
   public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
       throws ExecutionSetupException {
@@ -54,8 +58,10 @@ public class NonRootFragmentManager implements FragmentManager {
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
       this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(root, this.context);
+      final StatusReporter reporter = new NonRootStatusReporter(this.context, context.getController().getTunnel(
+          fragment.getForeman()));
+      this.runner = new FragmentExecutor(this.context, root, reporter);
       this.context.setBuffers(buffers);
-      this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman()));
 
     } catch (ForemanException | IOException e) {
       throw new FragmentSetupException("Failure while decoding fragment.", e);
@@ -66,7 +72,7 @@ public class NonRootFragmentManager implements FragmentManager {
    * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
    */
   @Override
-  public boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException {
+  public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
     return buffers.batchArrived(batch);
   }
 
@@ -76,16 +82,22 @@ public class NonRootFragmentManager implements FragmentManager {
   @Override
   public FragmentExecutor getRunnable() {
     synchronized(this) {
-      if (runner != null) {
-        throw new IllegalStateException("Get Runnable can only be run once.");
-      }
+
+      // historically, we had issues where we tried to run the same fragment multiple times. Let's check to make sure
+      // this isn't happening.
+      Preconditions.checkArgument(!runnerRetrieved, "Get Runnable can only be run once.");
+
       if (cancel) {
         return null;
       }
-      runner = new FragmentExecutor(context, root, runnerListener);
+      runnerRetrieved = true;
       return runner;
     }
+  }
 
+  @Override
+  public void receivingFragmentFinished(final FragmentHandle handle) {
+    runner.receivingFragmentFinished(handle);
   }
 
   /* (non-Javadoc)
@@ -95,9 +107,7 @@ public class NonRootFragmentManager implements FragmentManager {
   public void cancel() {
     synchronized(this) {
       cancel = true;
-      if (runner != null) {
-        runner.cancel();
-      }
+      runner.cancel();
     }
   }
 
@@ -117,13 +127,13 @@ public class NonRootFragmentManager implements FragmentManager {
   }
 
   @Override
-  public void addConnection(RemoteConnection connection) {
+  public void addConnection(final RemoteConnection connection) {
     connections.add(connection);
   }
 
   @Override
-  public void setAutoRead(boolean autoRead) {
-    for (RemoteConnection c : connections) {
+  public void setAutoRead(final boolean autoRead) {
+    for (final RemoteConnection c : connections) {
       c.setAutoRead(autoRead);
     }
   }


Mime
View raw message