drill-commits mailing list archives

From j..@apache.org
Subject [3/3] drill git commit: DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.
Date Tue, 05 Sep 2017 21:05:08 GMT
DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.

1. Modify ScanBatch's logic when it iterates over its list of RecordReaders (a simplified sketch follows this item).
   1) Skip a RecordReader if it returns 0 rows and presents the same schema. A new schema (reported by Mutator.isNewSchema()) means a new top-level field was added, a field inside a nested field was added, or an existing field's type changed.
   2) Implicit columns are presumed to have a constant schema and are added to the outgoing container before any regular column.
   3) ScanBatch returns NONE directly (a "fast NONE") if all of its RecordReaders have empty input and are therefore skipped, instead of returning OK_NEW_SCHEMA first.
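
   A minimal, self-contained sketch of the iteration contract described in item 1. The Reader interface and Outcome enum below are simplified stand-ins for Drill's RecordReader and IterOutcome, and each reader is assumed to produce at most one batch; the actual logic lives in ScanBatch.next() in the diff.

     import java.util.Iterator;
     import java.util.List;

     final class ScanLoopSketch {
       enum Outcome { OK, OK_NEW_SCHEMA, NONE }

       interface Reader {
         int next();              // number of rows read by this call
         boolean isNewSchema();   // true if this call introduced a schema change
       }

       static Outcome nextBatch(Iterator<Reader> readers) {
         while (readers.hasNext()) {
           Reader reader = readers.next();
           int rows = reader.next();
           if (reader.isNewSchema()) {
             // Even a 0-row reader reports a genuinely new schema downstream.
             return Outcome.OK_NEW_SCHEMA;
           }
           if (rows > 0) {
             return Outcome.OK;
           }
           // 0 rows and same schema: skip this reader and move on to the next one.
         }
         // All readers were empty and skipped: "fast NONE", no OK_NEW_SCHEMA first.
         return Outcome.NONE;
       }

       public static void main(String[] args) {
         Reader empty = new Reader() {
           public int next() { return 0; }
           public boolean isNewSchema() { return false; }
         };
         System.out.println(nextBatch(List.of(empty, empty).iterator())); // prints NONE
       }
     }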

2. Modify IteratorValidatorBatchIterator to allow
   1) a fast NONE (before seeing an OK_NEW_SCHEMA), and
   2) a batch with an empty list of columns.

3. Modify JSONRecordReader for 0-row input: do not insert a nullable-int column. Together with the ScanBatch change, Drill now skips empty JSON files.

4. Modify binary operators such as join and union to handle a fast NONE from one side or both sides (see the sketch following this item). The shared logic is abstracted into AbstractBinaryRecordBatch, except for MergeJoin, whose implementation differs substantially from the others.
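
   A hedged sketch of the shared "prefetch from both sides" idea in item 4. Outcome is a stand-in for Drill's IterOutcome, and the one-sided cases are intentionally left to the concrete operator; the real logic lives in AbstractBinaryRecordBatch in the diff.

     final class BinaryPrefetchSketch {
       enum Outcome { OK, OK_NEW_SCHEMA, NONE }

       // Returns NONE when both inputs are empty (fast NONE). Otherwise the concrete
       // operator (join, union, ...) decides how to handle a one-sided empty input.
       static Outcome prefetch(Outcome left, Outcome right) {
         if (left == Outcome.NONE && right == Outcome.NONE) {
           return Outcome.NONE;            // both sides empty: terminate early
         }
         return Outcome.OK_NEW_SCHEMA;     // at least one side has a schema to report
       }

       public static void main(String[] args) {
         System.out.println(prefetch(Outcome.NONE, Outcome.NONE));          // NONE
         System.out.println(prefetch(Outcome.OK_NEW_SCHEMA, Outcome.NONE)); // OK_NEW_SCHEMA
       }
     }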

5. Fix and refactor the union-all operator (a sketch of the 0-row schema rule follows this item).
  1) Correct the handling of 0-row inputs. Previously, the operator ignored inputs with 0 rows and put a nullable-int column into the output schema, which caused various schema change issues in downstream operators. The new behavior takes the schema of a 0-row input into account when determining the output schema, exactly as for inputs with more than 0 rows. This ensures the union operator does not behave like a schema-lossy operator.
  2) Add a UnionInputIterator to simplify iterating over the left/right inputs, removing a significant chunk of duplicated code from the previous implementation.
  The new union-all operator is roughly half the size of the old one.
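
  An illustrative sketch (not Drill's actual UnionAllRecordBatch code) of the rule in item 5.1: a 0-row input still contributes its schema to the output schema instead of being replaced by a nullable-int placeholder. For brevity it requires identical input schemas, whereas the real operator applies implicit casts.

    import java.util.ArrayList;
    import java.util.List;

    final class UnionSchemaSketch {
      record Field(String name, String type) {}
      record Input(List<Field> schema, int rowCount) {}

      static List<Field> outputSchema(Input left, Input right) {
        if (!left.schema().equals(right.schema())) {
          throw new IllegalArgumentException("UNION ALL inputs must be compatible");
        }
        // rowCount is never consulted: a 0-row side is treated exactly like a populated
        // side, so the union is not schema-lossy for empty inputs.
        return new ArrayList<>(left.schema());
      }

      public static void main(String[] args) {
        Input emptyLeft = new Input(List.of(new Field("a", "VARCHAR")), 0);
        Input right = new Input(List.of(new Field("a", "VARCHAR")), 100);
        System.out.println(outputSchema(emptyLeft, right)); // [Field[name=a, type=VARCHAR]]
      }
    }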

6. Introduce UntypedNullVector to handle the convertFromJSON() function when the input batch contains 0 rows (a sketch of the type rule follows this item).
  Problem: convertFromJSON() differs from regular functions in that its output schema is known only after evaluation. With 0 input rows, Drill has no way to know the output type and previously assumed Map type. That worked only under the assumption that operators such as Union ignore 0-row batches, which is no longer the case in the current implementation.
  Solution: use MinorType.NULL as the output type of convertFromJSON() when the input contains 0 rows. The new UntypedNullVector represents a column of MinorType.NULL.
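
  A small sketch of the type rule in item 6 (MinorType here is a local stand-in for Drill's enum): with at least one row the output type comes from evaluating the data; with zero rows it falls back to NULL, which is backed by the new UntypedNullVector.

    final class ConvertFromJsonTypeSketch {
      enum MinorType { MAP, NULL /* untyped null */ }

      static MinorType outputType(int inputRowCount, MinorType inferredFromData) {
        // With zero rows there is nothing to evaluate, so do not guess MAP; use NULL.
        return inputRowCount > 0 ? inferredFromData : MinorType.NULL;
      }

      public static void main(String[] args) {
        System.out.println(outputType(0, MinorType.MAP));  // NULL
        System.out.println(outputType(10, MinorType.MAP)); // MAP
      }
    }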

7. HBaseGroupScan converts the star column into the list of row_key plus column families. HBaseRecordReader now rejects a star column, since it expects the star to have been converted elsewhere.
  In HBase a column family always has map type and a non-rowkey column always has nullable varbinary type, so HBaseRecordReaders across different HBase regions produce the same top-level schema even if a region is empty or all of its rows are pruned by filter-pushdown optimization. In other words, different HBaseRecordReaders for the same table will not report different top-level schemas.
  However, this change cannot handle a hard schema change, e.g., c1 exists in cf1 in one region but not in another. Further work is required to handle hard schema changes.

8. Modify scan cost estimation when the query involves the * column. This removes planning randomness, since previously two different operators could have the same cost.

9. Add a new flag 'outputProj' to the Project operator to indicate whether the Project produces the query's final output. Such a Project is added by TopProjectVisitor to handle a fast NONE when all inputs to the query are empty and are skipped (see the sketch following this item).
  1) A star column is replaced with an empty list.
  2) A regular column reference is replaced with a nullable-int column.
  3) An expression goes through ExpressionTreeMaterializer, and the type of the materialized expression is used as the output type.
  4) Return OK_NEW_SCHEMA with the schema built by the above rules, then return NONE to the downstream operator.
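
  A simplified sketch of how an output Project could build its schema for a fast-NONE input following rules 1)-4) of item 9. The names ProjExpr and Kind are illustrative only; the real logic lives in ProjectRecordBatch and ExpressionTreeMaterializer.

    import java.util.ArrayList;
    import java.util.List;

    final class OutputProjSketch {
      enum Kind { STAR, COLUMN_REF, EXPRESSION }
      record ProjExpr(Kind kind, String name, String materializedType) {}
      record Field(String name, String type) {}

      static List<Field> schemaForEmptyInput(List<ProjExpr> exprs) {
        List<Field> out = new ArrayList<>();
        for (ProjExpr e : exprs) {
          switch (e.kind()) {
            case STAR -> { /* a star column expands to an empty column list */ }
            case COLUMN_REF -> out.add(new Field(e.name(), "NULLABLE INT"));
            case EXPRESSION -> out.add(new Field(e.name(), e.materializedType()));
          }
        }
        // The operator then emits OK_NEW_SCHEMA with this schema, followed by NONE.
        return out;
      }

      public static void main(String[] args) {
        System.out.println(schemaForEmptyInput(List.of(
            new ProjExpr(Kind.STAR, "*", null),
            new ProjExpr(Kind.COLUMN_REF, "c1", null),
            new ProjExpr(Kind.EXPRESSION, "c2_plus_1", "NULLABLE BIGINT"))));
      }
    }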

10. Add unit tests for operators handling empty input.

11. Add unit tests for queries whose inputs are all empty.

DRILL-5546: Revise code based on review comments.

Handle implicit columns in ScanBatch. Change the interface of ScanBatch's constructor.
 1) Ensure that either the implicit column list is empty or all readers have the same set of implicit columns.
 2) Skip the implicit columns when checking whether a schema change is coming from a record reader.
 3) ScanBatch now accepts a list instead of an iterator, since we may need to go through the implicit column list multiple times and verify that the two lists have the same size.

Address ScanBatch code review comments. Add more unit tests.

Share the code path in ProjectRecordBatch that handles normal setupNewSchema() and handleNullInput().
 - Move SimpleRecordBatch out of TopNBatch to make it shareable across different places.
 - Add a unit test verifying the schema for star-column queries against multilevel tables.

Unit test framework changes
 - Fix a memory leak in the unit test framework.
 - Allow SchemaTestBuilder to pass in a BatchSchema.

close #906


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fde0a1df
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fde0a1df
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fde0a1df

Branch: refs/heads/master
Commit: fde0a1df1734e0742b49aabdd28b02202ee2b044
Parents: e1649dd
Author: Jinfeng Ni <jni@apache.org>
Authored: Wed May 17 16:08:00 2017 -0700
Committer: Jinfeng Ni <jni@apache.org>
Committed: Tue Sep 5 12:07:23 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/common/types/Types.java    |   9 +
 .../store/mapr/db/MapRDBScanBatchCreator.java   |   2 +-
 .../mapr/db/json/MaprDBJsonRecordReader.java    |   5 +-
 .../drill/exec/store/hbase/HBaseGroupScan.java  |  93 ++-
 .../exec/store/hbase/HBaseRecordReader.java     |   5 +-
 .../exec/store/hbase/HBaseScanBatchCreator.java |   2 +-
 .../hive/HiveDrillNativeScanBatchCreator.java   |   6 +-
 .../apache/drill/exec/store/hive/HiveScan.java  |   4 +-
 .../exec/store/hive/HiveScanBatchCreator.java   |   2 +-
 .../drill/exec/store/jdbc/JdbcBatchCreator.java |   2 +-
 .../exec/store/kudu/KuduScanBatchCreator.java   |   2 +-
 .../exec/store/mongo/MongoRecordReader.java     |   3 +-
 .../exec/store/mongo/MongoScanBatchCreator.java |   2 +-
 .../src/main/codegen/templates/TypeHelper.java  |   2 +
 .../drill/exec/physical/config/Project.java     |  23 +-
 .../drill/exec/physical/impl/ScanBatch.java     | 407 +++++-----
 .../exec/physical/impl/TopN/TopNBatch.java      |  73 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |  33 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |   6 +
 .../physical/impl/join/NestedLoopJoinBatch.java |  31 +-
 .../impl/project/ProjectRecordBatch.java        | 117 ++-
 .../impl/union/UnionAllRecordBatch.java         | 737 ++++++-------------
 .../IteratorValidatorBatchIterator.java         |  18 +-
 .../impl/values/ValuesBatchCreator.java         |   2 +-
 .../exec/planner/logical/DrillScanRel.java      |  10 +-
 .../exec/planner/physical/ProjectPrel.java      |  31 +-
 .../physical/visitor/TopProjectVisitor.java     |  20 +-
 .../exec/record/AbstractBinaryRecordBatch.java  |  75 ++
 .../exec/record/AbstractSingleRecordBatch.java  |  26 +
 .../drill/exec/record/SimpleRecordBatch.java    |  98 +++
 .../drill/exec/store/AbstractRecordReader.java  |  20 +-
 .../apache/drill/exec/store/ColumnExplorer.java |   3 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   2 +-
 .../exec/store/direct/DirectBatchCreator.java   |   2 +-
 .../exec/store/easy/json/JSONRecordReader.java  |   6 +-
 .../store/ischema/InfoSchemaBatchCreator.java   |   2 +-
 .../exec/store/mock/MockScanBatchCreator.java   |   2 +-
 .../drill/exec/store/parquet/Metadata.java      |   4 +-
 .../store/parquet/ParquetReaderUtility.java     |   4 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   2 +-
 .../parquet/columnreaders/ParquetSchema.java    |   3 +-
 .../exec/store/sys/SystemTableBatchCreator.java |   2 +-
 .../org/apache/drill/exec/util/Utilities.java   |  23 +
 .../org/apache/drill/exec/util/VectorUtil.java  |  15 +-
 .../java/org/apache/drill/DrillTestWrapper.java |  24 +-
 .../test/java/org/apache/drill/TestBuilder.java |  10 +
 .../org/apache/drill/TestExampleQueries.java    |   3 +-
 .../java/org/apache/drill/TestUnionAll.java     |   9 +-
 .../org/apache/drill/TestUnionDistinct.java     |   9 +-
 .../apache/drill/exec/TestEmptyInputSql.java    | 203 +++++
 .../partitionsender/TestPartitionSender.java    |   2 +-
 .../physical/impl/union/TestSimpleUnion.java    |   2 +-
 .../physical/unit/MiniPlanUnitTestBase.java     |  75 +-
 .../physical/unit/PhysicalOpUnitTestBase.java   |  10 +-
 .../drill/exec/physical/unit/TestMiniPlan.java  |  69 +-
 .../physical/unit/TestNullInputMiniPlan.java    | 572 ++++++++++++++
 .../exec/store/TestImplicitFileColumns.java     |  69 ++
 .../scan/emptyInput/emptyCsv/empty.csv          |   0
 .../scan/emptyInput/emptyCsvH/empty.csvh        |   0
 .../scan/emptyInput/emptyJson/empty.json        |   0
 .../scan/emptyInput/emptyJson/empty2.json       |   0
 .../src/test/resources/scan/jsonTbl/1990/1.json |   2 +
 .../src/test/resources/scan/jsonTbl/1991/2.json |   1 +
 .../main/codegen/templates/BasicTypeHelper.java |  21 +-
 .../main/codegen/templates/ValueHolders.java    |   8 +-
 .../drill/exec/record/MaterializedField.java    |  10 +
 .../drill/exec/vector/UntypedNullHolder.java    |  46 ++
 .../drill/exec/vector/UntypedNullVector.java    | 270 +++++++
 68 files changed, 2278 insertions(+), 1073 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/common/src/main/java/org/apache/drill/common/types/Types.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 692d8f5..7c7026b 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -480,6 +480,10 @@ public class Types {
     return type.getMinorType() == MinorType.LATE;
   }
 
+  public static boolean isUntypedNull(final MajorType type) {
+    return type.getMinorType() == MinorType.NULL;
+  }
+
   public static MajorType withMode(final MinorType type, final DataMode mode) {
     return MajorType.newBuilder().setMode(mode).setMinorType(type).build();
   }
@@ -719,4 +723,9 @@ public class Types {
     }
     return typeBuilder;
   }
+
+  public static boolean isLaterType(MajorType type) {
+    return type.getMinorType() == MinorType.LATE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index d4a3f06..e770c96 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -55,7 +55,7 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
         throw new ExecutionSetupException(e);
       }
     }
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
   private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 5921249..ca31767 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
 import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -124,13 +125,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
     if (disablePushdown) {
-      transformed.add(AbstractRecordReader.STAR_COLUMN);
+      transformed.add(Utilities.STAR_COLUMN);
       includeId = true;
       return transformed;
     }
 
     if (isStarQuery()) {
-      transformed.add(AbstractRecordReader.STAR_COLUMN);
+      transformed.add(Utilities.STAR_COLUMN);
       includeId = true;
       if (isSkipQuery()) {
     	// `SELECT COUNT(*)` query

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index e474c11..1ee1da8 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -17,22 +17,17 @@
  */
 package org.apache.drill.exec.store.hbase;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -43,9 +38,9 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -55,18 +50,24 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
 
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
@@ -144,7 +145,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   public GroupScan clone(List<SchemaPath> columns) {
     HBaseGroupScan newScan = new HBaseGroupScan(this);
     newScan.columns = columns == null ? ALL_COLUMNS : columns;;
-    newScan.verifyColumns();
+    newScan.verifyColumnsAndConvertStar();
     return newScan;
   }
 
@@ -176,19 +177,37 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     } catch (IOException e) {
       throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
     }
-    verifyColumns();
+    verifyColumnsAndConvertStar();
   }
 
-  private void verifyColumns() {
-    if (AbstractRecordReader.isStarQuery(columns)) {
-      return;
-    }
+  private void verifyColumnsAndConvertStar() {
+    boolean hasStarCol = false;
+    LinkedHashSet<SchemaPath> requestedColumns = new LinkedHashSet<>();
+
     for (SchemaPath column : columns) {
-      if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
-        DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
-            column.getRootSegment().getPath(), hTableDesc.getNameAsString());
+      // convert * into [row_key, cf1, cf2, ..., cf_n].
+      if (column.equals(Utilities.STAR_COLUMN)) {
+        hasStarCol = true;
+        Set<byte[]> families = hTableDesc.getFamiliesKeys();
+        requestedColumns.add(ROW_KEY_PATH);
+        for (byte[] family : families) {
+          SchemaPath colFamily = SchemaPath.getSimplePath(Bytes.toString(family));
+          requestedColumns.add(colFamily);
+        }
+      } else {
+        if (!(column.equals(ROW_KEY_PATH) ||
+            hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
+          DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
+              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
+        }
+        requestedColumns.add(column);
       }
     }
+
+    // since star column has been converted, reset this.cloumns.
+    if (hasStarCol) {
+      this.columns = new ArrayList<>(requestedColumns);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 3f308ce..d6c02b5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -126,8 +126,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
             HBaseUtils.andFilterAtIndex(hbaseScan.getFilter(), HBaseUtils.LAST_FILTER, new FirstKeyOnlyFilter()));
       }
     } else {
-      rowKeyOnly = false;
-      transformed.add(ROW_KEY_PATH);
+      throw new IllegalArgumentException("HBaseRecordReader does not allow column *. Column * should have been converted to list of <row_key, column family1, column family2, ..., column family_n");
+//      rowKeyOnly = false;
+//      transformed.add(ROW_KEY_PATH);
     }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 3a098fc..8e815b9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -50,7 +50,7 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan>{
         throw new ExecutionSetupException(e1);
       }
     }
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 6c10d25..0e5314b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -33,12 +33,12 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -67,7 +67,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     final String partitionDesignator = context.getOptions()
         .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
     List<Map<String, String>> implicitColumns = Lists.newLinkedList();
-    boolean selectAllQuery = AbstractRecordReader.isStarQuery(columns);
+    boolean selectAllQuery = Utilities.isStarQuery(columns);
 
     final boolean hasPartitions = (partitions != null && partitions.size() > 0);
 
@@ -173,7 +173,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
         ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
     }
 
-    return new ScanBatch(config, context, oContext, readers.iterator(), implicitColumns);
+    return new ScanBatch(config, context, oContext, readers, implicitColumns);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index c6cc8a2..42fb3e2 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -36,11 +36,11 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
 import org.apache.drill.exec.store.hive.HiveMetadataProvider.InputSplitWrapper;
 import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -241,7 +241,7 @@ public class HiveScan extends AbstractGroupScan {
 
   protected int getSerDeOverheadFactor() {
     final int projectedColumnCount;
-    if (AbstractRecordReader.isStarQuery(columns)) {
+    if (Utilities.isStarQuery(columns)) {
       Table hiveTable = hiveReadEntry.getTable();
       projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize();
     } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 47ea323..e287f68 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -92,6 +92,6 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
     } catch(Exception e) {
       logger.error("No constructor for {}, thrown {}", readerClass.getName(), e);
     }
-    return new ScanBatch(config, context, readers.iterator());
+    return new ScanBatch(config, context, readers);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
index fa44b55..1782e1a 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcBatchCreator.java
@@ -37,6 +37,6 @@ public class JdbcBatchCreator implements BatchCreator<JdbcSubScan> {
     Preconditions.checkArgument(children.isEmpty());
     JdbcStoragePlugin plugin = config.getPlugin();
     RecordReader reader = new JdbcRecordReader(context, plugin.getSource(), config.getSql(), plugin.getName());
-    return new ScanBatch(config, context, Collections.singletonList(reader).iterator());
+    return new ScanBatch(config, context, Collections.singletonList(reader));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
index b3c2c4e..fc1db5d 100644
--- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
+++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduScanBatchCreator.java
@@ -51,7 +51,7 @@ public class KuduScanBatchCreator implements BatchCreator<KuduSubScan>{
         throw new ExecutionSetupException(e1);
       }
     }
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index c9ce5bb..77def0a 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.bson.BsonRecordReader;
+import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -109,7 +110,7 @@ public class MongoRecordReader extends AbstractRecordReader {
     } else {
       // Tale all the fields including the _id
       this.fields.remove(DrillMongoConstants.ID);
-      transformed.add(AbstractRecordReader.STAR_COLUMN);
+      transformed.add(Utilities.STAR_COLUMN);
     }
     return transformed;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
index 49b1750..9935184 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoScanBatchCreator.java
@@ -60,7 +60,7 @@ public class MongoScanBatchCreator implements BatchCreator<MongoSubScan> {
       }
     }
     logger.info("Number of record readers initialized : " + readers.size());
-    return new ScanBatch(subScan, context, readers.iterator());
+    return new ScanBatch(subScan, context, readers);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index 8390e30..5ccda85 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -89,6 +89,8 @@ public class TypeHelper extends BasicTypeHelper {
 </#list>
       case GENERIC_OBJECT:
         return model._ref(ObjectHolder.class);
+    case NULL:
+      return model._ref(UntypedNullHolder.class);
       default:
         break;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
index 7b58ecd..b0188ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
@@ -36,8 +36,19 @@ public class Project extends AbstractSingle{
 
   private final List<NamedExpression> exprs;
 
+  /**
+   * {@link org.apache.drill.exec.planner.physical.ProjectPrel for the meaning of flag 'outputProj'}
+   */
+  private boolean outputProj = false;
+
   @JsonCreator
-  public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
+  public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child, @JsonProperty("outputProj") boolean outputProj) {
+    super(child);
+    this.exprs = exprs;
+    this.outputProj = outputProj;
+  }
+
+  public Project(List<NamedExpression> exprs, PhysicalOperator child) {
     super(child);
     this.exprs = exprs;
   }
@@ -46,6 +57,14 @@ public class Project extends AbstractSingle{
     return exprs;
   }
 
+  /**
+   * @Return true if Project is for the query's final output. Such Project is added by TopProjectVisitor,
+   * to handle fast NONE when all the inputs to the query are empty and are skipped.
+   */
+  public boolean isOutputProj() {
+    return outputProj;
+  }
+
   @Override
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
     return physicalVisitor.visitProject(this, value);
@@ -53,7 +72,7 @@ public class Project extends AbstractSingle{
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new Project(exprs, child);
+    return new Project(exprs, child, outputProj);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 803bd48..64be129 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -17,16 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -55,10 +52,11 @@ import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.common.map.CaseInsensitiveMap;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Record batch used for a particular scan. Operators against one or more
@@ -78,50 +76,56 @@ public class ScanBatch implements CloseableRecordBatch {
   private BatchSchema schema;
   private final Mutator mutator;
   private boolean done = false;
-  private boolean hasReadNonEmptyFile = false;
-  private Map<String, ValueVector> implicitVectors;
   private Iterator<Map<String, String>> implicitColumns;
   private Map<String, String> implicitValues;
   private final BufferAllocator allocator;
-
+  private final List<Map<String, String>> implicitColumnList;
+  private String currentReaderClassName;
+  /**
+   *
+   * @param subScanConfig
+   * @param context
+   * @param oContext
+   * @param readerList
+   * @param implicitColumnList : either an emptylist when all the readers do not have implicit
+   *                        columns, or there is a one-to-one mapping between reader and implicitColumns.
+   */
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
-                   OperatorContext oContext, Iterator<RecordReader> readers,
-                   List<Map<String, String>> implicitColumns) {
+                   OperatorContext oContext, List<RecordReader> readerList,
+                   List<Map<String, String>> implicitColumnList) {
     this.context = context;
-    this.readers = readers;
+    this.readers = readerList.iterator();
+    this.implicitColumns = implicitColumnList.iterator();
     if (!readers.hasNext()) {
       throw UserException.systemError(
           new ExecutionSetupException("A scan batch must contain at least one reader."))
         .build(logger);
     }
-    currentReader = readers.next();
+
     this.oContext = oContext;
     allocator = oContext.getAllocator();
     mutator = new Mutator(oContext, allocator, container);
 
+    oContext.getStats().startProcessing();
     try {
-      oContext.getStats().startProcessing();
-      currentReader.setup(oContext, mutator);
-    } catch (ExecutionSetupException e) {
-      try {
-        currentReader.close();
-      } catch(final Exception e2) {
-        logger.error("Close failed for reader " + currentReader.getClass().getSimpleName(), e2);
-      }
-      throw UserException.systemError(e)
-            .addContext("Setup failed for", currentReader.getClass().getSimpleName())
+      if (!verifyImplcitColumns(readerList.size(), implicitColumnList)) {
+        Exception ex = new ExecutionSetupException("Either implicit column list does not have same cardinality as reader list, "
+            + "or implicit columns are not same across all the record readers!");
+        throw UserException.systemError(ex)
+            .addContext("Setup failed for", readerList.get(0).getClass().getSimpleName())
             .build(logger);
+      }
+
+      this.implicitColumnList = implicitColumnList;
+      addImplicitVectors();
+      currentReader = null;
     } finally {
       oContext.getStats().stopProcessing();
     }
-    this.implicitColumns = implicitColumns.iterator();
-    this.implicitValues = this.implicitColumns.hasNext() ? this.implicitColumns.next() : null;
-
-    addImplicitVectors();
   }
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
-                   Iterator<RecordReader> readers)
+                   List<RecordReader> readers)
       throws ExecutionSetupException {
     this(subScanConfig, context,
         context.newOperatorContext(subScanConfig),
@@ -152,16 +156,6 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
-  private void releaseAssets() {
-    container.zeroVectors();
-  }
-
-  private void clearFieldVectorMap() {
-    for (final ValueVector v : mutator.fieldVectorMap().values()) {
-      v.clear();
-    }
-  }
-
   @Override
   public IterOutcome next() {
     if (done) {
@@ -169,82 +163,57 @@ public class ScanBatch implements CloseableRecordBatch {
     }
     oContext.getStats().startProcessing();
     try {
-      try {
+      while (true) {
+        if (currentReader == null && !getNextReaderIfHas()) {
+            releaseAssets(); // All data has been read. Release resource.
+            done = true;
+            return IterOutcome.NONE;
+        }
         injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
-
         currentReader.allocate(mutator.fieldVectorMap());
-      } catch (OutOfMemoryException e) {
-        clearFieldVectorMap();
-        throw UserException.memoryError(e).build(logger);
-      }
-      while ((recordCount = currentReader.next()) == 0) {
-        try {
-          if (!readers.hasNext()) {
-            // We're on the last reader, and it has no (more) rows.
-            currentReader.close();
-            releaseAssets();
-            done = true;  // have any future call to next() return NONE
 
-            if (mutator.isNewSchema()) {
-              // This last reader has a new schema (e.g., we have a zero-row
-              // file or other source).  (Note that some sources have a non-
-              // null/non-trivial schema even when there are no rows.)
-
-              container.buildSchema(SelectionVectorMode.NONE);
-              schema = container.getSchema();
-
-              return IterOutcome.OK_NEW_SCHEMA;
-            }
-            return IterOutcome.NONE;
-          }
-          // At this point, the reader that hit its end is not the last reader.
-
-          // If all the files we have read so far are just empty, the schema is not useful
-          if (! hasReadNonEmptyFile) {
-            container.clear();
-            clearFieldVectorMap();
-            mutator.clear();
-          }
+        recordCount = currentReader.next();
+        Preconditions.checkArgument(recordCount >= 0, "recordCount from RecordReader.next() should not be negative");
+        boolean isNewSchema = mutator.isNewSchema();
+        populateImplicitVectorsAndSetCount();
+        oContext.getStats().batchReceived(0, recordCount, isNewSchema);
 
+        if (recordCount == 0) {
           currentReader.close();
-          currentReader = readers.next();
-          implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
-          currentReader.setup(oContext, mutator);
-          try {
-            currentReader.allocate(mutator.fieldVectorMap());
-          } catch (OutOfMemoryException e) {
-            clearFieldVectorMap();
-            throw UserException.memoryError(e).build(logger);
-          }
-          addImplicitVectors();
-        } catch (ExecutionSetupException e) {
-          releaseAssets();
-          throw UserException.systemError(e).build(logger);
+          currentReader = null; // indicate currentReader is complete,
+                                // and fetch next reader in next loop iterator if required.
         }
-      }
-
-      // At this point, the current reader has read 1 or more rows.
-
-      hasReadNonEmptyFile = true;
-      populateImplicitVectors();
-
-      for (VectorWrapper<?> w : container) {
-        w.getValueVector().getMutator().setValueCount(recordCount);
-      }
 
-      // this is a slight misuse of this metric but it will allow Readers to report how many records they generated.
-      final boolean isNewSchema = mutator.isNewSchema();
-      oContext.getStats().batchReceived(0, getRecordCount(), isNewSchema);
+        if (isNewSchema) {
+          // Even when recordCount = 0, we should return return OK_NEW_SCHEMA if current reader presents a new schema.
+          // This could happen when data sources have a non-trivial schema with 0 row.
+          container.buildSchema(SelectionVectorMode.NONE);
+          schema = container.getSchema();
+          return IterOutcome.OK_NEW_SCHEMA;
+        }
 
-      if (isNewSchema) {
-        container.buildSchema(SelectionVectorMode.NONE);
-        schema = container.getSchema();
-        return IterOutcome.OK_NEW_SCHEMA;
-      } else {
-        return IterOutcome.OK;
+        // Handle case of same schema.
+        if (recordCount == 0) {
+            continue; // Skip to next loop iteration if reader returns 0 row and has same schema.
+        } else {
+          // return OK if recordCount > 0 && ! isNewSchema
+          return IterOutcome.OK;
+        }
       }
     } catch (OutOfMemoryException ex) {
+      clearFieldVectorMap();
       throw UserException.memoryError(ex).build(logger);
+    } catch (ExecutionSetupException e) {
+      if (currentReader != null) {
+        try {
+          currentReader.close();
+        } catch (final Exception e2) {
+          logger.error("Close failed for reader " + currentReaderClassName, e2);
+        }
+      }
+      throw UserException.systemError(e)
+          .addContext("Setup failed for", currentReaderClassName)
+          .build(logger);
     } catch (Exception ex) {
       throw UserException.systemError(ex).build(logger);
     } finally {
@@ -252,21 +221,38 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
+  private void releaseAssets() {
+    container.zeroVectors();
+  }
+
+  private void clearFieldVectorMap() {
+    for (final ValueVector v : mutator.fieldVectorMap().values()) {
+      v.clear();
+    }
+    for (final ValueVector v : mutator.implicitFieldVectorMap.values()) {
+      v.clear();
+    }
+  }
+
+  private boolean getNextReaderIfHas() throws ExecutionSetupException {
+    if (readers.hasNext()) {
+      currentReader = readers.next();
+      implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
+      currentReader.setup(oContext, mutator);
+      currentReaderClassName = currentReader.getClass().getSimpleName();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   private void addImplicitVectors() {
     try {
-      if (implicitVectors != null) {
-        for (ValueVector v : implicitVectors.values()) {
-          v.clear();
-        }
-      }
-      implicitVectors = Maps.newHashMap();
-
-      if (implicitValues != null) {
-        for (String column : implicitValues.keySet()) {
+      if (!implicitColumnList.isEmpty()) {
+        for (String column : implicitColumnList.get(0).keySet()) {
           final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR));
           @SuppressWarnings("resource")
-          final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
-          implicitVectors.put(column, v);
+          final ValueVector v = mutator.addField(field, NullableVarCharVector.class, true /*implicit field*/);
         }
       }
     } catch(SchemaChangeException e) {
@@ -277,24 +263,11 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
-  private void populateImplicitVectors() {
-    if (implicitValues != null) {
-      for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
-        @SuppressWarnings("resource")
-        final NullableVarCharVector v = (NullableVarCharVector) implicitVectors.get(entry.getKey());
-        String val;
-        if ((val = entry.getValue()) != null) {
-          AllocationHelper.allocate(v, recordCount, val.length());
-          final byte[] bytes = val.getBytes();
-          for (int j = 0; j < recordCount; j++) {
-            v.getMutator().setSafe(j, bytes, 0, bytes.length);
-          }
-          v.getMutator().setValueCount(recordCount);
-        } else {
-          AllocationHelper.allocate(v, recordCount, 0);
-          v.getMutator().setValueCount(recordCount);
-        }
-      }
+  private void populateImplicitVectorsAndSetCount() {
+    mutator.populateImplicitVectors(implicitValues, recordCount);
+    for (Map.Entry<String, ValueVector> entry: mutator.fieldVectorMap().entrySet()) {
+      logger.debug("set record count {} for vv {}", recordCount, entry.getKey());
+      entry.getValue().getMutator().setValueCount(recordCount);
     }
   }
 
@@ -329,14 +302,20 @@ public class ScanBatch implements CloseableRecordBatch {
 
   @VisibleForTesting
   public static class Mutator implements OutputMutator {
-    /** Whether schema has changed since last inquiry (via #isNewSchema}).  Is
-     *  true before first inquiry. */
-    private boolean schemaChanged = true;
-
-    /** Fields' value vectors indexed by fields' keys. */
-    private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
+    /** Flag keeping track whether top-level schema has changed since last inquiry (via #isNewSchema}).
+     * It's initialized to false, or reset to false after #isNewSchema or after #clear, until a new value vector
+     * or a value vector with different type is added to fieldVectorMap.
+     **/
+    private boolean schemaChanged;
+
+    /** Regular fields' value vectors indexed by fields' keys. */
+    private final CaseInsensitiveMap<ValueVector> regularFieldVectorMap =
             CaseInsensitiveMap.newHashMap();
 
+    /** Implicit fields' value vectors index by fields' keys. */
+    private final CaseInsensitiveMap<ValueVector> implicitFieldVectorMap =
+        CaseInsensitiveMap.newHashMap();
+
     private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     private final BufferAllocator allocator;
 
@@ -348,46 +327,27 @@ public class ScanBatch implements CloseableRecordBatch {
       this.oContext = oContext;
       this.allocator = allocator;
       this.container = container;
+      this.schemaChanged = false;
     }
 
     public Map<String, ValueVector> fieldVectorMap() {
-      return fieldVectorMap;
+      return regularFieldVectorMap;
+    }
+
+    public Map<String, ValueVector> implicitFieldVectorMap() {
+      return implicitFieldVectorMap;
     }
 
     @SuppressWarnings("resource")
     @Override
     public <T extends ValueVector> T addField(MaterializedField field,
                                               Class<T> clazz) throws SchemaChangeException {
-      // Check if the field exists.
-      ValueVector v = fieldVectorMap.get(field.getName());
-      if (v == null || v.getClass() != clazz) {
-        // Field does not exist--add it to the map and the output container.
-        v = TypeHelper.getNewVector(field, allocator, callBack);
-        if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(
-            String.format(
-              "The class that was provided, %s, does not correspond to the "
-                + "expected vector type of %s.",
-              clazz.getSimpleName(), v.getClass().getSimpleName()));
-        }
-
-        final ValueVector old = fieldVectorMap.put(field.getName(), v);
-        if (old != null) {
-          old.clear();
-          container.remove(old);
-        }
-
-        container.add(v);
-        // Added new vectors to the container--mark that the schema has changed.
-        schemaChanged = true;
-      }
-
-      return clazz.cast(v);
+      return addField(field, clazz, false);
     }
 
     @Override
     public void allocate(int recordCount) {
-      for (final ValueVector v : fieldVectorMap.values()) {
+      for (final ValueVector v : regularFieldVectorMap.values()) {
         AllocationHelper.allocate(v, recordCount, 50, 10);
       }
     }
@@ -423,10 +383,82 @@ public class ScanBatch implements CloseableRecordBatch {
     }
 
     public void clear() {
-      fieldVectorMap.clear();
+      regularFieldVectorMap.clear();
+      implicitFieldVectorMap.clear();
+      schemaChanged = false;
+    }
+
+    private <T extends ValueVector> T addField(MaterializedField field,
+        Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
+      Map<String, ValueVector> fieldVectorMap;
+
+      if (isImplicitField) {
+        fieldVectorMap = implicitFieldVectorMap;
+      } else {
+        fieldVectorMap = regularFieldVectorMap;
+      }
+
+      if (!isImplicitField && implicitFieldVectorMap.containsKey(field.getName()) ||
+          isImplicitField && regularFieldVectorMap.containsKey(field.getName())) {
+        throw new SchemaChangeException(
+            String.format(
+                "It's not allowed to have regular field and implicit field share common name %s. "
+                    + "Either change regular field name in datasource, or change the default implicit field names.",
+                field.getName()));
+      }
+
+      // Check if the field exists.
+      ValueVector v = fieldVectorMap.get(field.getName());
+      if (v == null || v.getClass() != clazz) {
+        // Field does not exist--add it to the map and the output container.
+        v = TypeHelper.getNewVector(field, allocator, callBack);
+        if (!clazz.isAssignableFrom(v.getClass())) {
+          throw new SchemaChangeException(
+              String.format(
+                  "The class that was provided, %s, does not correspond to the "
+                      + "expected vector type of %s.",
+                  clazz.getSimpleName(), v.getClass().getSimpleName()));
+        }
+
+        final ValueVector old = fieldVectorMap.put(field.getName(), v);
+        if (old != null) {
+          old.clear();
+          container.remove(old);
+        }
+
+        container.add(v);
+        // Only mark schema change for regular vectors added to the container; implicit schema is constant.
+        if (!isImplicitField) {
+          schemaChanged = true;
+        }
+      }
+
+      return clazz.cast(v);
+    }
+
+    private void populateImplicitVectors(Map<String, String> implicitValues, int recordCount) {
+      if (implicitValues != null) {
+        for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
+          @SuppressWarnings("resource")
+          final NullableVarCharVector v = (NullableVarCharVector) implicitFieldVectorMap.get(entry.getKey());
+          String val;
+          if ((val = entry.getValue()) != null) {
+            AllocationHelper.allocate(v, recordCount, val.length());
+            final byte[] bytes = val.getBytes();
+            for (int j = 0; j < recordCount; j++) {
+              v.getMutator().setSafe(j, bytes, 0, bytes.length);
+            }
+            v.getMutator().setValueCount(recordCount);
+          } else {
+            AllocationHelper.allocate(v, recordCount, 0);
+            v.getMutator().setValueCount(recordCount);
+          }
+        }
+      }
     }
   }
 
+
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
     return container.iterator();
@@ -440,11 +472,10 @@ public class ScanBatch implements CloseableRecordBatch {
   @Override
   public void close() throws Exception {
     container.clear();
-    for (final ValueVector v : implicitVectors.values()) {
-      v.clear();
-    }
     mutator.clear();
-    currentReader.close();
+    if (currentReader != null) {
+      currentReader.close();
+    }
   }
 
   @Override
@@ -453,4 +484,34 @@ public class ScanBatch implements CloseableRecordBatch {
         String.format("You should not call getOutgoingContainer() for class %s",
                       this.getClass().getCanonicalName()));
   }
+
+  /**
+   * Verify list of implicit column values is valid input:
+   *   - Either implicit column list is empty;
+   *   - Or implicit column list has same sie as reader list, and the key set is same across all the readers.
+   * @param numReaders
+   * @param implicitColumnList
+   * @return return true if
+   */
+  private boolean verifyImplcitColumns(int numReaders, List<Map<String, String>> implicitColumnList) {
+    if (implicitColumnList.isEmpty()) {
+      return true;
+    }
+
+    if (numReaders != implicitColumnList.size()) {
+      return false;
+    }
+
+    Map<String, String> firstMap = implicitColumnList.get(0);
+
+    for (int i = 1; i < implicitColumnList.size(); i++) {
+      Map<String, String> nonFirstMap = implicitColumnList.get(i);
+
+      if (!firstMap.keySet().equals(nonFirstMap.keySet())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
 }
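
For illustration, here is a standalone sketch of the validation rule documented on verifyImplcitColumns() above, using only plain java.util types and made-up implicit column names ("filename", "suffix"); it is not the Drill code itself:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ImplicitColumnListCheck {
      // Valid when the list is empty, or when it has one map per reader and
      // every map exposes the same key set.
      static boolean isValid(int numReaders, List<Map<String, String>> implicitColumns) {
        if (implicitColumns.isEmpty()) {
          return true;
        }
        if (implicitColumns.size() != numReaders) {
          return false;
        }
        Map<String, String> first = implicitColumns.get(0);
        for (Map<String, String> m : implicitColumns) {
          if (!m.keySet().equals(first.keySet())) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        List<Map<String, String>> perReader = new ArrayList<>();
        for (String file : new String[] {"a.json", "b.json"}) {
          Map<String, String> cols = new HashMap<>();
          cols.put("filename", file);
          cols.put("suffix", "json");
          perReader.add(cols);
        }
        System.out.println(isValid(2, perReader));  // true: one map per reader, same key sets
        System.out.println(isValid(3, perReader));  // false: reader count mismatch
      }
    }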

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index d2497f1..e77c186 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
@@ -66,6 +67,8 @@ import com.google.common.base.Stopwatch;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
+import static org.bouncycastle.asn1.x500.style.RFC4519Style.l;
+
 public class TopNBatch extends AbstractRecordBatch<TopN> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class);
 
@@ -290,8 +293,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     VectorContainer newContainer = new VectorContainer(oContext);
     @SuppressWarnings("resource")
     SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
-    SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
-    SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+    SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
+    SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
     if (copier == null) {
       copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
     } else {
@@ -391,8 +394,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     final VectorContainer newContainer = new VectorContainer(oContext);
     @SuppressWarnings("resource")
     final SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
-    final SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
-    final SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+    final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
+    final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
     copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(),  newContainer, newBatch, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
@@ -440,26 +443,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   }
 
 
-  public static class SimpleRecordBatch implements RecordBatch {
-
-    private VectorContainer container;
+  public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
     private SelectionVector4 sv4;
-    private FragmentContext context;
 
-    public SimpleRecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
-      this.container = container;
+    public SimpleSV4RecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
+      super(container, context);
       this.sv4 = sv4;
-      this.context = context;
-    }
-
-    @Override
-    public FragmentContext getContext() {
-      return context;
-    }
-
-    @Override
-    public BatchSchema getSchema() {
-      return container.getSchema();
     }
 
     @Override
@@ -467,54 +456,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       if (sv4 != null) {
         return sv4.getCount();
       } else {
-        return container.getRecordCount();
+        return super.getRecordCount();
       }
     }
 
     @Override
-    public void kill(boolean sendUpstream) {
-    }
-
-    @Override
-    public SelectionVector2 getSelectionVector2() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
     public SelectionVector4 getSelectionVector4() {
       return sv4;
     }
-
-    @Override
-    public TypedFieldId getValueVectorId(SchemaPath path) {
-      return container.getValueVectorId(path);
-    }
-
-    @Override
-    public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
-      return container.getValueAccessorById(clazz, ids);
-    }
-
-    @Override
-    public IterOutcome next() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public WritableBatch getWritableBatch() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Iterator<VectorWrapper<?>> iterator() {
-      return container.iterator();
-    }
-
-    @Override
-    public VectorContainer getOutgoingContainer() {
-      throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
-    }
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 1f74ba1..8c899aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.physical.impl.common.HashTableStats;
 import org.apache.drill.exec.physical.impl.common.IndexPointer;
 import org.apache.drill.exec.physical.impl.common.Comparator;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -64,16 +65,10 @@ import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JVar;
 
-public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
 
-  // Probe side record batch
-  private final RecordBatch left;
-
-  // Build side record batch
-  private final RecordBatch right;
-
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
 
@@ -145,9 +140,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   // indicates if we have previously returned an output batch
   boolean firstOutputBatch = true;
 
-  IterOutcome leftUpstream = IterOutcome.NONE;
-  IterOutcome rightUpstream = IterOutcome.NONE;
-
   private final HashTableStats htStats = new HashTableStats();
 
   public enum Metric implements MetricDef {
@@ -172,16 +164,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
-    leftUpstream = next(left);
-    rightUpstream = next(right);
-
-    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return;
-    }
-
-    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
+    if (! prefetchFirstBatchFromBothSides()) {
       return;
     }
 
@@ -503,11 +486,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     }
   }
 
-  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left,
-      RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context, true);
-    this.left = left;
-    this.right = right;
+  public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
+      RecordBatch left, /*Probe side record batch*/
+      RecordBatch right /*Build side record batch*/
+  ) throws OutOfMemoryException {
+    super(popConfig, context, true, left, right);
     joinType = popConfig.getJoinType();
     conditions = popConfig.getConditions();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index e599702..a1b8dc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -151,6 +151,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
       state = BatchState.OUT_OF_MEMORY;
       return;
     }
+
+    if (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE) {
+      state = BatchState.DONE;
+      return;
+    }
+
     allocateBatch(true);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 35cc710..b390e41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -42,6 +42,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -62,7 +63,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 /*
  * RecordBatch implementation for the nested loop join operator
  */
-public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> {
+public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
 
   // Maximum number records in the outgoing batch
@@ -72,24 +73,12 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
   protected static final int LEFT_INPUT = 0;
   protected static final int RIGHT_INPUT = 1;
 
-  // Left input to the nested loop join operator
-  private final RecordBatch left;
-
   // Schema on the left side
   private BatchSchema leftSchema = null;
 
-  // state (IterOutcome) of the left input
-  private IterOutcome leftUpstream = IterOutcome.NONE;
-
-  // Right input to the nested loop join operator.
-  private final RecordBatch right;
-
   // Schema on the right side
   private BatchSchema rightSchema = null;
 
-  // state (IterOutcome) of the right input
-  private IterOutcome rightUpstream = IterOutcome.NONE;
-
   // Runtime generated class implementing the NestedLoopJoin interface
   private NestedLoopJoin nljWorker = null;
 
@@ -134,11 +123,9 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
       EMIT_LEFT_CONSTANT, EMIT_LEFT);
 
   protected NestedLoopJoinBatch(NestedLoopJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
-    super(popConfig, context);
+    super(popConfig, context, left, right);
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
-    this.left = left;
-    this.right = right;
   }
 
   /**
@@ -352,18 +339,8 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
    */
   @Override
   protected void buildSchema() throws SchemaChangeException {
-
     try {
-      leftUpstream = next(LEFT_INPUT, left);
-      rightUpstream = next(RIGHT_INPUT, right);
-
-      if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-        state = BatchState.STOP;
-        return;
-      }
-
-      if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-        state = BatchState.OUT_OF_MEMORY;
+      if (! prefetchFirstBatchFromBothSides()) {
         return;
       }
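
The HashJoinBatch and NestedLoopJoinBatch hunks above replace their duplicated STOP/OUT_OF_MEMORY prefetch checks with a single call to prefetchFirstBatchFromBothSides() on the new AbstractBinaryRecordBatch. The following is only a plausible, self-contained reconstruction of that consolidated step (the class, enum, and field names here are assumptions, and the actual helper may differ); it combines the checks removed above with the NONE/NONE handling that MergeJoinBatch adds explicitly:

    class BinaryPrefetchSketch {
      enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP, OUT_OF_MEMORY }
      enum BatchState { FIRST, DONE, STOP, OUT_OF_MEMORY }

      BatchState state = BatchState.FIRST;
      IterOutcome leftUpstream = IterOutcome.NONE;
      IterOutcome rightUpstream = IterOutcome.NONE;

      // Returns false when buildSchema() should bail out immediately.
      boolean prefetchFirstBatchFromBothSides(IterOutcome left, IterOutcome right) {
        leftUpstream = left;
        rightUpstream = right;

        if (left == IterOutcome.STOP || right == IterOutcome.STOP) {
          state = BatchState.STOP;
          return false;
        }
        if (left == IterOutcome.OUT_OF_MEMORY || right == IterOutcome.OUT_OF_MEMORY) {
          state = BatchState.OUT_OF_MEMORY;
          return false;
        }
        if (left == IterOutcome.NONE && right == IterOutcome.NONE) {
          // Both sides returned a fast NONE: there is nothing to build a schema from.
          state = BatchState.DONE;
          return false;
        }
        return true;
      }
    }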
 

http://git-wip-us.apache.org/repos/asf/drill/blob/fde0a1df/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 9a72fcb..30efeec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,10 +17,10 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
+import com.carrotsearch.hppc.IntHashSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.drill.common.expression.ConvertExpression;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -35,6 +35,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.expression.fn.CastFunctions;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -51,24 +52,31 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SimpleRecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.UntypedNullVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
-import com.carrotsearch.hppc.IntHashSet;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -165,8 +173,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             // Only need to add the schema for the complex exprs because others should already have
             // been setup during setupNewSchema
             for (FieldReference fieldReference : complexFieldReferencesList) {
-              container.addOrGet(fieldReference.getRootSegment().getPath(),
-                  Types.required(MinorType.MAP), MapVector.class);
+              MaterializedField field = MaterializedField.create(fieldReference.getAsNamePart().getName(), UntypedNullHolder.TYPE);
+              container.add(new UntypedNullVector(field, container.getAllocator()));
             }
             container.buildSchema(SelectionVectorMode.NONE);
             wasNone = true;
@@ -302,8 +310,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     return expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
   }
 
-  @Override
-  protected boolean setupNewSchema() throws SchemaChangeException {
+  private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
     if (allocationVectors != null) {
       for (final ValueVector v : allocationVectors) {
         v.clear();
@@ -322,7 +329,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-//    cg.getCodeGenerator().saveCodeForDebugging(true);
+    //    cg.getCodeGenerator().saveCodeForDebugging(true);
 
     final IntHashSet transferFieldIds = new IntHashSet();
 
@@ -335,14 +342,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       result.clear();
 
       if (classify && namedExpression.getExpr() instanceof SchemaPath) {
-        classifyExpr(namedExpression, incoming, result);
+        classifyExpr(namedExpression, incomingBatch, result);
 
         if (result.isStar) {
           // The value indicates which wildcard we are processing now
           final Integer value = result.prefixMap.get(result.prefix);
           if (value != null && value == 1) {
             int k = 0;
-            for (final VectorWrapper<?> wrapper : incoming) {
+            for (final VectorWrapper<?> wrapper : incomingBatch) {
               final ValueVector vvIn = wrapper.getValueVector();
               if (k > result.outputNames.size() - 1) {
                 assert false;
@@ -363,7 +370,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             }
           } else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors
             int k = 0;
-            for (final VectorWrapper<?> wrapper : incoming) {
+            for (final VectorWrapper<?> wrapper : incomingBatch) {
               final ValueVector vvIn = wrapper.getValueVector();
               final SchemaPath originalPath = SchemaPath.getSimplePath(vvIn.getField().getName());
               if (k > result.outputNames.size() - 1) {
@@ -378,9 +385,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
                 continue;
               }
 
-              final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry());
+              final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incomingBatch, collector, context.getFunctionRegistry() );
               if (collector.hasErrors()) {
-                throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+                throw new SchemaChangeException(String.format("Failure while trying to materialize the incoming batch schema.  Errors:\n %s.", collector.toErrorString()));
               }
 
               final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
@@ -417,23 +424,23 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         }
       }
 
-      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming,
-              collector, context.getFunctionRegistry(), true, unionTypeEnabled);
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incomingBatch,
+          collector, context.getFunctionRegistry(), true, unionTypeEnabled);
       final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
       if (collector.hasErrors()) {
         throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
       }
 
       // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
-      if (expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+      if (expr instanceof ValueVectorReadExpression && incomingBatch.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
           && !((ValueVectorReadExpression) expr).hasReadPath()
           && !isAnyWildcard
           && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
 
         final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
         final TypedFieldId id = vectorRead.getFieldId();
-        final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
-        Preconditions.checkNotNull(incoming);
+        final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+        Preconditions.checkNotNull(incomingBatch);
 
         final FieldReference ref = getRef(namedExpression);
         final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
@@ -473,7 +480,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
           if (!vectorRead.hasReadPath()) {
             final TypedFieldId id = vectorRead.getFieldId();
-            final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+            final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
             vvIn.makeTransferPair(vector);
           }
         }
@@ -485,12 +492,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       CodeGenerator<Projector> codeGen = cg.getCodeGenerator();
       codeGen.plainJavaCapable(true);
       // Uncomment out this line to debug the generated code.
-//      codeGen.saveCodeForDebugging(true);
+      //      codeGen.saveCodeForDebugging(true);
       this.projector = context.getImplementationClass(codeGen);
-      projector.setup(context, incoming, this, transfers);
+      projector.setup(context, incomingBatch, this, transfers);
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    setupNewSchemaFromInput(this.incoming);
     if (container.isSchemaChanged()) {
       container.buildSchema(SelectionVectorMode.NONE);
       return true;
@@ -624,11 +636,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
     final int incomingSchemaSize = incoming.getSchema().getFieldCount();
 
-    // for debugging..
-    // if (incomingSchemaSize > 9) {
-    // assert false;
-    // }
-
     // input is '*' and output is 'prefix_*'
     if (exprIsStar && refHasPrefix && refEndsWithStar) {
       final String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
@@ -768,4 +775,50 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       }
     }
   }
+
+  /**
+   * Handle null input specially when the Project operator produces the query output. This happens when the input
+   * returns no batch at all (it returns a fast NONE directly).
+   *
+   * <p>
+   * The Project operator has to return a batch with a schema derived using the following 3 rules:
+   * </p>
+   * <ul>
+   *  <li>Case 1:  *  ==>  expand into an empty list of columns. </li>
+   *  <li>Case 2:  regular column reference ==> treat as a nullable-int column. </li>
+   *  <li>Case 3:  expression ==> call ExpressionTreeMaterializer over an empty vector container.
+   *           Once the expression is materialized without error, use the output type of the materialized
+   *           expression. </li>
+   * </ul>
+   *
+   * <p>
+   * The batch is constructed with the above rules and recordCount = 0.
+   * It is returned with OK_NEW_SCHEMA to the downstream operator.
+   * </p>
+   */
+  @Override
+  protected IterOutcome handleNullInput() {
+    if (! popConfig.isOutputProj()) {
+      return super.handleNullInput();
+    }
+
+    VectorContainer emptyVC = new VectorContainer();
+    emptyVC.buildSchema(SelectionVectorMode.NONE);
+    RecordBatch emptyIncomingBatch = new SimpleRecordBatch(emptyVC, context);
+
+    try {
+      setupNewSchemaFromInput(emptyIncomingBatch);
+    } catch (SchemaChangeException e) {
+      kill(false);
+      logger.error("Failure during query", e);
+      context.fail(e);
+      return IterOutcome.STOP;
+    }
+
+    doAlloc(0);
+    container.buildSchema(SelectionVectorMode.NONE);
+    wasNone = true;
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
 }
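
As a reading aid for the three rules in the handleNullInput() javadoc above, here is a standalone sketch, with hypothetical types rather than the Drill API, of how output columns could be derived for a fast-NONE input:

    import java.util.ArrayList;
    import java.util.List;

    class NullInputSchemaSketch {
      enum Kind { STAR, COLUMN_REF, EXPRESSION }

      static class ProjectedExpr {
        final Kind kind;
        final String name;
        final String materializedType;  // type obtained by materializing the expression
        ProjectedExpr(Kind kind, String name, String materializedType) {
          this.kind = kind;
          this.name = name;
          this.materializedType = materializedType;
        }
      }

      // Derive the output columns for an input that produced no batch at all.
      static List<String> deriveOutputColumns(List<ProjectedExpr> exprs) {
        List<String> columns = new ArrayList<>();
        for (ProjectedExpr e : exprs) {
          switch (e.kind) {
            case STAR:
              break;                                           // rule 1: '*' expands to nothing
            case COLUMN_REF:
              columns.add(e.name + ": nullable INT");          // rule 2: plain column reference
              break;
            case EXPRESSION:
              columns.add(e.name + ": " + e.materializedType); // rule 3: materialized output type
              break;
          }
        }
        return columns;  // the emitted batch carries this schema with recordCount = 0
      }
    }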

