drill-commits mailing list archives

From: s..@apache.org
Subject: [2/4] drill git commit: DRILL-1781: Fast Complex Schema
Date: Tue, 02 Dec 2014 13:53:34 GMT
DRILL-1781: Fast Complex Schema


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3581a327
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3581a327
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3581a327

Branch: refs/heads/master
Commit: 3581a327e3fad7045615fe03f4da16f1a7f95757
Parents: a60e1db
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Wed Nov 19 18:08:18 2014 -0800
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Tue Dec 2 04:55:06 2014 -0800

----------------------------------------------------------------------
 .../drill/exec/physical/impl/RootExec.java      |  5 --
 .../drill/exec/physical/impl/ScanBatch.java     | 42 +---------
 .../drill/exec/physical/impl/ScreenCreator.java | 27 +------
 .../exec/physical/impl/SingleSenderCreator.java | 26 ------
 .../exec/physical/impl/TopN/TopNBatch.java      | 78 +++++++++++-------
 .../exec/physical/impl/WriterRecordBatch.java   | 19 ++---
 .../physical/impl/aggregate/HashAggBatch.java   | 50 ++++++------
 .../impl/aggregate/StreamingAggBatch.java       | 43 +++++-----
 .../BroadcastSenderRootExec.java                | 28 -------
 .../physical/impl/filter/FilterRecordBatch.java |  4 -
 .../impl/flatten/FlattenRecordBatch.java        | 33 --------
 .../exec/physical/impl/join/HashJoinBatch.java  | 51 ++++++------
 .../exec/physical/impl/join/JoinStatus.java     | 29 +++++--
 .../exec/physical/impl/join/MergeJoinBatch.java | 74 ++++++++---------
 .../physical/impl/limit/LimitRecordBatch.java   | 18 +++--
 .../impl/mergereceiver/MergingRecordBatch.java  | 57 +++++++++-----
 .../PartitionSenderRootExec.java                | 24 +++---
 .../partitionsender/PartitionerTemplate.java    |  1 +
 .../impl/producer/ProducerConsumerBatch.java    | 25 ------
 .../impl/project/ProjectRecordBatch.java        | 39 +++++----
 .../impl/svremover/RemovingRecordBatch.java     | 10 +--
 .../impl/union/UnionAllRecordBatch.java         | 22 ++----
 .../UnorderedReceiverBatch.java                 | 10 +--
 .../IteratorValidatorBatchIterator.java         |  6 --
 .../window/StreamingWindowFrameRecordBatch.java |  2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  | 58 ++++++++------
 .../drill/exec/record/AbstractRecordBatch.java  | 59 +++++++++++---
 .../exec/record/AbstractSingleRecordBatch.java  | 35 +++++----
 .../apache/drill/exec/record/RecordBatch.java   |  8 --
 .../org/apache/drill/exec/work/WorkManager.java | 37 +++++----
 .../work/batch/UnlimitedRawBatchBuffer.java     | 34 ++++++--
 .../exec/work/fragment/FragmentExecutor.java    |  1 -
 .../exec/nested/TestFastComplexSchema.java      | 83 ++++++++++++++++++++
 .../exec/physical/impl/SimpleRootExec.java      | 14 ----
 .../exec/physical/impl/TestCastFunctions.java   |  2 +-
 .../drill/exec/physical/impl/TestDecimal.java   | 10 +--
 .../physical/impl/TestExtractFunctions.java     |  2 +-
 .../physical/impl/TestSimpleFragmentRun.java    |  2 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |  1 +
 .../physical/impl/window/TestWindowFrame.java   |  8 +-
 .../exec/physical/impl/writer/TestWriter.java   |  2 +-
 .../drill/exec/record/vector/TestDateTypes.java |  2 +-
 .../drill/exec/sql/TestWindowFunctions.java     |  2 +
 .../fn/TestJsonReaderWithSparseFiles.java       |  2 +-
 .../vector/complex/writer/TestJsonReader.java   |  8 +-
 45 files changed, 537 insertions(+), 556 deletions(-)
----------------------------------------------------------------------
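
Taken together, these hunks remove the public buildSchema() round trip that RootExec and RecordBatch previously exposed and fold schema construction into the operators themselves: each batch implements a protected buildSchema() hook, ad-hoc first/done booleans become a shared BatchState (FIRST, NOT_FIRST, DONE), and an empty schema-only batch goes downstream before any data. A self-contained sketch of that control flow follows; the BatchState names come from the diffs below, while the driver itself is an inference, since the AbstractRecordBatch hunk is not included in this message:

// Sketch only, not the committed code: the "fast schema" dispatch the
// operators below converge on. Error handling and the STOP path are elided.
public abstract class FastSchemaSketch {

  public enum IterOutcome { OK_NEW_SCHEMA, OK, NONE, STOP }

  protected enum BatchState { BUILD_SCHEMA, FIRST, NOT_FIRST, DONE }

  protected BatchState state = BatchState.BUILD_SCHEMA;

  // One-time schema pass; may consume one upstream batch, may set DONE.
  protected abstract void buildSchema();

  // Per-batch work; observes state == FIRST exactly once, to replay the
  // batch that buildSchema() already consumed.
  protected abstract IterOutcome innerNext();

  public final IterOutcome next() {
    switch (state) {
      case BUILD_SCHEMA:
        buildSchema();
        if (state != BatchState.DONE) {
          state = BatchState.FIRST;
        }
        return IterOutcome.OK_NEW_SCHEMA;   // the empty "fast schema" batch
      case DONE:
        return IterOutcome.NONE;            // NONE stays NONE on repeated calls
      default:
        return innerNext();                 // FIRST, then NOT_FIRST
    }
  }
}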


http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index d9c4e5b..a644c34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -29,11 +29,6 @@ public interface RootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
 
   /**
-   * Generate and send emtpy schema batch
-   */
-  public void buildSchema() throws SchemaChangeException;
-
-  /**
    * Do the next batch of work.
    * @return Whether or not additional batches of work are necessary.  False means that this fragment is done.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4ed1180..6e1f139 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -80,7 +80,7 @@ public class ScanBatch implements RecordBatch {
   private List<ValueVector> partitionVectors;
   private List<Integer> selectedPartitionColumns;
   private String partitionColumnDesignator;
-  private boolean first = false;
+  private boolean first = true;
   private boolean done = false;
   private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
 
@@ -121,32 +121,12 @@ public class ScanBatch implements RecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() {
-    IterOutcome outcome = next();
-    if (outcome == IterOutcome.NONE) {
-      container.buildSchema(SelectionVectorMode.NONE);
-      schema = container.getSchema();
-      done = true;
-    }
-    first = true;
-    tempContainer = VectorContainer.getTransferClone(container);
-    for (VectorWrapper w : container) {
-      w.getValueVector().allocateNew();
-    }
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   public int getRecordCount() {
     return recordCount;
   }
 
   @Override
   public void kill(boolean sendUpstream) {
-    if (currentReader != null) {
-      currentReader.cleanup();
-    }
-
     if (sendUpstream) {
       done = true;
     } else {
@@ -158,24 +138,11 @@ public class ScanBatch implements RecordBatch {
     container.zeroVectors();
   }
 
-  private void transfer() {
-    container.zeroVectors();
-    for (VectorWrapper w : tempContainer) {
-      MaterializedField field = w.getField();
-      w.getValueVector().makeTransferPair(container.addOrGet(field)).transfer();
-    }
-  }
-
   @Override
   public IterOutcome next() {
     if (done) {
       return IterOutcome.NONE;
     }
-    if (first) {
-      first = false;
-      transfer();
-      return IterOutcome.OK;
-    }
     long t1 = System.nanoTime();
     oContext.getStats().startProcessing();
     try {
@@ -193,6 +160,7 @@ public class ScanBatch implements RecordBatch {
           if (!readers.hasNext()) {
             currentReader.cleanup();
             releaseAssets();
+            done = true;
             return IterOutcome.NONE;
           }
           oContext.getStats().startSetup();
@@ -226,14 +194,8 @@ public class ScanBatch implements RecordBatch {
       if (mutator.isNewSchema()) {
         container.buildSchema(SelectionVectorMode.NONE);
         schema = container.getSchema();
-        long t2 = System.nanoTime();
-//        System.out.println((t2 - t1) / recordCount);
-//        BatchPrinter.printBatch(this, "\t");
         return IterOutcome.OK_NEW_SCHEMA;
       } else {
-        long t2 = System.nanoTime();
-//        System.out.println((t2 - t1) / recordCount);
-//        BatchPrinter.printBatch(this, "\t");
         return IterOutcome.OK;
       }
     } catch (Exception ex) {
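
One behavioral fix in this hunk is easy to miss: done is now set before the final NONE, so a second call to next() after exhaustion answers NONE again instead of re-entering the reader loop. In sketch form (illustrative, not Drill's API):

// Sticky-NONE sketch: once the scan is exhausted it must keep answering
// NONE, because with the schema batch emitted up front a downstream
// operator may legitimately call next() again after the data ran out.
final class StickyNoneSketch {
  private boolean done = false;
  private int batchesLeft = 2;            // pretend two data batches

  String next() {
    if (done) {
      return "NONE";                      // idempotent after exhaustion
    }
    if (batchesLeft-- > 0) {
      return "OK";
    }
    done = true;                          // the line this hunk adds
    return "NONE";
  }
}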

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 3a843ea..41f6349 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -87,31 +87,6 @@ public class ScreenCreator implements RootCreator<Screen>{
     }
 
     @Override
-    public void buildSchema() throws SchemaChangeException {
-      stats.startProcessing();
-      try {
-        stats.stopProcessing();
-        try {
-          incoming.buildSchema();
-        } finally {
-          stats.startProcessing();
-        }
-
-        QueryWritableBatch batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, false, incoming.getSchema());
-        stats.startWait();
-        try {
-          connection.sendResult(listener, batch);
-        } finally {
-          stats.stopWait();
-        }
-        sendCount.increment();
-      } finally {
-        stats.stopProcessing();
-      }
-      materializer = new VectorRecordMaterializer(context, incoming);
-    }
-
-    @Override
     public boolean innerNext() {
       if(!ok){
         stop();
@@ -129,7 +104,7 @@ public class ScreenCreator implements RootCreator<Screen>{
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
               .setQueryState(QueryState.FAILED)
-              .addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query stopeed.",
+              .addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query stopped.",
                 context.getFailureCause(), logger, verbose))
               .setDef(RecordBatchDef.getDefaultInstance()) //
               .setIsLastChunk(true) //

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index b638de0..bd4ccd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -85,32 +85,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }
 
     @Override
-    public void buildSchema() throws SchemaChangeException {
-      stats.startProcessing();
-      try {
-        stats.stopProcessing();
-        try {
-          incoming.buildSchema();
-        } finally {
-          stats.startProcessing();
-        }
-
-        FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(),
-                handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema());
-
-        stats.startWait();
-        try {
-          tunnel.sendRecordBatch(new RecordSendFailure(), batch);
-        } finally {
-          stats.stopWait();
-        }
-        sendCount.increment();
-      } finally {
-        stats.stopProcessing();
-      }
-    }
-
-    @Override
     public boolean innerNext() {
       if (!ok) {
         incoming.kill(false);

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 400a867..a67f835 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -86,6 +86,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   private long countSincePurge;
   private int batchCount;
   private Copier copier;
+  private boolean schemaBuilt = false;
+  private boolean first = true;
+  private int recordCount = 0;
+  private boolean stop;
 
   public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -96,7 +100,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   @Override
   public int getRecordCount() {
-    return sv4.getCount();
+    return recordCount;
   }
 
   @Override
@@ -121,38 +125,43 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     incoming.cleanup();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
+  public void buildSchema() throws SchemaChangeException {
     VectorContainer c = new VectorContainer(oContext);
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      for (VectorWrapper w : incoming) {
-        c.addOrGet(w.getField());
-      }
-      c = VectorContainer.canonicalize(c);
-      for (VectorWrapper w : c) {
-        container.add(w.getValueVector());
-      }
-      container.buildSchema(SelectionVectorMode.NONE);
-      container.setRecordCount(0);
-      return IterOutcome.OK_NEW_SCHEMA;
-    } finally {
-      stats.stopProcessing();
+    IterOutcome outcome = next(incoming);
+    switch (outcome) {
+      case OK:
+      case OK_NEW_SCHEMA:
+        for (VectorWrapper w : incoming) {
+          c.addOrGet(w.getField());
+        }
+        c = VectorContainer.canonicalize(c);
+        for (VectorWrapper w : c) {
+          ValueVector v = container.addOrGet(w.getField());
+          v.allocateNew();
+        }
+        container.buildSchema(SelectionVectorMode.NONE);
+        container.setRecordCount(0);
+        return;
+      case STOP:
+        stop = true;
+      case NONE:
+        state = BatchState.DONE;
+      default:
+        return;
     }
   }
 
   @Override
   public IterOutcome innerNext() {
+    if (state == BatchState.DONE) {
+      return IterOutcome.NONE;
+    }
     if (schema != null) {
       if (getSelectionVector4().next()) {
+        recordCount = sv4.getCount();
         return IterOutcome.OK;
       } else {
+        recordCount = 0;
         return IterOutcome.NONE;
       }
     }
@@ -162,7 +171,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       outer: while (true) {
         Stopwatch watch = new Stopwatch();
         watch.start();
-        IterOutcome upstream = incoming.next();
+        IterOutcome upstream;
+        if (first) {
+          upstream = IterOutcome.OK_NEW_SCHEMA;
+          first = false;
+        } else {
+          upstream = next(incoming);
+        }
         if (upstream == IterOutcome.OK && schema == null) {
           upstream = IterOutcome.OK_NEW_SCHEMA;
           container.clear();
@@ -185,6 +200,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           }
           // fall through.
         case OK:
+          if (incoming.getRecordCount() == 0) {
+            for (VectorWrapper w : incoming) {
+              w.clear();
+            }
+            break;
+          }
           countSincePurge += incoming.getRecordCount();
           batchCount++;
           RecordBatchData batch = new RecordBatchData(incoming);
@@ -204,8 +225,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         }
       }
 
-      if (schema == null) {
+      if (schema == null || priorityQueue == null) {
         // builder may be null at this point if the first incoming batch is empty
+        state = BatchState.DONE;
         return IterOutcome.NONE;
       }
 
@@ -218,6 +240,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       }
       container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
 
+      recordCount = sv4.getCount();
       return IterOutcome.OK_NEW_SCHEMA;
 
     } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
@@ -345,11 +368,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     }
 
     @Override
-    public IterOutcome buildSchema() throws SchemaChangeException {
-      return null;
-    }
-
-    @Override
     public int getRecordCount() {
       if (sv4 != null) {
         return sv4.getCount();
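
TopNBatch's buildSchema() now pulls one batch from upstream, copies its fields into a canonicalized container, and allocates empty vectors so the schema batch is zero-row but fully typed. Canonicalizing here means imposing a deterministic field order so batches from different senders stay schema-compatible; a toy analogy only, since VectorContainer.canonicalize itself lives in Drill's codebase:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy analogy of container canonicalization: the same fields arriving in
// different orders end up with one canonical ordering.
final class CanonicalizeSketch {
  static List<String> canonicalize(List<String> fieldNames) {
    List<String> sorted = new ArrayList<>(fieldNames);
    Collections.sort(sorted);             // deterministic field order
    return sorted;
  }

  public static void main(String[] args) {
    System.out.println(canonicalize(Arrays.asList("b", "a")));  // [a, b]
    System.out.println(canonicalize(Arrays.asList("a", "b")));  // [a, b]
  }
}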

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index cb0de02..74a674e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -54,7 +54,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   private BatchSchema schema;
 
   public WriterRecordBatch(Writer writer, RecordBatch incoming, FragmentContext context, RecordWriter recordWriter) throws OutOfMemoryException {
-    super(writer, context);
+    super(writer, context, false);
     this.incoming = incoming;
 
     FragmentHandle handle = context.getHandle();
@@ -78,17 +78,12 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    try {
-      stats.startProcessing();
-      setupNewSchema();
-    } catch (Exception e) {
-      throw new SchemaChangeException(e);
-    } finally {
-      stats.stopProcessing();
-    }
-    return IterOutcome.OK_NEW_SCHEMA;
+  public void buildSchema() throws SchemaChangeException {
+//    try {
+//      setupNewSchema();
+//    } catch (Exception e) {
+//      throw new SchemaChangeException(e);
+//    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index a0b8d3f..113e883 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -46,6 +46,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
@@ -58,11 +59,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   private HashAggregator aggregator;
   private final RecordBatch incoming;
-  private boolean done = false;
   private LogicalExpression[] aggrExprs;
   private TypedFieldId[] groupByOutFieldIds ;
   private TypedFieldId[] aggrOutFieldIds ;      // field ids for the outgoing batch
-  private boolean first = true;
 
   private final GeneratorMapping UPDATE_AGGR_INSIDE =
     GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -82,44 +81,41 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   public int getRecordCount() {
-    if (done) {
+    if (state == BatchState.DONE) {
       return 0;
     }
     return aggregator.getOutputCount();
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      if (!createAggregator()) {
-        done = true;
-        return IterOutcome.STOP;
-      }
-      return IterOutcome.OK_NEW_SCHEMA;
-    } finally {
-      stats.stopProcessing();
+  public void buildSchema() throws SchemaChangeException {
+    if (next(incoming) == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      container.buildSchema(SelectionVectorMode.NONE);
+      return;
+    }
+    if (!createAggregator()) {
+      state = BatchState.DONE;
+    }
+    for (VectorWrapper w : container) {
+      w.getValueVector().allocateNew();
     }
   }
 
   @Override
   public IterOutcome innerNext() {
-    if (done) {
-      return IterOutcome.NONE;
-    }
     // this is only called on the first batch. Beyond this, the aggregator manages batches.
-    if (aggregator == null || first) {
-      first = false;
+    if (aggregator == null || state == BatchState.FIRST) {
       if (aggregator != null) {
         aggregator.cleanup();
       }
-      IterOutcome outcome = next(incoming);
+      IterOutcome outcome;
+      if (state == BatchState.FIRST) {
+        state = BatchState.NOT_FIRST;
+        outcome = IterOutcome.OK;
+      } else {
+        outcome = next(incoming);
+      }
       if (outcome == IterOutcome.OK) {
         outcome = IterOutcome.OK_NEW_SCHEMA;
       }
@@ -133,7 +129,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         return outcome;
       case OK_NEW_SCHEMA:
         if (!createAggregator()) {
-          done = true;
+          state = BatchState.DONE;
           return IterOutcome.STOP;
         }
         break;
@@ -163,7 +159,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       case CLEANUP_AND_RETURN:
         container.zeroVectors();
         aggregator.cleanup();
-        done = true;
+        state = BatchState.DONE;
         // fall through
       case RETURN_OUTCOME:
         IterOutcome outcome = aggregator.getOutcome();

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 17aaae8..ef85a36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
@@ -59,6 +60,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final RecordBatch incoming;
   private boolean done = false;
   private boolean first = true;
+  private boolean schemaBuilt = false;
 
   public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(popConfig, context);
@@ -77,34 +79,31 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      if (!createAggregator()) {
-        done = true;
-        return IterOutcome.STOP;
-      }
-      return IterOutcome.OK_NEW_SCHEMA;
-    } finally {
-      stats.stopProcessing();
+  public void buildSchema() throws SchemaChangeException {
+    if (next(incoming) == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      container.buildSchema(SelectionVectorMode.NONE);
+      return;
+    }
+    if (!createAggregator()) {
+      state = BatchState.DONE;
+    }
+    for (VectorWrapper w : container) {
+      w.getValueVector().allocateNew();
     }
   }
+
   @Override
   public IterOutcome innerNext() {
-    if (done) {
-      container.zeroVectors();
-      return IterOutcome.NONE;
-    }
       // this is only called on the first batch. Beyond this, the aggregator manages batches.
     if (aggregator == null || first) {
-      first = false;
-      IterOutcome outcome = next(incoming);
+      IterOutcome outcome;
+      if (first && incoming.getRecordCount() > 0) {
+        first = false;
+        outcome = IterOutcome.OK_NEW_SCHEMA;
+      } else {
+        outcome = next(incoming);
+      }
       logger.debug("Next outcome of {}", outcome);
       switch (outcome) {
       case NONE:

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 4e7d222..22fa047 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -85,34 +85,6 @@ public class BroadcastSenderRootExec extends BaseRootExec {
   }
 
   @Override
-  public void buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-
-      FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(),
-              handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema());
-
-      stats.startWait();
-      for (int i = 0; i < tunnels.length; i++) {
-        try {
-          tunnels[i].sendRecordBatch(this.statusHandler, batch);
-        } finally {
-          stats.stopWait();
-        }
-        statusHandler.sendCount.increment();
-      }
-    } finally {
-      stats.stopProcessing();
-    }
-  }
-
-  @Override
   public boolean innerNext() {
     if(!ok) {
       context.fail(statusHandler.ex);

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 7d68e07..064d5c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -80,10 +80,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
     container.zeroVectors();
     int recordCount = incoming.getRecordCount();
     filter.filterBatch(recordCount);
-//    for (VectorWrapper<?> v : container) {
-//      ValueVector.Mutator m = v.getValueVector().getMutator();
-//      m.setValueCount(recordCount);
-//    }
 
     return IterOutcome.OK;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 8d14d2e..e82dd29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -252,39 +252,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     return ref;
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    if ( ! fastSchemaCalled ) {
-      for (VectorWrapper vw : incoming) {
-        if (vw.getField().getPath().equals(popConfig.getColumn())) {
-          if (vw.getValueVector() instanceof MapVector) {
-            // fast schema upstream did not report a repeated type
-            // assume it will be repeated in the actual results and it will fail in execution if it is not
-            container.addOrGet(vw.getField());
-          } else if (! (vw.getValueVector() instanceof RepeatedVector )) {
-            container.addOrGet(vw.getField());
-          } else {
-            TransferPair pair = getFlattenFieldTransferPair();
-            if (pair == null) {
-              continue;
-            }
-            container.add(pair.getTo());
-          }
-        } else {
-          container.addOrGet(vw.getField());
-        }
-      }
-      fastSchemaCalled = true;
-      container.buildSchema(SelectionVectorMode.NONE);
-      return IterOutcome.OK_NEW_SCHEMA;
-    }
-    else {
-      setupNewSchema();
-      return IterOutcome.OK_NEW_SCHEMA;
-    }
-  }
-
   /**
    * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
    * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 7f4d03c..7d2557e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -103,9 +103,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     // Schema of the build side
     private BatchSchema rightSchema = null;
 
-    private boolean first = true;
 
-    private boolean done = false;
 
     // Generator mapping for the build side
     // Generator mapping for the build side : scalar
@@ -143,10 +141,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     boolean firstOutputBatch = true;
 
     IterOutcome leftUpstream = IterOutcome.NONE;
+    IterOutcome rightUpstream = IterOutcome.NONE;
 
     private final HashTableStats htStats = new HashTableStats();
 
-    public enum Metric implements MetricDef {
+  public enum Metric implements MetricDef {
 
       NUM_BUCKETS,
       NUM_ENTRIES,
@@ -169,9 +168,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    leftUpstream = left.buildSchema();
-    right.buildSchema();
+  protected void buildSchema() throws SchemaChangeException {
+    leftUpstream = next(left);
+    rightUpstream = next(right);
     // Initialize the hash join helper context
     hjHelper = new HashJoinHelper(context, oContext.getAllocator());
     try {
@@ -188,29 +187,27 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       setupHashTable();
       hashJoinProbe = setupHashJoinProbe();
       // Build the container schema and set the counts
+      for (VectorWrapper w : container) {
+        w.getValueVector().allocateNew();
+      }
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
       container.setRecordCount(outputRecords);
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException(e);
     }
-    return IterOutcome.OK_NEW_SCHEMA;
   }
 
     @Override
     public IterOutcome innerNext() {
-        if (done) {
-          return IterOutcome.NONE;
-        }
         try {
             /* If we are here for the first time, execute the build phase of the
              * hash join and setup the run time generated class for the probe side
              */
-            if (first) {
-                first = false;
+            if (state == BatchState.FIRST) {
                 // Build the hash table, using the build side record batches.
                 executeBuildPhase();
 //                IterOutcome next = next(HashJoinHelper.LEFT_INPUT, left);
-                hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 0, this, hashTable, hjHelper, joinType);
+                hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, left.getRecordCount(), this, hashTable, hjHelper, joinType);
 
                 // Update the hash table related stats for the operator
                 updateStats(this.hashTable);
@@ -230,8 +227,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                  * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
                  * Either case build the output container's schema and return
                  */
-                if (outputRecords > 0 || first) {
-                  first = false;
+                if (outputRecords > 0 || state == BatchState.FIRST) {
+                  if (state == BatchState.FIRST) {
+                    state = BatchState.NOT_FIRST;
+                  }
 
 
                   for (VectorWrapper<?> v : container) {
@@ -258,10 +257,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
             }
 
             // No more output records, clean up and return
-            done = true;
-            if (first) {
-              return IterOutcome.OK_NEW_SCHEMA;
-            }
+            state = BatchState.DONE;
+//            if (first) {
+//              return IterOutcome.OK_NEW_SCHEMA;
+//            }
             return IterOutcome.NONE;
         } catch (ClassTransformationException | SchemaChangeException | IOException e) {
             context.fail(e);
@@ -308,10 +307,14 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
 
         //Setup the underlying hash table
-        IterOutcome rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
-      if (hashTable == null) {
-        rightUpstream = IterOutcome.OK_NEW_SCHEMA;
-      }
+
+      // skip first batch if count is zero, as it may be an empty schema batch
+        if (right.getRecordCount() == 0) {
+          for (VectorWrapper w : right) {
+            w.clear();
+          }
+          rightUpstream = next(right);
+        }
 
         boolean moreData = true;
 
@@ -475,7 +478,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
 
     public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-        super(popConfig, context);
+        super(popConfig, context, true);
         this.left = left;
         this.right = right;
         this.joinType = popConfig.getJoinType();

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 3bc8daa..a7fa5aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -35,6 +35,12 @@ public final class JoinStatus {
     INCOMING, SV4;
   }
 
+  private static enum InitState {
+    INIT, // initial state
+    CHECK, // need to check if batches are empty
+    READY // ready to do work
+  }
+
   private static final int LEFT_INPUT = 0;
   private static final int RIGHT_INPUT = 1;
 
@@ -55,7 +61,7 @@ public final class JoinStatus {
   private final JoinRelType joinType;
 
   public boolean ok = true;
-  private boolean initialSet = false;
+  private InitState initialSet = InitState.INIT;
   private boolean leftRepeating = false;
 
   public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
@@ -75,10 +81,23 @@ public final class JoinStatus {
   }
 
   public final void ensureInitial() {
-    if(!initialSet) {
-      this.lastLeft = nextLeft();
-      this.lastRight = nextRight();
-      initialSet = true;
+    switch(initialSet) {
+      case INIT:
+        this.lastLeft = nextLeft();
+        this.lastRight = nextRight();
+        initialSet = InitState.CHECK;
+        break;
+      case CHECK:
+        if (lastLeft != IterOutcome.NONE && left.getRecordCount() == 0) {
+          this.lastLeft = nextLeft();
+        }
+        if (lastRight != IterOutcome.NONE && right.getRecordCount() == 0) {
+          this.lastRight = nextRight();
+        }
+        initialSet = InitState.READY;
+        // fall through
+      default:
+        break;
     }
   }
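
The INIT/CHECK/READY split exists because, with fast schema, the first batch from either side may carry a schema and zero records. The CHECK pass advances past such a batch before the join starts comparing keys. Reduced to its essentials (illustrative interfaces, not Drill's):

// Sketch of JoinStatus.ensureInitial(): after the initial advance (INIT),
// a second pass (CHECK) skips a zero-record schema-only batch on each
// side; the method is called once per state on successive invocations.
interface SideSketch {
  boolean advance();       // nextLeft()/nextRight() analogue; false == NONE
  int recordCount();
}

final class EnsureInitialSketch {
  private enum InitState { INIT, CHECK, READY }
  private InitState state = InitState.INIT;
  private boolean leftLive, rightLive;

  void ensureInitial(SideSketch left, SideSketch right) {
    switch (state) {
      case INIT:
        leftLive = left.advance();
        rightLive = right.advance();
        state = InitState.CHECK;
        break;
      case CHECK:
        if (leftLive && left.recordCount() == 0) {
          leftLive = left.advance();      // skip the schema-only batch
        }
        if (rightLive && right.recordCount() == 0) {
          rightLive = right.advance();
        }
        state = InitState.READY;
        break;
      default:
        break;                            // READY: nothing to do
    }
  }
}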
 

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 518971d..87b12b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -111,10 +111,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private final JoinRelType joinType;
   private JoinWorker worker;
   public MergeJoinBatchBuilder batchBuilder;
-  private boolean done = false;
 
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context);
+    super(popConfig, context, true);
 
     if (popConfig.getConditions().size() == 0) {
       throw new UnsupportedOperationException("Merge Join currently does not support cartesian join.  This join operator was configured with 0 conditions");
@@ -136,24 +135,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     return status.getOutPosition();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    left.buildSchema();
-    right.buildSchema();
-    try {
-      allocateBatch(true);
-      worker = generateNewWorker();
-    } catch (IOException | ClassTransformationException e) {
-      throw new SchemaChangeException(e);
-    }
-    return IterOutcome.OK_NEW_SCHEMA;
+  public void buildSchema() throws SchemaChangeException {
+    status.ensureInitial();
+    allocateBatch(true);
   }
 
   @Override
   public IterOutcome innerNext() {
-    if (done) {
-      return IterOutcome.NONE;
-    }
     // we do this in the here instead of the constructor because don't necessary want to start consuming on construction.
     status.ensureInitial();
 
@@ -214,7 +202,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       case NO_MORE_DATA:
         logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE")));
         setRecordCountInContainer();
-        done = true;
+        state = BatchState.DONE;
         return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE);
       case SCHEMA_CHANGED:
         worker = null;
@@ -437,35 +425,37 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     // allocate new batch space.
     container.zeroVectors();
 
-    //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
-    int leftCount = worker == null ? left.getRecordCount() : (status.isLeftPositionAllowed() ? left.getRecordCount() : 0);
-    int rightCount = worker == null ? left.getRecordCount() : (status.isRightPositionAllowed() ? right.getRecordCount() : 0);
-    int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
+    boolean leftAllowed = status.getLastLeft() != IterOutcome.NONE;
+    boolean rightAllowed = status.getLastRight() != IterOutcome.NONE;
 
     if (newSchema) {
     // add fields from both batches
-      for (VectorWrapper<?> w : left) {
-        MajorType inputType = w.getField().getType();
-        MajorType outputType;
-        if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
-          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-        } else {
-          outputType = inputType;
+      if (leftAllowed) {
+        for (VectorWrapper<?> w : left) {
+          MajorType inputType = w.getField().getType();
+          MajorType outputType;
+          if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
+            outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+          } else {
+            outputType = inputType;
+          }
+          MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+          container.addOrGet(newField);
         }
-        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
-       container.addOrGet(newField);
       }
 
-      for (VectorWrapper<?> w : right) {
-        MajorType inputType = w.getField().getType();
-        MajorType outputType;
-        if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
-          outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
-        } else {
-          outputType = inputType;
+      if (rightAllowed) {
+        for (VectorWrapper<?> w : right) {
+          MajorType inputType = w.getField().getType();
+          MajorType outputType;
+          if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
+            outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
+          } else {
+            outputType = inputType;
+          }
+          MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+          container.addOrGet(newField);
         }
-        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
-        container.addOrGet(newField);
       }
     }
 
@@ -489,7 +479,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
         // materialize value vector readers from join expression
         LogicalExpression materializedLeftExpr;
-        if (worker == null || status.isLeftPositionAllowed()) {
+        if (status.getLastLeft() != IterOutcome.NONE) {
+//          if (status.isLeftPositionAllowed()) {
           materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
         } else {
           materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
@@ -500,7 +491,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         }
 
         LogicalExpression materializedRightExpr;
-        if (worker == null || status.isRightPositionAllowed()) {
+//        if (status.isRightPositionAllowed()) {
+        if (status.getLastRight() != IterOutcome.NONE) {
           materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
         } else {
           materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 02e1a92..7e66893 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -43,6 +43,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private boolean noEndLimit;
   private boolean skipBatch;
   private boolean done = false;
+  private boolean first = true;
   List<TransferPair> transfers = Lists.newArrayList();
 
   public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
@@ -93,10 +94,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       return IterOutcome.NONE;
     }
 
-    if(!noEndLimit && recordsLeft <= 0) {
+    if(!first && !noEndLimit && recordsLeft <= 0) {
       incoming.kill(true);
 
-      IterOutcome upStream = incoming.next();
+      IterOutcome upStream = next(incoming);
 
       while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
 
@@ -104,7 +105,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
         for (VectorWrapper<?> wrapper : incoming) {
           wrapper.getValueVector().clear();
         }
-        upStream = incoming.next();
+        upStream = next(incoming);
       }
 
       return IterOutcome.NONE;
@@ -119,11 +120,18 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   protected IterOutcome doWork() {
-    for(TransferPair tp : transfers) {
-      tp.transfer();
+    if (first) {
+      first = false;
     }
     skipBatch = false;
     int recordCount = incoming.getRecordCount();
+    if (recordCount == 0) {
+      skipBatch = true;
+      return IterOutcome.OK;
+    }
+    for(TransferPair tp : transfers) {
+      tp.transfer();
+    }
     if(recordCount <= recordsToSkip) {
       recordsToSkip -= recordCount;
       skipBatch = true;
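
Two changes interact in this operator: zero-record batches are now skipped without touching the transfer pairs, and the early "limit reached, drain upstream" path is guarded by !first so the schema batch still reaches downstream even for LIMIT 0. The offset/limit accounting in isolation (illustrative counters, not Drill's API):

// Sketch of the skip/limit bookkeeping: a zero-record batch (e.g. the
// fast-schema batch) neither consumes the limit nor produces output.
final class LimitSketch {
  private int recordsToSkip;              // OFFSET remaining
  private int recordsLeft;                // LIMIT remaining

  LimitSketch(int offset, int limit) {
    this.recordsToSkip = offset;
    this.recordsLeft = limit;
  }

  // Returns how many records of this batch pass through.
  int onBatch(int recordCount) {
    if (recordCount == 0) {
      return 0;                           // skip schema-only batches outright
    }
    if (recordCount <= recordsToSkip) {
      recordsToSkip -= recordCount;       // whole batch consumed by OFFSET
      return 0;
    }
    int offset = recordsToSkip;
    recordsToSkip = 0;
    int out = Math.min(recordCount - offset, recordsLeft);
    recordsLeft -= out;
    return out;
  }
}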

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 8da8f96..acbb755 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -107,7 +108,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private int[] batchOffsets;
   private PriorityQueue <Node> pqueue;
   private RawFragmentBatch emptyBatch = null;
-  private boolean done = false;
+  private RawFragmentBatch[] tempBatchHolder; //
 
   public static enum Metric implements MetricDef{
     BYTES_RECEIVED,
@@ -123,7 +124,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   public MergingRecordBatch(FragmentContext context,
                             MergingReceiverPOP config,
                             RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
-    super(config, context, new OperatorContext(config, context, false));
+    super(config, context, true, new OperatorContext(config, context, false));
     //super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
@@ -151,9 +152,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     if (fragProviders.length == 0) {
       return IterOutcome.NONE;
     }
-    if (done) {
-      return IterOutcome.NONE;
-    }
     boolean schemaChanged = false;
 
     if (prevBatchWasFull) {
@@ -175,11 +173,18 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // set up each (non-empty) incoming record batch
       List<RawFragmentBatch> rawBatches = Lists.newArrayList();
-      boolean firstBatch = true;
+      int p = 0;
       for (RawFragmentBatchProvider provider : fragProviders) {
         RawFragmentBatch rawBatch = null;
         try {
-          rawBatch = getNext(provider);
+          // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
+          if (tempBatchHolder[p] != null) {
+            rawBatch = tempBatchHolder[p];
+            tempBatchHolder[p] = null;
+          } else {
+            rawBatch = getNext(provider);
+          }
+          p++;
           if (rawBatch == null && context.isCancelled()) {
             return IterOutcome.STOP;
           }
@@ -190,7 +195,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
           rawBatches.add(rawBatch);
         } else {
-          if (emptyBatch == null) {
+          // save an empty batch to use for schema purposes. ignore batch if it contains no fields, and thus no schema
+          if (emptyBatch == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
             emptyBatch = rawBatch;
           }
           try {
@@ -406,9 +412,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     }
 
     if (pqueue.isEmpty()) {
-      if (!done) {
-        done = !done;
-      }
+      state = BatchState.DONE;
     }
 
     if (schemaChanged) {
@@ -429,22 +433,33 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     return outgoingContainer.getSchema();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
+  public void buildSchema() {
+    // find frag provider that has data to use to build schema, and put in tempBatchHolder for later use
+    tempBatchHolder = new RawFragmentBatch[fragProviders.length];
+    int i = 0;
     try {
-      RawFragmentBatch batch = getNext(fragProviders[0]);
-      for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
-        outgoingContainer.addOrGet(MaterializedField.create(field));
+      while (true) {
+        if (i >= fragProviders.length) {
+          state = BatchState.DONE;
+          return;
+        }
+        RawFragmentBatch batch = getNext(fragProviders[i]);
+        if (batch.getHeader().getDef().getFieldCount() == 0) {
+          i++;
+          continue;
+        }
+        tempBatchHolder[i] = batch;
+        for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
+          ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+          v.allocateNew();
+        }
+        break;
       }
     } catch (IOException e) {
-      throw new SchemaChangeException(e);
-    } finally {
-      stats.stopProcessing();
+      throw new DrillRuntimeException(e);
     }
     outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
     outgoingContainer.buildSchema(SelectionVectorMode.NONE);
-    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   @Override
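
MergingRecordBatch's buildSchema() must consume a raw batch from some provider to learn the schema, but that batch still has to be merged later. The tempBatchHolder array parks it so the main loop replays it before asking the provider for more. The idiom in isolation (generic types, not Drill's):

import java.util.function.Supplier;

// "Stash then replay" sketch: peek one element to learn its shape, park
// it, and have the regular iteration drain the parked element before
// pulling from the underlying source again.
final class ReplayOnceSketch<T> {
  private final Supplier<T> source;       // getNext(provider) analogue
  private T held;                         // tempBatchHolder slot analogue

  ReplayOnceSketch(Supplier<T> source) { this.source = source; }

  T peek() {                              // used by buildSchema()
    if (held == null) {
      held = source.get();
    }
    return held;
  }

  T next() {                              // used by the merge loop
    if (held != null) {
      T t = held;
      held = null;
      return t;
    }
    return source.get();
  }
}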

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 9e3cfe5..f09acaa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -110,25 +110,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
     return true;
   }
 
-  @Override
-  public void buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    stats.startProcessing();
+  private void buildSchema() throws SchemaChangeException {
+    createPartitioner();
     try {
-      createPartitioner();
-      try {
-        partitioner.flushOutgoingBatches(false, true);
-      } catch (IOException e) {
-        throw new SchemaChangeException(e);
-      }
-    } finally {
-      stats.stopProcessing();
+      partitioner.flushOutgoingBatches(false, true);
+    } catch (IOException e) {
+      throw new SchemaChangeException(e);
     }
   }
 
   @Override
   public boolean innerNext() {
-    boolean newSchema = false;
 
     if (!ok) {
       stop();
@@ -146,7 +138,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
     logger.debug("Partitioner.next(): got next record batch with status {}", out);
     if (first && out == IterOutcome.OK) {
-      first = false;
       out = IterOutcome.OK_NEW_SCHEMA;
     }
     switch(out){
@@ -179,6 +170,11 @@ public class PartitionSenderRootExec extends BaseRootExec {
             partitioner.clear();
           }
           createPartitioner();
+          // flush to send schema downstream
+          if (first) {
+            first = false;
+            partitioner.flushOutgoingBatches(false, true);
+          }
         } catch (IOException e) {
           incoming.kill(false);
           logger.error("Error while flushing outgoing batches", e);

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index a16e29f..20f6195 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -364,6 +364,7 @@ public abstract class PartitionerTemplate implements Partitioner {
 
         // allocate a new value vector
         ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
+        v.getValueVector().makeTransferPair(outgoingVector);
         outgoingVector.allocateNew();
         vectorContainer.add(outgoingVector);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 132c41e..4c9b33b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -58,31 +58,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      stats.startSetup();
-      try {
-        for (VectorWrapper w : incoming) {
-          container.addOrGet(w.getField());
-        }
-      } finally {
-        stats.stopSetup();
-      }
-    } finally {
-      stats.stopProcessing();
-    }
-    container.buildSchema(incoming.getSchema().getSelectionVectorMode());
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   public IterOutcome innerNext() {
     if (!running) {
       producer.start();

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 27cb1f2..f822e55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.HashMap;
 import java.util.List;
 
@@ -61,8 +63,8 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import com.carrotsearch.hppc.IntOpenHashSet;
@@ -83,6 +85,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private boolean buildingSchema = true;
 
   private static final String EMPTY_STRING = "";
+  private boolean first = true;
 
   private class ClassifierResult {
     public boolean isStar = false;
@@ -136,9 +139,29 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
   @Override
   protected IterOutcome doWork() {
-//    VectorUtil.showVectorAccessibleContent(incoming, ",");
     int incomingRecordCount = incoming.getRecordCount();
 
+    if (first && incomingRecordCount == 0) {
+      if (complexWriters != null) {
+        IterOutcome next = null;
+        while (incomingRecordCount == 0) {
+          next = next(incoming);
+          if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA) {
+            return next;
+          }
+          incomingRecordCount = incoming.getRecordCount();
+        }
+        if (next == IterOutcome.OK_NEW_SCHEMA) {
+          try {
+            setupNewSchema();
+          } catch (SchemaChangeException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+    first = false;
+
     container.zeroVectors();
 
     if (!doAlloc()) {
@@ -265,13 +288,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    setupNewSchema();
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     if (allocationVectors != null) {
       for (ValueVector v : allocationVectors) {
@@ -400,11 +416,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
         ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
         cg.addExpr(expr);
-        if (buildingSchema) {
-          buildingSchema = false;
-          MaterializedField f = MaterializedField.create(outputField.getPath().getAsUnescapedPath(), Types.required(MinorType.MAP));
-          container.addOrGet(f);
-        }
       } else{
         // need to do evaluation.
         ValueVector vector = container.addOrGet(outputField, callBack);
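
The new block at the top of doWork() addresses complex writers: they only materialize output vectors once they see data, so an empty first batch cannot produce a schema. The operator therefore drains empty incoming batches, re-running setupNewSchema() if a schema change arrives in the process. A condensed sketch under simplified stand-in types, not Drill's classes:

class DrainEmptySketch {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE, STOP }

  /** Simplified stand-in for an upstream RecordBatch. */
  interface Upstream {
    Outcome next();
    int getRecordCount();
  }

  /** Pull until a batch with rows arrives; report how the last pull ended. */
  static Outcome skipEmptyBatches(Upstream incoming) {
    Outcome next = Outcome.OK;
    while (incoming.getRecordCount() == 0) {
      next = incoming.next();
      if (next != Outcome.OK && next != Outcome.OK_NEW_SCHEMA) {
        return next; // NONE/STOP: nothing to project at all
      }
    }
    // In the real operator, OK_NEW_SCHEMA here triggers setupNewSchema().
    return next;
  }
}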

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index f2c1e89..1fa759c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -17,7 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.List;
 
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -34,6 +36,7 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -60,13 +63,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    setupNewSchema();
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     container.zeroVectors();
     switch(incoming.getSchema().getSelectionVectorMode()){

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 42492ab..78b53a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -47,7 +47,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
   private int outRecordCount;
 
   public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
-    super(config, context);
+    super(config, context, false);
     this.incoming = children;
     this.incomingIterator = incoming.iterator();
     current = incomingIterator.next();
@@ -78,11 +78,8 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
     }
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.get(0).buildSchema();
+  public void buildSchema() throws SchemaChangeException {
     setupSchema();
-    return IterOutcome.OK_NEW_SCHEMA;
   }
 
   @Override
@@ -103,11 +100,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
         return IterOutcome.NONE;
       }
       current = incomingIterator.next();
-      try {
-        current.buildSchema();
-      } catch (SchemaChangeException e) {
-        throw new RuntimeException(e);
-      }
       upstream = current.next();
       if (upstream == IterOutcome.OK) {
         upstream = IterOutcome.OK_NEW_SCHEMA;
@@ -132,18 +124,16 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
 
   private void doTransfer() {
     outRecordCount = current.getRecordCount();
+    // skip empty batches
+    if (outRecordCount == 0) {
+      return;
+    }
     if (container.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
       this.sv = current.getSelectionVector2();
     }
     for (TransferPair transfer : transfers) {
       transfer.transfer();
     }
-
-//    for (VectorWrapper<?> vw : this.container) {
-//      ValueVector.Mutator m = vw.getValueVector().getMutator();
-//      m.setValueCount(outRecordCount);
-//    }
-
   }
 
   private void setupSchema() {

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 25fec41..52b892e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -95,11 +95,6 @@ public class UnorderedReceiverBatch implements RecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    return next();
-  }
-
-  @Override
   public int getRecordCount() {
     return batchLoader.getRecordCount();
   }
@@ -108,9 +103,8 @@ public class UnorderedReceiverBatch implements RecordBatch {
   public void kill(boolean sendUpstream) {
     if (sendUpstream) {
       informSenders();
-    } else {
-      fragProvider.kill(context);
     }
+    fragProvider.kill(context);
   }
 
   @Override
@@ -148,7 +142,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
         batch = fragProvider.getNext();
 
         // skip over empty batches. we do this since these are basically control messages.
-        while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first) {
+        while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
           batch = fragProvider.getNext();
         }
       } finally {
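
The widened loop condition is the key change: previously any batch was kept while first was true; now even the first batch is skipped when it carries zero records and zero fields, since a batch with no schema at all is purely a control message. The rule in isolation, with simplified stand-in header fields:

class SkipRuleSketch {
  static class Header {
    boolean isOutOfMemory;
    int recordCount;
    int fieldCount;
  }

  static boolean skip(Header h, boolean first) {
    return !h.isOutOfMemory
        && h.recordCount == 0
        // Keep the first batch only if it actually defines a schema.
        && (!first || h.fieldCount == 0);
  }
}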

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 7f5ab2a..66ec22f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -76,12 +76,6 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    state = incoming.buildSchema();
-    return state;
-  }
-
-  @Override
   public int getRecordCount() {
     validateReadState();
     return incoming.getRecordCount();

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
index e0e9d42..38c6884 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
@@ -252,7 +252,7 @@ public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<W
     }
 
     if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) {
-      done = true;
+      state = BatchState.DONE;
     }
 
     return framer.getOutcome();

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 3f2692e..f7fed46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -109,9 +109,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private long totalSizeInMemory = 0;
   private long highWaterMark = Long.MAX_VALUE;
   private int targetRecordCount;
+  private boolean stop = false;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
-    super(popConfig, context);
+    super(popConfig, context, true);
     this.incoming = incoming;
     DrillConfig config = context.getConfig();
     Configuration conf = new Configuration();
@@ -193,24 +194,24 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     incoming.cleanup();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    stats.startProcessing();
-    try {
-      stats.stopProcessing();
-      try {
-        incoming.buildSchema();
-      } finally {
-        stats.startProcessing();
-      }
-      for (VectorWrapper w : incoming) {
-        container.addOrGet(w.getField());
-      }
-      container.buildSchema(SelectionVectorMode.NONE);
-      container.setRecordCount(0);
-      return IterOutcome.OK_NEW_SCHEMA;
-    } finally {
-      stats.stopProcessing();
+  public void buildSchema() throws SchemaChangeException {
+    IterOutcome outcome = next(incoming);
+    switch (outcome) {
+      case OK:
+      case OK_NEW_SCHEMA:
+        for (VectorWrapper w : incoming) {
+          ValueVector v = container.addOrGet(w.getField());
+          v.allocateNew();
+        }
+        container.buildSchema(SelectionVectorMode.NONE);
+        container.setRecordCount(0);
+        return;
+      case STOP:
+        stop = true;
+      case NONE:
+        state = BatchState.DONE;
+      default:
+        return;
     }
   }
 
@@ -228,7 +229,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       } else {
         Stopwatch w = new Stopwatch();
         w.start();
-//        int count = selector.next();
         int count = copier.next(targetRecordCount);
         if (count > 0) {
           long t = w.elapsed(TimeUnit.MICROSECONDS);
@@ -249,7 +249,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       outer: while (true) {
         Stopwatch watch = new Stopwatch();
         watch.start();
-        IterOutcome upstream = incoming.next();
+        IterOutcome upstream;
+        if (first) {
+          upstream = IterOutcome.OK_NEW_SCHEMA;
+        } else {
+          upstream = next(incoming);
+        }
         if (upstream == IterOutcome.OK && sorter == null) {
           upstream = IterOutcome.OK_NEW_SCHEMA;
         }
@@ -275,15 +280,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           }
           // fall through.
         case OK:
-          if (!first && incoming.getRecordCount() == 0) {
+          if (first) {
+            first = false;
+          }
+          if (incoming.getRecordCount() == 0) {
             for (VectorWrapper w : incoming) {
               w.clear();
             }
             break;
           }
-          if (first) {
-            first = false;
-          }
           totalSizeInMemory += getBufferSize(incoming);
           SelectionVector2 sv2;
           if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
@@ -336,6 +341,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         }
       }
 
+      if (totalcount == 0) {
+        return IterOutcome.NONE;
+      }
       if (spillCount == 0) {
         Stopwatch watch = new Stopwatch();
         watch.start();
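
Since buildSchema() above already consumed the first upstream batch, the main sort loop must not pull again on its first pass; it synthesizes OK_NEW_SCHEMA and processes the batch buildSchema() fetched. Roughly, with stand-in types rather than Drill's classes:

class FirstPullSketch {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE }

  interface Upstream { Outcome next(); }

  private boolean first = true;
  private final Upstream incoming;

  FirstPullSketch(Upstream incoming) { this.incoming = incoming; }

  Outcome pull() {
    if (first) {
      first = false; // the commit clears this later, in the OK case of its switch
      // Reuse the batch that buildSchema() already fetched.
      return Outcome.OK_NEW_SCHEMA;
    }
    return incoming.next();
  }
}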

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index f77ae3d..2bb29e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.record;
 
 import java.util.Iterator;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -38,24 +39,36 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   protected final OperatorContext oContext;
   protected final OperatorStats stats;
 
+  protected BatchState state;
+
   protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException {
-    super();
-    this.context = context;
-    this.popConfig = popConfig;
-    this.oContext = new OperatorContext(popConfig, context, true);
-    this.stats = oContext.getStats();
-    this.container = new VectorContainer(this.oContext);
+    this(popConfig, context, true, new OperatorContext(popConfig, context, true));
   }
 
-  protected AbstractRecordBatch(T popConfig, FragmentContext context, OperatorContext oContext) throws OutOfMemoryException {
+  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema) throws OutOfMemoryException {
+    this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true));
+  }
+
+  protected AbstractRecordBatch(T popConfig, FragmentContext context, boolean buildSchema, OperatorContext oContext) throws OutOfMemoryException {
     super();
     this.context = context;
     this.popConfig = popConfig;
     this.oContext = oContext;
     this.stats = oContext.getStats();
     this.container = new VectorContainer(this.oContext);
+    if (buildSchema) {
+      state = BatchState.BUILD_SCHEMA;
+    } else {
+      state = BatchState.FIRST;
+    }
   }
 
+  protected static enum BatchState {
+    BUILD_SCHEMA, // Need to build schema and return
+    FIRST, // This is still the first data batch
+    NOT_FIRST, // The first data batch has already been returned
+    DONE // All work is done, no more data to be sent
+  }
 
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
@@ -103,7 +116,33 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   public final IterOutcome next() {
     try {
       stats.startProcessing();
-      return innerNext();
+//      if (state == BatchState.BUILD_SCHEMA) {
+//        buildSchema();
+//        if (state == BatchState.BUILD_SCHEMA.DONE) {
+//          return IterOutcome.NONE;
+//        } else {
+//          state = BatchState.FIRST;
+//          return IterOutcome.OK_NEW_SCHEMA;
+//        }
+//      }
+      switch (state) {
+        case BUILD_SCHEMA: {
+          buildSchema();
+          if (state == BatchState.DONE) {
+            return IterOutcome.NONE;
+          } else {
+            state = BatchState.FIRST;
+            return IterOutcome.OK_NEW_SCHEMA;
+          }
+        }
+        case DONE: {
+          return IterOutcome.NONE;
+        }
+        default:
+          return innerNext();
+      }
+    } catch (SchemaChangeException e) {
+      throw new DrillRuntimeException(e);
     } finally {
       stats.stopProcessing();
     }
@@ -116,9 +155,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
     return container.getSchema();
   }
 
-  @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    throw new UnsupportedOperationException("buildSchema() not yet implemented");
+  protected void buildSchema() throws SchemaChangeException {
   }
 
   @Override
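
This hunk is the heart of DRILL-1781: next() becomes a small state machine. Operators constructed with buildSchema=true first return a schema-only OK_NEW_SCHEMA batch, DONE short-circuits to NONE, and everything else flows through innerNext(). A self-contained sketch of the dispatch, using simplified stand-ins for IterOutcome and RecordBatch:

public class StateMachineSketch {

  enum Outcome { OK, OK_NEW_SCHEMA, NONE }

  enum BatchState { BUILD_SCHEMA, FIRST, NOT_FIRST, DONE }

  abstract static class Batch {
    protected BatchState state;

    protected Batch(boolean buildSchema) {
      // Operators that can emit a schema before any data start in BUILD_SCHEMA.
      state = buildSchema ? BatchState.BUILD_SCHEMA : BatchState.FIRST;
    }

    /** Subclasses override this to construct an empty output schema. */
    protected void buildSchema() {}

    protected abstract Outcome innerNext();

    public final Outcome next() {
      switch (state) {
        case BUILD_SCHEMA:
          buildSchema();
          if (state == BatchState.DONE) {
            return Outcome.NONE;        // upstream ended during schema build
          }
          state = BatchState.FIRST;
          return Outcome.OK_NEW_SCHEMA; // schema-only batch, zero records
        case DONE:
          return Outcome.NONE;
        default:
          return innerNext();           // normal data flow
      }
    }
  }

  public static void main(String[] args) {
    Batch b = new Batch(true) {
      private int calls;
      @Override protected Outcome innerNext() {
        return ++calls < 3 ? Outcome.OK : Outcome.NONE;
      }
    };
    Outcome o;
    do {
      o = b.next();
      System.out.println(o); // OK_NEW_SCHEMA, OK, OK, NONE
    } while (o != Outcome.NONE);
  }
}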

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 1ef0345..f895f47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -21,19 +21,18 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 
 public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
   final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
 
   protected final RecordBatch incoming;
-  private boolean first = true;
-  protected boolean done = false;
   protected boolean outOfMemory = false;
   protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
 
   public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
-    super(popConfig, context);
+    super(popConfig, context, false);
     this.incoming = incoming;
   }
 
@@ -45,30 +44,35 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   @Override
   public IterOutcome innerNext() {
     // Short circuit if record batch has already sent all data and is done
-    if (done) {
+    if (state == BatchState.DONE) {
       return IterOutcome.NONE;
     }
 
     IterOutcome upstream = next(incoming);
-    if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
+    if (state != BatchState.FIRST && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
       do {
         for (VectorWrapper w : incoming) {
           w.clear();
         }
       } while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0);
     }
-    if (first && upstream == IterOutcome.OK) {
+    if ((state == BatchState.FIRST) && upstream == IterOutcome.OK) {
       upstream = IterOutcome.OK_NEW_SCHEMA;
     }
     switch (upstream) {
     case NONE:
     case NOT_YET:
     case STOP:
+      if (state == BatchState.FIRST) {
+        container.buildSchema(SelectionVectorMode.NONE);
+      }
       return upstream;
     case OUT_OF_MEMORY:
       return upstream;
     case OK_NEW_SCHEMA:
-      first = false;
+      if (state == BatchState.FIRST) {
+        state = BatchState.NOT_FIRST;
+      }
       try {
         stats.startSetup();
         if (!setupNewSchema()) {
@@ -84,9 +88,15 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
       }
       // fall through.
     case OK:
-      assert !first : "First batch should be OK_NEW_SCHEMA";
+      assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
       container.zeroVectors();
-      doWork();
+      IterOutcome out = doWork();
+
+      // doWork() does not know whether a new schema arrived, so it always returns IterOutcome.OK on success;
+      // keep upstream's OK_NEW_SCHEMA in that case, but propagate any non-OK outcome from doWork().
+      if (out != IterOutcome.OK) {
+        upstream = out;
+      }
 
       if (outOfMemory) {
         outOfMemory = false;
@@ -105,13 +115,6 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   }
 
   @Override
-  public IterOutcome buildSchema() throws SchemaChangeException {
-    incoming.buildSchema();
-    setupNewSchema();
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
-  @Override
   public void cleanup() {
 //    logger.debug("Cleaning up.");
     super.cleanup();
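
Two behaviors of the reworked innerNext() are easy to miss: the first plain OK from upstream is promoted to OK_NEW_SCHEMA so downstream always sees a schema first, and a non-OK result from doWork() overrides the upstream outcome. Distilled, with stand-in enums:

class OutcomeMergeSketch {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE, OUT_OF_MEMORY }
  enum State { FIRST, NOT_FIRST, DONE }

  private State state = State.FIRST;

  Outcome merge(Outcome upstream, Outcome workResult) {
    if (state == State.FIRST && upstream == Outcome.OK) {
      // The first batch sent downstream must carry a schema.
      upstream = Outcome.OK_NEW_SCHEMA;
    }
    if (state == State.FIRST && upstream == Outcome.OK_NEW_SCHEMA) {
      state = State.NOT_FIRST;
    }
    // doWork() cannot see schema changes, so it reports OK on success;
    // any other outcome it produces takes precedence over upstream's.
    return workResult != Outcome.OK ? workResult : upstream;
  }
}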

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 318600f..0a8ece5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -68,14 +68,6 @@ public interface RecordBatch extends VectorAccessible {
    */
   public BatchSchema getSchema();
 
-
-  /**
-   * To be called out the beginning of fragment execution. This will build the schema to return downstream, and to the client
-   *
-   * @return OK_NEW_SCHEMA if succesful.
-   */
-  public IterOutcome buildSchema() throws SchemaChangeException;
-
   /**
   * Provide the number of records that are within this record batch
    *

http://git-wip-us.apache.org/repos/asf/drill/blob/3581a327/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index d85abd5..5bc3da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -96,22 +96,27 @@ public class WorkManager implements Closeable {
     // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
     executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
     eventThread.start();
-    dContext.getMetrics().register(
-        MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()),
-        new Gauge<Integer>() {
-          @Override
-          public Integer getValue() {
-            return runningFragments.size();
-          }
-        });
-    dContext.getMetrics().register(
-        MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()),
-        new Gauge<Integer>() {
-          @Override
-          public Integer getValue() {
-            return pendingTasks.size();
-          }
-        });
+    // TODO: remove this try block once metrics are moved off the singleton; for now, catch to avoid unit test failures
+    try {
+      dContext.getMetrics().register(
+              MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()),
+              new Gauge<Integer>() {
+                @Override
+                public Integer getValue() {
+                  return runningFragments.size();
+                }
+              });
+      dContext.getMetrics().register(
+              MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()),
+              new Gauge<Integer>() {
+                @Override
+                public Integer getValue() {
+                  return pendingTasks.size();
+                }
+              });
+    } catch (IllegalArgumentException e) {
+      logger.warn("Exception while registering metrics", e);
+    }
   }
 
   public WorkEventBus getWorkBus() {
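
The guard works because Codahale's MetricRegistry.register() throws IllegalArgumentException when a metric of the same name already exists, which is exactly what happens when multiple drillbits in one JVM (as in unit tests) share the singleton registry. A small self-contained illustration; the metric name is only an example:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

public class MetricGuardSketch {
  public static void main(String[] args) {
    MetricRegistry metrics = new MetricRegistry();
    Gauge<Integer> gauge = () -> 0;
    registerQuietly(metrics, "drill.exec.work.running_fragments.31010", gauge);
    // Registering the same name again throws IllegalArgumentException;
    // the guard downgrades that to a warning instead of failing startup.
    registerQuietly(metrics, "drill.exec.work.running_fragments.31010", gauge);
  }

  static void registerQuietly(MetricRegistry metrics, String name, Gauge<?> gauge) {
    try {
      metrics.register(name, gauge);
    } catch (IllegalArgumentException e) {
      System.err.println("Exception while registering metrics: " + e.getMessage());
    }
  }
}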

