drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [5/7] drill git commit: DRILL-2762: Update Fragment state reporting and error collection
Date Sun, 19 Apr 2015 01:03:40 GMT
http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4b317e0..7b9fffb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,9 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.List;
 
@@ -63,18 +61,15 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.sun.codemodel.JExpr;
 
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -85,7 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
-  private boolean buildingSchema = true;
+  private final boolean buildingSchema = true;
 
   private static final String EMPTY_STRING = "";
   private boolean first = true;
@@ -96,7 +91,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     public String prefix = "";
     public HashMap<String, Integer> prefixMap = Maps.newHashMap();
     public CaseInsensitiveMap outputMap = new CaseInsensitiveMap();
-    private CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
+    private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
 
     private void clear() {
       isStar = false;
@@ -109,7 +104,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
   }
 
-  public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+  public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
   }
 
@@ -120,7 +115,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
 
   @Override
-  protected void killIncoming(boolean sendUpstream) {
+  protected void killIncoming(final boolean sendUpstream) {
     super.killIncoming(sendUpstream);
     hasRemainder = false;
   }
@@ -157,7 +152,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         if (next == IterOutcome.OK_NEW_SCHEMA) {
           try {
             setupNewSchema();
-          } catch (SchemaChangeException e) {
+          } catch (final SchemaChangeException e) {
             throw new RuntimeException(e);
           }
         }
@@ -172,7 +167,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return IterOutcome.OUT_OF_MEMORY;
     }
 
-    int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
+    final int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
     if (outputRecords < incomingRecordCount) {
       setValueCount(outputRecords);
       hasRemainder = true;
@@ -180,7 +175,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       this.recordCount = remainderIndex;
     } else {
       setValueCount(incomingRecordCount);
-      for(VectorWrapper<?> v: incoming) {
+      for(final VectorWrapper<?> v: incoming) {
         v.clear();
       }
       this.recordCount = outputRecords;
@@ -195,12 +190,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }
 
   private void handleRemainder() {
-    int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
+    final int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
     if (!doAlloc()) {
       outOfMemory = true;
       return;
     }
-    int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
+    final int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
     if (projRecords < remainingRecordCount) {
       setValueCount(projRecords);
       this.recordCount = projRecords;
@@ -209,7 +204,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       setValueCount(remainingRecordCount);
       hasRemainder = false;
       remainderIndex = 0;
-      for (VectorWrapper<?> v : incoming) {
+      for (final VectorWrapper<?> v : incoming) {
         v.clear();
       }
       this.recordCount = remainingRecordCount;
@@ -221,13 +216,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
   }
 
-  public void addComplexWriter(ComplexWriter writer) {
+  public void addComplexWriter(final ComplexWriter writer) {
     complexWriters.add(writer);
   }
 
   private boolean doAlloc() {
     //Allocate vv in the allocationVectors.
-    for (ValueVector v : this.allocationVectors) {
+    for (final ValueVector v : this.allocationVectors) {
       AllocationHelper.allocateNew(v, incoming.getRecordCount());
     }
 
@@ -236,16 +231,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return true;
     }
 
-    for (ComplexWriter writer : complexWriters) {
+    for (final ComplexWriter writer : complexWriters) {
       writer.allocate();
     }
 
     return true;
   }
 
-  private void setValueCount(int count) {
-    for (ValueVector v : allocationVectors) {
-      ValueVector.Mutator m = v.getMutator();
+  private void setValueCount(final int count) {
+    for (final ValueVector v : allocationVectors) {
+      final ValueVector.Mutator m = v.getMutator();
       m.setValueCount(count);
     }
 
@@ -253,15 +248,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return;
     }
 
-    for (ComplexWriter writer : complexWriters) {
+    for (final ComplexWriter writer : complexWriters) {
       writer.setValueCount(count);
     }
   }
 
   /** hack to make ref and full work together... need to figure out if this is still necessary. **/
-  private FieldReference getRef(NamedExpression e) {
-    FieldReference ref = e.getRef();
-    PathSegment seg = ref.getRootSegment();
+  private FieldReference getRef(final NamedExpression e) {
+    final FieldReference ref = e.getRef();
+    final PathSegment seg = ref.getRootSegment();
 
 //    if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) {
 //      return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
@@ -269,8 +264,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return ref;
   }
 
-  private boolean isAnyWildcard(List<NamedExpression> exprs) {
-    for (NamedExpression e : exprs) {
+  private boolean isAnyWildcard(final List<NamedExpression> exprs) {
+    for (final NamedExpression e : exprs) {
       if (isWildcard(e)) {
         return true;
       }
@@ -278,18 +273,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return false;
   }
 
-  private boolean isWildcard(NamedExpression ex) {
+  private boolean isWildcard(final NamedExpression ex) {
     if ( !(ex.getExpr() instanceof SchemaPath)) {
       return false;
     }
-    NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+    final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
     return expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
   }
 
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     if (allocationVectors != null) {
-      for (ValueVector v : allocationVectors) {
+      for (final ValueVector v : allocationVectors) {
         v.clear();
       }
     }
@@ -305,12 +300,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
     final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
 
-    IntOpenHashSet transferFieldIds = new IntOpenHashSet();
+    final IntOpenHashSet transferFieldIds = new IntOpenHashSet();
 
-    boolean isAnyWildcard = isAnyWildcard(exprs);
+    final boolean isAnyWildcard = isAnyWildcard(exprs);
 
-    ClassifierResult result = new ClassifierResult();
-    boolean classify = isClassificationNeeded(exprs);
+    final ClassifierResult result = new ClassifierResult();
+    final boolean classify = isClassificationNeeded(exprs);
 
     for (int i = 0; i < exprs.size(); i++) {
       final NamedExpression namedExpression = exprs.get(i);
@@ -321,33 +316,33 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
         if (result.isStar) {
           // The value indicates which wildcard we are processing now
-          Integer value = result.prefixMap.get(result.prefix);
+          final Integer value = result.prefixMap.get(result.prefix);
           if (value != null && value.intValue() == 1) {
             int k = 0;
-            for (VectorWrapper<?> wrapper : incoming) {
-              ValueVector vvIn = wrapper.getValueVector();
-              SchemaPath originalPath = vvIn.getField().getPath();
+            for (final VectorWrapper<?> wrapper : incoming) {
+              final ValueVector vvIn = wrapper.getValueVector();
+              final SchemaPath originalPath = vvIn.getField().getPath();
               if (k > result.outputNames.size()-1) {
                 assert false;
               }
-              String name = result.outputNames.get(k++);  // get the renamed column names
+              final String name = result.outputNames.get(k++);  // get the renamed column names
               if (name == EMPTY_STRING) {
                 continue;
               }
-              FieldReference ref = new FieldReference(name);
-              ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
-              TransferPair tp = vvIn.makeTransferPair(vvOut);
+              final FieldReference ref = new FieldReference(name);
+              final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
+              final TransferPair tp = vvIn.makeTransferPair(vvOut);
               transfers.add(tp);
             }
           } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
             int k = 0;
-            for (VectorWrapper<?> wrapper : incoming) {
-              ValueVector vvIn = wrapper.getValueVector();
-              SchemaPath originalPath = vvIn.getField().getPath();
+            for (final VectorWrapper<?> wrapper : incoming) {
+              final ValueVector vvIn = wrapper.getValueVector();
+              final SchemaPath originalPath = vvIn.getField().getPath();
               if (k > result.outputNames.size()-1) {
                 assert false;
               }
-              String name = result.outputNames.get(k++);  // get the renamed column names
+              final String name = result.outputNames.get(k++);  // get the renamed column names
               if (name == EMPTY_STRING) {
                 continue;
               }
@@ -357,12 +352,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
                 throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
               }
 
-              MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
-              ValueVector vv = container.addOrGet(outputField, callBack);
+              final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
+              final ValueVector vv = container.addOrGet(outputField, callBack);
               allocationVectors.add(vv);
-              TypedFieldId fid = container.getValueVectorId(outputField.getPath());
-              ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
-              HoldingContainer hc = cg.addExpr(write);
+              final TypedFieldId fid = container.getValueVectorId(outputField.getPath());
+              final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+              final HoldingContainer hc = cg.addExpr(write);
             }
           }
           continue;
@@ -371,7 +366,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         // For the columns which do not needed to be classified,
         // it is still necessary to ensure the output column name is unique
         result.outputNames = Lists.newArrayList();
-        String outputName = getRef(namedExpression).getRootSegment().getPath();
+        final String outputName = getRef(namedExpression).getRootSegment().getPath();
         addToResultMaps(outputName, result, true);
       }
 
@@ -403,17 +398,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           && !isAnyWildcard
           && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
 
-        ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
-        TypedFieldId id = vectorRead.getFieldId();
-        ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+        final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+        final TypedFieldId id = vectorRead.getFieldId();
+        final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
         Preconditions.checkNotNull(incoming);
 
-        FieldReference ref = getRef(namedExpression);
-        ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
-        TransferPair tp = vvIn.makeTransferPair(vvOut);
+        final FieldReference ref = getRef(namedExpression);
+        final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
+        final TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
         transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
-        logger.debug("Added transfer for project expression.");
       } else if (expr instanceof DrillFuncHolderExpr &&
           ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder())  {
         // Need to process ComplexWriter function evaluation.
@@ -429,12 +423,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         cg.addExpr(expr);
       } else{
         // need to do evaluation.
-        ValueVector vector = container.addOrGet(outputField, callBack);
+        final ValueVector vector = container.addOrGet(outputField, callBack);
         allocationVectors.add(vector);
-        TypedFieldId fid = container.getValueVectorId(outputField.getPath());
-        boolean useSetSafe = !(vector instanceof FixedWidthVector);
-        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
-        HoldingContainer hc = cg.addExpr(write);
+        final TypedFieldId fid = container.getValueVectorId(outputField.getPath());
+        final boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
+        final HoldingContainer hc = cg.addExpr(write);
 
         logger.debug("Added eval for project expression.");
       }
@@ -459,12 +453,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       return popConfig.getExprs();
     }
 
-    List<NamedExpression> exprs = Lists.newArrayList();
-    for (MaterializedField field : incoming.getSchema()) {
+    final List<NamedExpression> exprs = Lists.newArrayList();
+    for (final MaterializedField field : incoming.getSchema()) {
       if (Types.isComplex(field.getType()) || Types.isRepeated(field.getType())) {
-        LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
-        String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
-        List<LogicalExpression> castArgs = Lists.newArrayList();
+        final LogicalExpression convertToJson = FunctionCallFactory.createConvert(ConvertExpression.CONVERT_TO, "JSON", field.getPath(), ExpressionPosition.UNKNOWN);
+        final String castFuncName = CastFunctions.getCastFunc(MinorType.VARCHAR);
+        final List<LogicalExpression> castArgs = Lists.newArrayList();
         castArgs.add(convertToJson);  //input_expr
         /*
          * We are implicitly casting to VARCHAR so we don't have a max length,
@@ -472,7 +466,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
          * to the actual size so this size doesn't really matter.
          */
         castArgs.add(new ValueExpressions.LongExpression(TypeHelper.VARCHAR_DEFAULT_CAST_LEN, null)); //
-        FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
+        final FunctionCall castCall = new FunctionCall(castFuncName, castArgs, ExpressionPosition.UNKNOWN);
         exprs.add(new NamedExpression(castCall, new FieldReference(field.getPath())));
       } else {
         exprs.add(new NamedExpression(field.getPath(), new FieldReference(field.getPath())));
@@ -481,17 +475,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return exprs;
   }
 
-  private boolean isClassificationNeeded(List<NamedExpression> exprs) {
+  private boolean isClassificationNeeded(final List<NamedExpression> exprs) {
     boolean needed = false;
     for (int i = 0; i < exprs.size(); i++) {
       final NamedExpression ex = exprs.get(i);
       if (!(ex.getExpr() instanceof SchemaPath)) {
         continue;
       }
-      NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
-      NameSegment ref = ex.getRef().getRootSegment();
-      boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-      boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+      final NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+      final NameSegment ref = ex.getRef().getRootSegment();
+      final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+      final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
 
       if (refHasPrefix || exprContainsStar) {
         needed = true;
@@ -501,16 +495,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return needed;
   }
 
-  private String getUniqueName(String name, ClassifierResult result) {
-    Integer currentSeq = (Integer) result.sequenceMap.get(name);
+  private String getUniqueName(final String name, final ClassifierResult result) {
+    final Integer currentSeq = (Integer) result.sequenceMap.get(name);
     if (currentSeq == null) { // name is unique, so return the original name
-      Integer n = -1;
+      final Integer n = -1;
       result.sequenceMap.put(name, n);
       return name;
     }
     // create a new name
-    Integer newSeq = currentSeq + 1;
-    String newName = name + newSeq;
+    final Integer newSeq = currentSeq + 1;
+    final String newName = name + newSeq;
     result.sequenceMap.put(name, newSeq);
     result.sequenceMap.put(newName, -1);
 
@@ -527,7 +521,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   *                            to ensure uniqueness
   * @Param allowDupsWithRename if the original name has been used, is renaming allowed to ensure output name unique
   */
-  private void addToResultMaps(String origName, ClassifierResult result, boolean allowDupsWithRename) {
+  private void addToResultMaps(final String origName, final ClassifierResult result, final boolean allowDupsWithRename) {
     String name = origName;
     if (allowDupsWithRename) {
       name = getUniqueName(origName, result);
@@ -540,22 +534,22 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
   }
 
-  private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result)  {
-    NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
-    NameSegment ref = ex.getRef().getRootSegment();
-    boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-    boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
-    boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN);
-    boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN);
-    boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
-    boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN);
+  private void classifyExpr(final NamedExpression ex, final RecordBatch incoming, final ClassifierResult result)  {
+    final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
+    final NameSegment ref = ex.getRef().getRootSegment();
+    final boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+    final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
+    final boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN);
+    final boolean refContainsStar = ref.getPath().contains(StarColumnHelper.STAR_COLUMN);
+    final boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
+    final boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN);
 
     String exprPrefix = EMPTY_STRING;
     String exprSuffix = expr.getPath();
 
     if (exprHasPrefix) {
       // get the prefix of the expr
-      String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      final String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
       assert(exprComponents.length == 2);
       exprPrefix = exprComponents[0];
       exprSuffix = exprComponents[1];
@@ -565,18 +559,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     boolean exprIsFirstWildcard = false;
     if (exprContainsStar) {
       result.isStar = true;
-      Integer value = (Integer) result.prefixMap.get(exprPrefix);
+      final Integer value = (Integer) result.prefixMap.get(exprPrefix);
       if (value == null) {
-        Integer n = 1;
+        final Integer n = 1;
         result.prefixMap.put(exprPrefix, n);
         exprIsFirstWildcard = true;
       } else {
-        Integer n = value + 1;
+        final Integer n = value + 1;
         result.prefixMap.put(exprPrefix, n);
       }
     }
 
-    int incomingSchemaSize = incoming.getSchema().getFieldCount();
+    final int incomingSchemaSize = incoming.getSchema().getFieldCount();
 
     // for debugging..
     // if (incomingSchemaSize > 9) {
@@ -585,16 +579,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
     // input is '*' and output is 'prefix_*'
     if (exprIsStar && refHasPrefix && refEndsWithStar) {
-      String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      final String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
       assert(components.length == 2);
-      String prefix = components[0];
+      final String prefix = components[0];
       result.outputNames = Lists.newArrayList();
-      for(VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String name = vvIn.getField().getPath().getRootSegment().getPath();
+      for(final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String name = vvIn.getField().getPath().getRootSegment().getPath();
 
         // add the prefix to the incoming column name
-        String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
+        final String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name;
         addToResultMaps(newName, result, false);
       }
     }
@@ -609,19 +603,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           result.outputNames.add(EMPTY_STRING);  // initialize
         }
 
-        for (VectorWrapper<?> wrapper : incoming) {
-          ValueVector vvIn = wrapper.getValueVector();
-          String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+        for (final VectorWrapper<?> wrapper : incoming) {
+          final ValueVector vvIn = wrapper.getValueVector();
+          final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
           // get the prefix of the name
-          String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+          final String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
           // if incoming valuevector does not have a prefix, ignore it since this expression is not referencing it
           if (nameComponents.length <= 1) {
             k++;
             continue;
           }
-          String namePrefix = nameComponents[0];
+          final String namePrefix = nameComponents[0];
           if (exprPrefix.equals(namePrefix)) {
-            String newName = incomingName;
+            final String newName = incomingName;
             if (!result.outputMap.containsKey(newName)) {
               result.outputNames.set(k, newName);
               result.outputMap.put(newName,  newName);
@@ -632,9 +626,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       } else {
         result.outputNames = Lists.newArrayList();
         if (exprContainsStar) {
-          for (VectorWrapper<?> wrapper : incoming) {
-            ValueVector vvIn = wrapper.getValueVector();
-            String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+          for (final VectorWrapper<?> wrapper : incoming) {
+            final ValueVector vvIn = wrapper.getValueVector();
+            final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
             if (refContainsStar) {
               addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project
             } else {
@@ -642,7 +636,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             }
           }
         } else {
-          String newName = expr.getPath();
+          final String newName = expr.getPath();
           if (!refHasPrefix && !exprHasPrefix) {
             addToResultMaps(newName, result, true); // allow dups since this is likely top-level project
           } else {
@@ -655,9 +649,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     // input is wildcard and it is not the first wildcard
     else if(exprIsStar) {
       result.outputNames = Lists.newArrayList();
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+      for (final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
         addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project
       }
     }
@@ -665,7 +659,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     // only the output has prefix
     else if (!exprHasPrefix && refHasPrefix) {
       result.outputNames = Lists.newArrayList();
-      String newName = ref.getPath();
+      final String newName = ref.getPath();
       addToResultMaps(newName, result, false);
     }
     // input has prefix but output does not
@@ -676,24 +670,24 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         result.outputNames.add(EMPTY_STRING);  // initialize
       }
 
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String name = vvIn.getField().getPath().getRootSegment().getPath();
-        String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      for (final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String name = vvIn.getField().getPath().getRootSegment().getPath();
+        final String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
         if (components.length <= 1)  {
           k++;
           continue;
         }
-        String namePrefix = components[0];
-        String nameSuffix = components[1];
+        final String namePrefix = components[0];
+        final String nameSuffix = components[1];
         if (exprPrefix.equals(namePrefix)) {
           if (refContainsStar) {
             // remove the prefix from the incoming column names
-            String newName = getUniqueName(nameSuffix, result);  // for top level we need to make names unique
+            final String newName = getUniqueName(nameSuffix, result);  // for top level we need to make names unique
             result.outputNames.set(k, newName);
           } else if (exprSuffix.equals(nameSuffix)) {
             // example: ref: $f1, expr: T0<PREFIX><column_name>
-            String newName = ref.getPath();
+            final String newName = ref.getPath();
             result.outputNames.set(k, newName);
           }
         } else {
@@ -704,7 +698,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
     // input and output have prefixes although they could be different...
     else if (exprHasPrefix && refHasPrefix) {
-      String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
+      final String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
       assert(input.length == 2);
       assert false : "Unexpected project expression or reference";  // not handled yet
     }
@@ -713,11 +707,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       // then we just want to pick the ref name as the output column name
 
       result.outputNames = Lists.newArrayList();
-      for (VectorWrapper<?> wrapper : incoming) {
-        ValueVector vvIn = wrapper.getValueVector();
-        String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
+      for (final VectorWrapper<?> wrapper : incoming) {
+        final ValueVector vvIn = wrapper.getValueVector();
+        final String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
         if (expr.getPath().equals(incomingName)) {
-          String newName = ref.getPath();
+          final String newName = ref.getPath();
           addToResultMaps(newName, result, true);
         }
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 389d668..094865e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -27,9 +27,9 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.UnorderedReceiver;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -53,13 +53,13 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener;
 public class UnorderedReceiverBatch implements RecordBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
 
-  private RecordBatchLoader batchLoader;
-  private RawFragmentBatchProvider fragProvider;
-  private FragmentContext context;
+  private final RecordBatchLoader batchLoader;
+  private final RawFragmentBatchProvider fragProvider;
+  private final FragmentContext context;
   private BatchSchema schema;
-  private OperatorStats stats;
+  private final OperatorStats stats;
   private boolean first = true;
-  private UnorderedReceiver config;
+  private final UnorderedReceiver config;
   OperatorContext oContext;
 
   public enum Metric implements MetricDef {
@@ -72,7 +72,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
     }
   }
 
-  public UnorderedReceiverBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, UnorderedReceiver config) throws OutOfMemoryException {
+  public UnorderedReceiverBatch(final FragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException {
     this.fragProvider = fragProvider;
     this.context = context;
     // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
@@ -101,7 +101,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void kill(final boolean sendUpstream) {
     if (sendUpstream) {
       informSenders();
     }
@@ -124,12 +124,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
+  public TypedFieldId getValueVectorId(final SchemaPath path) {
     return batchLoader.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
     return batchLoader.getValueAccessorById(clazz, ids);
   }
 
@@ -154,7 +154,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
 
       if (batch == null) {
         batchLoader.clear();
-        if (context.isCancelled()) {
+        if (!context.shouldContinue()) {
           return IterOutcome.STOP;
         }
         return IterOutcome.NONE;
@@ -167,8 +167,8 @@ public class UnorderedReceiverBatch implements RecordBatch {
 
 //      logger.debug("Next received batch {}", batch);
 
-      RecordBatchDef rbd = batch.getHeader().getDef();
-      boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+      final RecordBatchDef rbd = batch.getHeader().getDef();
+      final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
       stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
 
       batch.release();
@@ -206,15 +206,16 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   private void informSenders() {
-    FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+    logger.info("Informing senders of request to terminate sending.");
+    final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
             .setMajorFragmentId(config.getOppositeMajorFragmentId())
             .setQueryId(context.getHandle().getQueryId())
             .build();
-    for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
-      FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+    for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+      final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
               .setMinorFragmentId(providingEndpoint.getId())
               .build();
-      FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+      final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
               .setReceiver(context.getHandle())
               .setSender(sender)
               .build();
@@ -225,12 +226,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
   private class OutcomeListener implements RpcOutcomeListener<Ack> {
 
     @Override
-    public void failed(RpcException ex) {
+    public void failed(final RpcException ex) {
       logger.warn("Failed to inform upstream that receiver is finished");
     }
 
     @Override
-    public void success(Ack value, ByteBuf buffer) {
+    public void success(final Ack value, final ByteBuf buffer) {
       // Do nothing
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index bd3c4e7..95d062c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -347,6 +347,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
         mSorter.sort(this.container);
 
+        // sort may have prematurely exited due to should continue returning false.
+        if (!context.shouldContinue()) {
+          return IterOutcome.STOP;
+        }
         sv4 = mSorter.getSV4();
 
         long t = watch.elapsed(TimeUnit.MICROSECONDS);

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 94bc3a3..9b97e1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -41,19 +41,20 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
   private long compares;
   private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
   private Queue<Integer> newRunStarts;
-
+  private FragmentContext context;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
+  public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch) throws SchemaChangeException{
     // we pass in the local hyperBatch since that is where we'll be reading data.
     Preconditions.checkNotNull(vector4);
     this.vector4 = vector4.createNewWrapperCurrent();
+    this.context = context;
     vector4.clear();
     doSetup(context, hyperBatch, null);
     runStarts.add(0);
     int batch = 0;
     for (int i = 0; i < this.vector4.getTotalCount(); i++) {
-      int newBatch = this.vector4.get(i) >>> 16;
+      final int newBatch = this.vector4.get(i) >>> 16;
       if (newBatch == batch) {
         continue;
       } else if(newBatch == batch + 1) {
@@ -63,7 +64,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
         throw new UnsupportedOperationException("Missing batch");
       }
     }
-    BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
+    final BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
     preAlloc.preAllocate(4 * this.vector4.getTotalCount());
     aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
   }
@@ -75,12 +76,12 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
    * @param recordCount
    * @return
    */
-  public static long memoryNeeded(int recordCount) {
+  public static long memoryNeeded(final int recordCount) {
     // We need 4 bytes (SV4) for each record.
     return recordCount * 4;
   }
 
-  private int merge(int leftStart, int rightStart, int rightEnd, int outStart) {
+  private int merge(final int leftStart, final int rightStart, final int rightEnd, final int outStart) {
     int l = leftStart;
     int r = rightStart;
     int o = outStart;
@@ -107,17 +108,23 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
   }
 
   @Override
-  public void sort(VectorContainer container) {
-    Stopwatch watch = new Stopwatch();
+  public void sort(final VectorContainer container) {
+    final Stopwatch watch = new Stopwatch();
     watch.start();
     while (runStarts.size() > 1) {
+
+      // check if we're cancelled/failed frequently
+      if (!context.shouldContinue()) {
+        return;
+      }
+
       int outIndex = 0;
       newRunStarts = Queues.newLinkedBlockingQueue();
       newRunStarts.add(outIndex);
-      int size = runStarts.size();
+      final int size = runStarts.size();
       for (int i = 0; i < size / 2; i++) {
-        int left = runStarts.poll();
-        int right = runStarts.poll();
+        final int left = runStarts.poll();
+        final int right = runStarts.poll();
         Integer end = runStarts.peek();
         if (end == null) {
           end = vector4.getTotalCount();
@@ -130,7 +137,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
       if (outIndex < vector4.getTotalCount()) {
         copyRun(outIndex, vector4.getTotalCount());
       }
-      SelectionVector4 tmp = aux.createNewWrapperCurrent();
+      final SelectionVector4 tmp = aux.createNewWrapperCurrent();
       aux.clear();
       aux = this.vector4.createNewWrapperCurrent();
       vector4.clear();
@@ -141,23 +148,23 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
     aux.clear();
   }
 
-  private void copyRun(int start, int end) {
+  private void copyRun(final int start, final int end) {
     for (int i = start; i < end; i++) {
       aux.set(i, vector4.get(i));
     }
   }
 
   @Override
-  public void swap(int sv0, int sv1) {
-    int tmp = vector4.get(sv0);
+  public void swap(final int sv0, final int sv1) {
+    final int tmp = vector4.get(sv0);
     vector4.set(sv0, vector4.get(sv1));
     vector4.set(sv1, tmp);
   }
 
   @Override
-  public int compare(int leftIndex, int rightIndex) {
-    int sv1 = vector4.get(leftIndex);
-    int sv2 = vector4.get(rightIndex);
+  public int compare(final int leftIndex, final int rightIndex) {
+    final int sv1 = vector4.get(leftIndex);
+    final int sv2 = vector4.get(rightIndex);
     compares++;
     return doEval(sv1, sv2);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
index 1aadaa2..5e0cd82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.drill.exec.proto.helper;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
@@ -29,22 +28,32 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
 public class QueryIdHelper {
 
   /* Generate a UUID from the two parts of the queryid */
-  public static String getQueryId(QueryId queryId) {
+  public static String getQueryId(final QueryId queryId) {
     return (new UUID(queryId.getPart1(), queryId.getPart2())).toString();
   }
 
-  public static QueryId getQueryIdFromString(String queryId) {
-    UUID uuid = UUID.fromString(queryId);
+  public static QueryId getQueryIdFromString(final String queryId) {
+    final UUID uuid = UUID.fromString(queryId);
     return QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build();
   }
 
-  public static String getQueryIdentifier(FragmentHandle h) {
+  public static String getQueryIdentifier(final FragmentHandle h) {
     return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId();
   }
 
-  public static String getQueryIdentifiers(QueryId queryId, int majorFragmentId, List<Integer> minorFragmentIds) {
-    String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
+  public static String getExecutorThreadName(final FragmentHandle fragmentHandle) {
+    return String.format("%s:frag:%s:%s",
+        getQueryId(fragmentHandle.getQueryId()),
+        fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+  }
+
+  public static String getQueryIdentifiers(final QueryId queryId, final int majorFragmentId, final List<Integer> minorFragmentIds) {
+    final String fragmentIds = minorFragmentIds.size() == 1 ? minorFragmentIds.get(0).toString() : minorFragmentIds.toString();
     return getQueryId(queryId) + ":" + majorFragmentId + ":" + fragmentIds;
   }
 
+  public static String getFragmentId(final FragmentHandle fragmentHandle) {
+    return fragmentHandle.getMajorFragmentId() + ":" + fragmentHandle.getMinorFragmentId();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 2bb29e5..215f580 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -41,15 +41,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
 
   protected BatchState state;
 
-  protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException {
+  protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
     this(popConfig, context, true, new OperatorContext(popConfig, context, true));
   }
 
-  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema) throws OutOfMemoryException {
+  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
     this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true));
   }
 
-  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema, OperatorContext oContext) throws OutOfMemoryException {
+  protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, final OperatorContext oContext) throws OutOfMemoryException {
     super();
     this.context = context;
     this.popConfig = popConfig;
@@ -84,16 +84,18 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return popConfig;
   }
 
-  public final IterOutcome next(RecordBatch b) {
-
+  public final IterOutcome next(final RecordBatch b) {
+    if(!context.shouldContinue()) {
+      return IterOutcome.STOP;
+    }
     return next(0, b);
   }
 
-  public final IterOutcome next(int inputIndex, RecordBatch b){
+  public final IterOutcome next(final int inputIndex, final RecordBatch b){
     IterOutcome next = null;
     stats.stopProcessing();
     try{
-      if (context.isCancelled()) {
+      if (!context.shouldContinue()) {
         return IterOutcome.STOP;
       }
       next = b.next();
@@ -141,7 +143,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
         default:
           return innerNext();
       }
-    } catch (SchemaChangeException e) {
+    } catch (final SchemaChangeException e) {
       throw new DrillRuntimeException(e);
     } finally {
       stats.stopProcessing();
@@ -159,7 +161,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   @Override
-  public void kill(boolean sendUpstream) {
+  public void kill(final boolean sendUpstream) {
     killIncoming(sendUpstream);
   }
 
@@ -182,12 +184,12 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
+  public TypedFieldId getValueVectorId(final SchemaPath path) {
     return container.getValueVectorId(path);
   }
 
   @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+  public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
     return container.getValueAccessorById(clazz, ids);
   }
 
@@ -195,7 +197,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   @Override
   public WritableBatch getWritableBatch() {
 //    logger.debug("Getting writable batch.");
-    WritableBatch batch = WritableBatch.get(this);
+    final WritableBatch batch = WritableBatch.get(this);
     return batch;
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 7d157fe..4f99081 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -19,15 +19,9 @@ package org.apache.drill.exec.record;
 
 import io.netty.buffer.ByteBuf;
 
-import java.util.List;
-
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
-
-import com.google.common.collect.Lists;
 
 public class FragmentWritableBatch{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
@@ -37,26 +31,25 @@ public class FragmentWritableBatch{
   private final ByteBuf[] buffers;
   private final FragmentRecordBatch header;
 
-  public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch batch){
+  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final WritableBatch batch){
     this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
   }
 
-  public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds, WritableBatch batch){
+  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds, final WritableBatch batch){
     this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(), batch.getBuffers());
   }
 
-  private FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentId, RecordBatchDef def, ByteBuf... buffers){
+  private FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentId, final RecordBatchDef def, final ByteBuf... buffers){
     this.buffers = buffers;
-    FragmentRecordBatch.Builder builder = FragmentRecordBatch //
-        .newBuilder() //
-        .setIsLastBatch(isLast) //
-        .setDef(def) //
+    final FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
+        .setIsLastBatch(isLast)
+        .setDef(def)
         .setQueryId(queryId)
-        .setReceivingMajorFragmentId(receiveMajorFragmentId) //
-        .setSendingMajorFragmentId(sendMajorFragmentId) //
+        .setReceivingMajorFragmentId(receiveMajorFragmentId)
+        .setSendingMajorFragmentId(sendMajorFragmentId)
         .setSendingMinorFragmentId(sendMinorFragmentId);
 
-    for(int i : receiveMinorFragmentId){
+    for(final int i : receiveMinorFragmentId){
       builder.addReceivingMinorFragmentId(i);
     }
 
@@ -64,31 +57,32 @@ public class FragmentWritableBatch{
   }
 
 
-  public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId){
+  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId){
     return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
   }
 
-  public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int[] receiveMinorFragmentIds){
+  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds){
     return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, EMPTY_DEF);
   }
 
 
-  public static FragmentWritableBatch getEmptyLastWithSchema(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId,
-                                                             int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+  public static FragmentWritableBatch getEmptyLastWithSchema(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId,
+                                                             final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
     return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
         receiveMinorFragmentId, schema);
   }
 
-  public static FragmentWritableBatch getEmptyBatchWithSchema(boolean isLast, QueryId queryId, int sendMajorFragmentId,
-      int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+  public static FragmentWritableBatch getEmptyBatchWithSchema(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId,
+      final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
 
-    List<SerializedField> fields = Lists.newArrayList();
-    for (MaterializedField field : schema) {
-      fields.add(field.getSerializedField());
+    final RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
+    if (schema != null) {
+      for (final MaterializedField field : schema) {
+        def.addField(field.getSerializedField());
+      }
     }
-    RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
     return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
-        new int[]{receiveMinorFragmentId}, def);
+        new int[] { receiveMinorFragmentId }, def.build());
   }
 
   public ByteBuf[] getBuffers(){
@@ -97,7 +91,7 @@ public class FragmentWritableBatch{
 
   public long getByteCount() {
     long n = 0;
-    for (ByteBuf buf : buffers) {
+    for (final ByteBuf buf : buffers) {
       n += buf.readableBytes();
     }
     return n;

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
index 33e0665..e18b94c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
@@ -19,14 +19,14 @@ package org.apache.drill.exec.rpc.data;
 
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
+
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 
-import java.io.IOException;
-
 public class DataResponseHandlerImpl implements DataResponseHandler{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
   private final WorkerBee bee;
@@ -45,7 +45,7 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
       final DrillBuf data, final AckSender sender) throws FragmentSetupException, IOException {
 //      logger.debug("Fragment Batch received {}", fragmentBatch);
 
-    boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
+    final boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
     if (canRun) {
 //    logger.debug("Arriving batch means local batch can run, starting local batch.");
       /*
@@ -54,10 +54,5 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
        */
       bee.startFragmentPendingRemote(manager);
     }
-    if (fragmentBatch.getIsLastBatch() && !manager.isWaiting()) {
-//    logger.debug("Removing handler.  Is Last Batch {}.  Is Waiting for more {}", fragmentBatch.getIsLastBatch(),
-//        manager.isWaiting());
-      bee.getContext().getWorkBus().removeFragmentManager(manager.getHandle());
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index c15bb7c..e7a9a3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -76,13 +76,13 @@ public class Drillbit implements AutoCloseable {
     Drillbit bit;
     try {
       bit = new Drillbit(config, remoteServiceSet);
-    } catch (Exception ex) {
+    } catch (final Exception ex) {
       throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
     }
 
     try {
       bit.run();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       bit.close();
       throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
     }
@@ -131,7 +131,7 @@ public class Drillbit implements AutoCloseable {
 
     // parse out the properties, validate, and then set them
     final String systemProps[] = allSystemProps.split(",");
-    for(String systemProp : systemProps) {
+    for(final String systemProp : systemProps) {
       final String keyValue[] = systemProp.split("=");
       if (keyValue.length != 2) {
         throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
@@ -162,7 +162,7 @@ public class Drillbit implements AutoCloseable {
   }
 
   public static void main(final String[] cli) throws DrillbitStartupException {
-    StartupOptions options = StartupOptions.parse(cli);
+    final StartupOptions options = StartupOptions.parse(cli);
     start(options);
   }
 
@@ -174,7 +174,7 @@ public class Drillbit implements AutoCloseable {
   private final Server embeddedJetty;
   private RegistrationHandle registrationHandle;
 
-  public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
+  public Drillbit(final DrillConfig config, final RemoteServiceSet serviceSet) throws Exception {
     final long startTime = System.currentTimeMillis();
     logger.debug("Construction started.");
     final boolean allowPortHunting = serviceSet != null;
@@ -269,14 +269,14 @@ public class Drillbit implements AutoCloseable {
 
     try {
       Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       logger.warn("Interrupted while sleeping during coordination deregistration.");
     }
 
     if (embeddedJetty != null) {
       try {
         embeddedJetty.stop();
-      } catch (Exception e) {
+      } catch (final Exception e) {
         logger.warn("Failure while shutting down embedded jetty server.");
       }
     }
@@ -323,7 +323,7 @@ public class Drillbit implements AutoCloseable {
       logger.info("Received shutdown request.");
       try {
         drillbit.close();
-      } catch(Exception e) {
+      } catch(final Exception e) {
         throw new RuntimeException("Caught exception closing Drillbit started from\n" + stackTrace, e);
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
index e9024ff..20f76a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
@@ -26,67 +26,79 @@ import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 
 interface Comparators {
   final static Comparator<MajorFragmentProfile> majorIdCompare = new Comparator<MajorFragmentProfile>() {
-    public int compare(MajorFragmentProfile o1, MajorFragmentProfile o2) {
+    public int compare(final MajorFragmentProfile o1, final MajorFragmentProfile o2) {
       return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId());
     }
   };
 
   final static Comparator<MinorFragmentProfile> minorIdCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId());
     }
   };
 
   final static Comparator<MinorFragmentProfile> startTimeCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getStartTime(), o2.getStartTime());
     }
   };
 
+  final static Comparator<MinorFragmentProfile> lastUpdateCompare = new Comparator<MinorFragmentProfile>() {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
+      return Long.compare(o1.getLastUpdate(), o2.getLastUpdate());
+    }
+  };
+
+  final static Comparator<MinorFragmentProfile> lastProgressCompare = new Comparator<MinorFragmentProfile>() {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
+      return Long.compare(o1.getLastProgress(), o2.getLastProgress());
+    }
+  };
+
   final static Comparator<MinorFragmentProfile> endTimeCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getEndTime(), o2.getEndTime());
     }
   };
 
   final static Comparator<MinorFragmentProfile> fragPeakMemAllocated = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getMaxMemoryUsed(), o2.getMaxMemoryUsed());
     }
   };
 
   final static Comparator<MinorFragmentProfile> runTimeCompare = new Comparator<MinorFragmentProfile>() {
-    public int compare(MinorFragmentProfile o1, MinorFragmentProfile o2) {
+    public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
       return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime());
     }
   };
 
   final static Comparator<OperatorProfile> operatorIdCompare = new Comparator<OperatorProfile>() {
-    public int compare(OperatorProfile o1, OperatorProfile o2) {
+    public int compare(final OperatorProfile o1, final OperatorProfile o2) {
       return Long.compare(o1.getOperatorId(), o2.getOperatorId());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> setupTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> processTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> waitTimeSort = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos());
     }
   };
 
   final static Comparator<Pair<OperatorProfile, Integer>> opPeakMem = new Comparator<Pair<OperatorProfile, Integer>>() {
-    public int compare(Pair<OperatorProfile, Integer> o1, Pair<OperatorProfile, Integer> o2) {
+    public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
       return Long.compare(o1.getLeft().getPeakLocalMemoryAllocated(), o2.getLeft().getPeakLocalMemoryAllocated());
     }
   };

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
index 3a66327..b245b30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/FragmentWrapper.java
@@ -32,7 +32,7 @@ public class FragmentWrapper {
   private final MajorFragmentProfile major;
   private final long start;
 
-  public FragmentWrapper(MajorFragmentProfile major, long start) {
+  public FragmentWrapper(final MajorFragmentProfile major, final long start) {
     this.major = Preconditions.checkNotNull(major);
     this.start = start;
   }
@@ -45,44 +45,51 @@ public class FragmentWrapper {
     return String.format("fragment-%s", major.getMajorFragmentId());
   }
 
-  public void addSummary(TableBuilder tb) {
+  public void addSummary(final TableBuilder tb, final int colCount) {
     final String fmt = " (%d)";
-    long t0 = start;
+    final long t0 = start;
 
-    ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
+    final ArrayList<MinorFragmentProfile> complete = new ArrayList<MinorFragmentProfile>(
         Collections2.filter(major.getMinorFragmentProfileList(), Filters.hasOperatorsAndTimes));
 
     tb.appendCell(new OperatorPathBuilder().setMajor(major).build(), null);
     tb.appendCell(complete.size() + " / " + major.getMinorFragmentProfileCount(), null);
 
     if (complete.size() < 1) {
-      tb.appendRepeated("", null, 7);
+      tb.appendRepeated("", null, colCount - 2);
       return;
     }
 
-    int li = complete.size() - 1;
+    final MinorFragmentProfile firstStart = Collections.min(complete, Comparators.startTimeCompare);
+    final MinorFragmentProfile lastStart = Collections.max(complete, Comparators.startTimeCompare);
+    tb.appendMillis(firstStart.getStartTime() - t0, String.format(fmt, firstStart.getMinorFragmentId()));
+    tb.appendMillis(lastStart.getStartTime() - t0, String.format(fmt, lastStart.getMinorFragmentId()));
 
-    Collections.sort(complete, Comparators.startTimeCompare);
-    tb.appendMillis(complete.get(0).getStartTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
-    tb.appendMillis(complete.get(li).getStartTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
-
-    Collections.sort(complete, Comparators.endTimeCompare);
-    tb.appendMillis(complete.get(0).getEndTime() - t0, String.format(fmt, complete.get(0).getMinorFragmentId()));
-    tb.appendMillis(complete.get(li).getEndTime() - t0, String.format(fmt, complete.get(li).getMinorFragmentId()));
+    final MinorFragmentProfile firstEnd = Collections.min(complete, Comparators.endTimeCompare);
+    final MinorFragmentProfile lastEnd = Collections.max(complete, Comparators.endTimeCompare);
+    tb.appendMillis(firstEnd.getEndTime() - t0, String.format(fmt, firstEnd.getMinorFragmentId()));
+    tb.appendMillis(lastEnd.getEndTime() - t0, String.format(fmt, lastEnd.getMinorFragmentId()));
 
     long total = 0;
-    for (MinorFragmentProfile p : complete) {
+    for (final MinorFragmentProfile p : complete) {
       total += p.getEndTime() - p.getStartTime();
     }
-    Collections.sort(complete, Comparators.runTimeCompare);
-    tb.appendMillis(complete.get(0).getEndTime() - complete.get(0).getStartTime(),
-        String.format(fmt, complete.get(0).getMinorFragmentId()));
+
+    final MinorFragmentProfile shortRun = Collections.min(complete, Comparators.runTimeCompare);
+    final MinorFragmentProfile longRun = Collections.max(complete, Comparators.runTimeCompare);
+
+    tb.appendMillis(shortRun.getEndTime() - shortRun.getStartTime(), String.format(fmt, shortRun.getMinorFragmentId()));
     tb.appendMillis((long) (total / complete.size()), null);
-    tb.appendMillis(complete.get(li).getEndTime() - complete.get(li).getStartTime(),
-        String.format(fmt, complete.get(li).getMinorFragmentId()));
+    tb.appendMillis(longRun.getEndTime() - longRun.getStartTime(), String.format(fmt, longRun.getMinorFragmentId()));
 
-    Collections.sort(complete, Comparators.fragPeakMemAllocated);
-    tb.appendBytes(complete.get(li).getMaxMemoryUsed(), null);
+    final MinorFragmentProfile lastUpdate = Collections.max(complete, Comparators.lastUpdateCompare);
+    tb.appendTime(lastUpdate.getLastUpdate(), null);
+
+    final MinorFragmentProfile lastProgress = Collections.max(complete, Comparators.lastProgressCompare);
+    tb.appendTime(lastProgress.getLastProgress(), null);
+
+    final MinorFragmentProfile maxMem = Collections.max(complete, Comparators.fragPeakMemAllocated);
+    tb.appendBytes(maxMem.getMaxMemoryUsed(), null);
   }
 
   public String getContent() {
@@ -90,9 +97,10 @@ public class FragmentWrapper {
   }
 
 
-  public String majorFragmentTimingProfile(MajorFragmentProfile major) {
-    final String[] columns = {"Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches", "Peak Memory", "State"};
-    TableBuilder builder = new TableBuilder(columns);
+  public String majorFragmentTimingProfile(final MajorFragmentProfile major) {
+    final String[] columns = { "Minor Fragment", "Host", "Start", "End", "Total Time", "Max Records", "Max Batches",
+        "Last Update", "Last Progress", "Peak Memory", "State" };
+    final TableBuilder builder = new TableBuilder(columns);
 
     ArrayList<MinorFragmentProfile> complete, incomplete;
     complete = new ArrayList<MinorFragmentProfile>(Collections2.filter(
@@ -101,17 +109,17 @@ public class FragmentWrapper {
         major.getMinorFragmentProfileList(), Filters.missingOperatorsOrTimes));
 
     Collections.sort(complete, Comparators.minorIdCompare);
-    for (MinorFragmentProfile minor : complete) {
-      ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
+    for (final MinorFragmentProfile minor : complete) {
+      final ArrayList<OperatorProfile> ops = new ArrayList<OperatorProfile>(minor.getOperatorProfileList());
 
-      long t0 = start;
+      final long t0 = start;
       long biggestIncomingRecords = 0;
       long biggestBatches = 0;
 
-      for (OperatorProfile op : ops) {
+      for (final OperatorProfile op : ops) {
         long incomingRecords = 0;
         long batches = 0;
-        for (StreamProfile sp : op.getInputProfileList()) {
+        for (final StreamProfile sp : op.getInputProfileList()) {
           incomingRecords += sp.getRecords();
           batches += sp.getBatches();
         }
@@ -127,14 +135,17 @@ public class FragmentWrapper {
 
       builder.appendFormattedInteger(biggestIncomingRecords, null);
       builder.appendFormattedInteger(biggestBatches, null);
+
+      builder.appendTime(minor.getLastUpdate(), null);
+      builder.appendTime(minor.getLastProgress(), null);
       builder.appendBytes(minor.getMaxMemoryUsed(), null);
       builder.appendCell(minor.getState().name(), null);
     }
-    for (MinorFragmentProfile m : incomplete) {
+    for (final MinorFragmentProfile m : incomplete) {
       builder.appendCell(
           major.getMajorFragmentId() + "-"
               + m.getMinorFragmentId(), null);
-      builder.appendRepeated(m.getState().toString(), null, 6);
+      builder.appendRepeated(m.getState().toString(), null, columns.length - 1);
     }
     return builder.toString();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
index 4f4fcdb..7a1d9b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/OperatorWrapper.java
@@ -81,7 +81,6 @@ public class OperatorWrapper {
     CoreOperatorType operatorType = CoreOperatorType.valueOf(ops.get(0).getLeft().getOperatorType());
     tb.appendCell(operatorType == null ? "UNKNOWN_OPERATOR" : operatorType.toString(), null);
 
-    int li = ops.size() - 1;
     String fmt = " (%s)";
 
     double setupSum = 0.0;
@@ -95,23 +94,26 @@ public class OperatorWrapper {
       memSum += ip.getLeft().getPeakLocalMemoryAllocated();
     }
 
-    Collections.sort(ops, Comparators.setupTimeSort);
-    tb.appendNanos(ops.get(0).getLeft().getSetupNanos(), String.format(fmt, ops.get(0).getRight()));
+    final ImmutablePair<OperatorProfile, Integer> shortSetup = Collections.min(ops, Comparators.setupTimeSort);
+    final ImmutablePair<OperatorProfile, Integer> longSetup = Collections.max(ops, Comparators.setupTimeSort);
+    tb.appendNanos(shortSetup.getLeft().getSetupNanos(), String.format(fmt, shortSetup.getRight()));
     tb.appendNanos((long) (setupSum / ops.size()), null);
-    tb.appendNanos(ops.get(li).getLeft().getSetupNanos(), String.format(fmt, ops.get(li).getRight()));
+    tb.appendNanos(longSetup.getLeft().getSetupNanos(), String.format(fmt, longSetup.getRight()));
 
-    Collections.sort(ops, Comparators.processTimeSort);
-    tb.appendNanos(ops.get(0).getLeft().getProcessNanos(), String.format(fmt, ops.get(0).getRight()));
+    final ImmutablePair<OperatorProfile, Integer> shortProcess = Collections.min(ops, Comparators.processTimeSort);
+    final ImmutablePair<OperatorProfile, Integer> longProcess = Collections.max(ops, Comparators.processTimeSort);
+    tb.appendNanos(shortProcess.getLeft().getProcessNanos(), String.format(fmt, shortProcess.getRight()));
     tb.appendNanos((long) (processSum / ops.size()), null);
-    tb.appendNanos(ops.get(li).getLeft().getProcessNanos(), String.format(fmt, ops.get(li).getRight()));
+    tb.appendNanos(longProcess.getLeft().getProcessNanos(), String.format(fmt, longProcess.getRight()));
 
-    Collections.sort(ops, Comparators.waitTimeSort);
-    tb.appendNanos(ops.get(0).getLeft().getWaitNanos(), String.format(fmt, ops.get(0).getRight()));
+    final ImmutablePair<OperatorProfile, Integer> shortWait = Collections.min(ops, Comparators.waitTimeSort);
+    final ImmutablePair<OperatorProfile, Integer> longWait = Collections.max(ops, Comparators.waitTimeSort);
+    tb.appendNanos(shortWait.getLeft().getWaitNanos(), String.format(fmt, shortWait.getRight()));
     tb.appendNanos((long) (waitSum / ops.size()), null);
-    tb.appendNanos(ops.get(li).getLeft().getWaitNanos(), String.format(fmt, ops.get(li).getRight()));
+    tb.appendNanos(longWait.getLeft().getWaitNanos(), String.format(fmt, longWait.getRight()));
 
-    Collections.sort(ops, Comparators.opPeakMem);
+    final ImmutablePair<OperatorProfile, Integer> peakMem = Collections.max(ops, Comparators.opPeakMem);
     tb.appendBytes((long) (memSum / ops.size()), null);
-    tb.appendBytes(ops.get(li).getLeft().getPeakLocalMemoryAllocated(), null);
+    tb.appendBytes(peakMem.getLeft().getPeakLocalMemoryAllocated(), null);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/c0d5a693/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index 80016aa..479e655 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -38,7 +38,7 @@ public class ProfileWrapper {
   public QueryProfile profile;
   public String id;
 
-  public ProfileWrapper(QueryProfile profile) {
+  public ProfileWrapper(final QueryProfile profile) {
     this.profile = profile;
     this.id = QueryIdHelper.getQueryId(profile.getId());
   }
@@ -56,25 +56,25 @@ public class ProfileWrapper {
   }
 
   public List<OperatorWrapper> getOperatorProfiles() {
-    List<OperatorWrapper> ows = Lists.newArrayList();
-    Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
+    final List<OperatorWrapper> ows = Lists.newArrayList();
+    final Map<ImmutablePair<Integer, Integer>, List<ImmutablePair<OperatorProfile, Integer>>> opmap = Maps.newHashMap();
 
-    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+    final List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
     Collections.sort(majors, Comparators.majorIdCompare);
-    for (MajorFragmentProfile major : majors) {
+    for (final MajorFragmentProfile major : majors) {
 
-      List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
+      final List<MinorFragmentProfile> minors = new ArrayList<>(major.getMinorFragmentProfileList());
       Collections.sort(minors, Comparators.minorIdCompare);
-      for (MinorFragmentProfile minor : minors) {
+      for (final MinorFragmentProfile minor : minors) {
 
-        List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
+        final List<OperatorProfile> ops = new ArrayList<>(minor.getOperatorProfileList());
         Collections.sort(ops, Comparators.operatorIdCompare);
-        for (OperatorProfile op : ops) {
+        for (final OperatorProfile op : ops) {
 
-          ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
+          final ImmutablePair<Integer, Integer> ip = new ImmutablePair<>(
               major.getMajorFragmentId(), op.getOperatorId());
           if (!opmap.containsKey(ip)) {
-            List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
+            final List<ImmutablePair<OperatorProfile, Integer>> l = Lists.newArrayList();
             opmap.put(ip, l);
           }
           opmap.get(ip).add(new ImmutablePair<>(op, minor.getMinorFragmentId()));
@@ -82,10 +82,10 @@ public class ProfileWrapper {
       }
     }
 
-    List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
+    final List<ImmutablePair<Integer, Integer>> keys = new ArrayList<>(opmap.keySet());
     Collections.sort(keys);
 
-    for (ImmutablePair<Integer, Integer> ip : keys) {
+    for (final ImmutablePair<Integer, Integer> ip : keys) {
       ows.add(new OperatorWrapper(ip.getLeft(), opmap.get(ip)));
     }
 
@@ -93,11 +93,11 @@ public class ProfileWrapper {
   }
 
   public List<FragmentWrapper> getFragmentProfiles() {
-    List<FragmentWrapper> fws = Lists.newArrayList();
+    final List<FragmentWrapper> fws = Lists.newArrayList();
 
-    List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
+    final List<MajorFragmentProfile> majors = new ArrayList<>(profile.getFragmentProfileList());
     Collections.sort(majors, Comparators.majorIdCompare);
-    for (MajorFragmentProfile major : majors) {
+    for (final MajorFragmentProfile major : majors) {
       fws.add(new FragmentWrapper(major, profile.getStart()));
     }
 
@@ -105,10 +105,11 @@ public class ProfileWrapper {
   }
 
   public String getFragmentsOverview() {
-    final String[] columns = {"Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End", "Last End", "tmin", "tavg", "tmax", "memmax"};
-    TableBuilder tb = new TableBuilder(columns);
-    for (FragmentWrapper fw : getFragmentProfiles()) {
-      fw.addSummary(tb);
+    final String[] columns = { "Major Fragment", "Minor Fragments Reporting", "First Start", "Last Start", "First End",
+        "Last End", "tmin", "tavg", "tmax", "last update", "last progress", "memmax" };
+    final TableBuilder tb = new TableBuilder(columns);
+    for (final FragmentWrapper fw : getFragmentProfiles()) {
+      fw.addSummary(tb, columns.length);
     }
     return tb.toString();
   }
@@ -117,17 +118,17 @@ public class ProfileWrapper {
 
   public String getOperatorsOverview() {
     final String [] columns = {"Operator", "Type", "Setup (min)", "Setup (avg)", "Setup (max)", "Process (min)", "Process (avg)", "Process (max)", "Wait (min)", "Wait (avg)", "Wait (max)", "Mem (avg)", "Mem (max)"};
-    TableBuilder tb = new TableBuilder(columns);
-    for (OperatorWrapper ow : getOperatorProfiles()) {
+    final TableBuilder tb = new TableBuilder(columns);
+    for (final OperatorWrapper ow : getOperatorProfiles()) {
       ow.addSummary(tb);
     }
     return tb.toString();
   }
 
   public String getOperatorsJSON() {
-    StringBuilder sb = new StringBuilder("{");
+    final StringBuilder sb = new StringBuilder("{");
     String sep = "";
-    for (CoreOperatorType op : CoreOperatorType.values()) {
+    for (final CoreOperatorType op : CoreOperatorType.values()) {
       sb.append(String.format("%s\"%d\" : \"%s\"", sep, op.ordinal(), op));
       sep = ", ";
     }


Mime
View raw message