drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h.@apache.org
Subject [8/8] drill git commit: DRILL-2288: Fix ScanBatch violation of IterOutcome protocol and downstream chain of bugs.
Date Tue, 10 Nov 2015 21:49:43 GMT
DRILL-2288: Fix ScanBatch violation of IterOutcome protocol and downstream chain of bugs.

Increments:

2288:  Pt. 1 Core:  Added unit test.  [Drill2288GetColumnsMetadataWhenNoRowsTest, empty.json]

2288:  Pt. 1 Core:  Changed HBase test table #1's # of regions from 1 to 2.  [HBaseTestsSuite]

Also added TODO(DRILL-3954) comment about # of regions.

2288:  Pt. 2 Core:  Documented IterOutcome much more clearly.  [RecordBatch]

Also edited some related Javadoc.

2288:  Pt. 2 Hyg.:  Edited doc., added @Override, etc.  [AbstractRecordBatch, RecordBatch]

Purged unused SetupOutcome.
Added @Override.
Edited comments.
Fix some comments to doc. comments.

2288:  Pt. 3 Core&Hyg.:  Added validation of IterOutcome sequence.  [IteratorValidatorBatchIterator]

Also:
Renamed internal members for clarity.
Added comments.

2288:  Pt. 4 Core:  Fixed a NONE -> OK_NEW_SCHEMA in ScanBatch.next().  [ScanBatch]

(With nearby comments.)

2288:  Pt. 4 Hyg.:  Edited comments, reordered, whitespace.  [ScanBatch]

Reordered
Added comments.
Aligned.

2288:  Pt. 4 Core+:  Fixed UnionAllRecordBatch to receive IterOutcome sequence right.  (3659)  [UnionAllRecordBatch]

2288:  Pt. 5 Core:  Fixed ScanBatch.Mutator.isNewSchema() to stop spurious "new schema" reports (fix short-circuit OR, to call resetting method right).  [ScanBatch]

2288:  Pt. 5 Hyg.:  Renamed, edited comments, reordered.  [ScanBatch, SchemaChangeCallBack, AbstractSingleRecordBatch]

Renamed getSchemaChange -> getSchemaChangedAndReset.
Renamed schemaChange -> schemaChanged.
Added doc. comments.
Aligned.

2288:  Pt. 6 Core:  Avoided dummy Null.IntVec. column in JsonReader when not needed (MapWriter.isEmptyMap()).  [JsonReader, 3 vector files]

2288:  Pt. 6 Hyg.:  Edited comments, message.  Fixed message formatting.  [RecordReader, JSONFormatPlugin, JSONRecordReader, AbstractMapVector, JsonReader]

Fixed message formatting.
Edited comments.
Edited message.
Fixed spurious line break.

2288:  Pt. 7 Core:  Added column families in HBaseRecordReader* to avoid dummy Null.IntVec. clash.  [HBaseRecordReader]

2288:  Pt. 8 Core.1:  Cleared recordCount in OrderedPartitionRecordBatch.innerNext().  [OrderedPartitionRecordBatch]

2288:  Pt. 8 Core.2:  Cleared recordCount in ProjectRecordBatch.innerNext.  [ProjectRecordBatch]

2288:  Pt. 8 Core.3:  Cleared recordCount in TopNBatch.innerNext.  [TopNBatch]

2288:  Pt. 9 Core:  Had UnorderedReceiverBatch reset RecordBatchLoader's record count.  [UnorderedReceiverBatch, RecordBatchLoader]

2288:  Pt. 9 Hyg.:  Added comments.  [RecordBatchLoader]

2288:  Pt. 10 Core:  Worked around mismatched map child vectors in MapVector.getObject().  [MapVector]

2288:  Pt. 11 Core:  Added OK_NEW_SCHEMA schema comparison for HashAgg.  [HashAggTemplate]

2288:  Pt. 12 Core:  Fixed memory leak in BaseTestQuery's printing.

Fixed bad skipping of RecordBatchLoader.clear(...) and
QueryDataBatch.load(...) for zero-row batches in printResult(...).

Also, dropped suppression of call to
VectorUtil.showVectorAccessibleContent(...) (so zero-row batches are
as visible as others).

2288:  Pt. 13 Core:  Fixed test that used unhandled periods in column alias identifiers.

2288:  Misc.:  Added # of rows to showVectorAccessibleContent's output.  [VectorUtil]

2288:  Misc.:  Added simple/partial toString() [VectorContainer, AbstractRecordReader, JSONRecordReader, BaseValueVector, FieldSelection, AbstractBaseWriter]

2288:  Misc. Hyg.:  Added doc. comments to VectorContainer.  [VectorContainer]

2288:  Misc. Hyg.:  Edited comment.  [DrillStringUtils]

2288:  Misc. Hyg.:  Clarified message for unhandled identifier containing period.

2288:  Pt. 3 Core&Hyg. Upd.:  Added schema comparison result to logging.  [IteratorValidatorBatchIterator]

2288:  Pt. 7 Core Upd.:  Handled HBase columns too re NullableIntVectors.  [HBaseRecordReader, TestTableGenerator, TestHBaseFilterPushDown]

Created map-child vectors for requested columns.
Added unit test method testDummyColumnsAreAvoided, adding new row to test table,
updated some row counts.

2288:  Pt. 7 Hyg. Upd.:  Edited comment.  [HBaseRecordReader]

2288:  Pt. 11 Core Upd.:  REVERTED all of bad OK_NEW_SCHEMA schema comparison for HashAgg.  [HashAggTemplate]

This reverts commit 0939660f4620c03da97f4e1bf25a27514e6d0b81.

2288:  Pt. 6 Core Upd.:  Added isEmptyMap override in new (just-rebased-in) PromotableWriter.  [PromotableWriter]

Adjusted definition and default implementation of isEmptyMap (to handle MongoDB
storage plugin's use of JsonReader).

2288:  Pt. 6 Hyg. Upd.:  Purged old atLeastOneWrite flag.  [JsonReader]

2288:  Pt. 14:  Disabled newly dying test testNestedFlatten().


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a0be3ae0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a0be3ae0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a0be3ae0

Branch: refs/heads/master
Commit: a0be3ae0a5a69634be98cc517bcc31c11ffec91d
Parents: 194680c
Author: dbarclay <dbarclay@maprtech.com>
Authored: Tue Oct 27 19:25:25 2015 -0700
Committer: Hanifi Gunes <hanifigunes@gmail.com>
Committed: Mon Nov 9 12:17:03 2015 -0800

----------------------------------------------------------------------
 .../drill/common/expression/FieldReference.java |   6 +-
 .../drill/common/util/DrillStringUtils.java     |   8 +-
 .../exec/store/hbase/HBaseRecordReader.java     |  34 ++-
 .../drill/hbase/HBaseRecordReaderTest.java      |   2 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java |   4 +-
 .../drill/hbase/TestHBaseFilterPushDown.java    |  28 +-
 .../drill/hbase/TestHBaseProjectPushDown.java   |  12 +-
 .../apache/drill/hbase/TestTableGenerator.java  |   6 +
 .../codegen/templates/AbstractFieldWriter.java  |  11 +
 .../src/main/codegen/templates/BaseWriter.java  |  11 +
 .../src/main/codegen/templates/MapWriters.java  |   5 +
 .../drill/exec/physical/impl/ScanBatch.java     |  80 ++++--
 .../exec/physical/impl/TopN/TopNBatch.java      |   1 +
 .../OrderedPartitionRecordBatch.java            |   1 +
 .../impl/project/ProjectRecordBatch.java        |   1 +
 .../impl/union/UnionAllRecordBatch.java         |  43 +--
 .../UnorderedReceiverBatch.java                 |   1 +
 .../IteratorValidatorBatchIterator.java         | 262 ++++++++++++++++---
 .../drill/exec/record/AbstractRecordBatch.java  |  18 +-
 .../exec/record/AbstractSingleRecordBatch.java  |   2 +-
 .../apache/drill/exec/record/RecordBatch.java   | 244 ++++++++++++++---
 .../drill/exec/record/RecordBatchLoader.java    |  28 ++
 .../drill/exec/record/VectorContainer.java      |  16 ++
 .../drill/exec/store/AbstractRecordReader.java  |   8 +
 .../apache/drill/exec/store/RecordReader.java   |   3 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |   3 +-
 .../exec/store/easy/json/JSONRecordReader.java  |  12 +-
 .../org/apache/drill/exec/util/VectorUtil.java  |   2 +
 .../drill/exec/vector/BaseValueVector.java      |   5 +
 .../drill/exec/vector/SchemaChangeCallBack.java |  26 +-
 .../exec/vector/complex/AbstractMapVector.java  |   4 +-
 .../drill/exec/vector/complex/MapVector.java    |   9 +-
 .../exec/vector/complex/fn/FieldSelection.java  |   9 +
 .../exec/vector/complex/fn/JsonReader.java      |  37 +--
 .../vector/complex/impl/AbstractBaseWriter.java |   5 +
 .../vector/complex/impl/PromotableWriter.java   |   5 +
 .../java/org/apache/drill/BaseTestQuery.java    |   3 -
 .../complex/writer/TestComplexTypeReader.java   |   2 +
 ...ill2288GetColumnsMetadataWhenNoRowsTest.java | 201 ++++++++++++++
 exec/jdbc/src/test/resources/empty.json         |   0
 40 files changed, 977 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/expression/FieldReference.java b/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
index d97be14..7d0e86f 100644
--- a/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
+++ b/common/src/main/java/org/apache/drill/common/expression/FieldReference.java
@@ -55,7 +55,11 @@ public class FieldReference extends SchemaPath {
 
   private void checkSimpleString(CharSequence value) {
     if (value.toString().contains(".")) {
-      throw new UnsupportedOperationException("Field references must be singular names.");
+      throw new UnsupportedOperationException(
+          String.format(
+              "Unhandled field reference \"%s\"; a field reference identifier"
+              + " must not have the form of a qualified name (i.e., with \".\").",
+              value));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
index 83bfdc1..b016184 100644
--- a/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
+++ b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java
@@ -53,11 +53,11 @@ public class DrillStringUtils {
   }
 
   /**
-   * Escapes the characters in a {@code String} using Java String rules.
+   * Escapes the characters in a {@code String} according to Java string literal
+   * rules.
    *
-   * Deals correctly with quotes and control-chars (tab, backslash, cr, ff, etc.)
-   *
-   * So a tab becomes the characters {@code '\\'} and
+   * Deals correctly with quotes and control-chars (tab, backslash, cr, ff,
+   * etc.) so, for example, a tab becomes the characters {@code '\\'} and
    * {@code 't'}.
    *
    * Example:

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index ba10592..32780f8 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -18,10 +18,12 @@
 package org.apache.drill.exec.store.hbase;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -44,6 +46,7 @@ import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -133,7 +136,13 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
     familyVectorMap = new HashMap<String, MapVector>();
 
     try {
-      // Add Vectors to output in the order specified when creating reader
+      logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
+          hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
+          hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      hTable = new HTable(hbaseConf, hbaseTableName);
+
+      // Add top-level column-family map vectors to output in the order specified
+      // when creating reader (order of first appearance in query).
       for (SchemaPath column : getColumns()) {
         if (column.equals(ROW_KEY_PATH)) {
           MaterializedField field = MaterializedField.create(column, ROW_KEY_TYPE);
@@ -142,10 +151,25 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
           getOrCreateFamilyVector(column.getRootSegment().getPath(), false);
         }
       }
-      logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.",
-          hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM),
-          hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-      hTable = new HTable(hbaseConf, hbaseTableName);
+
+      // Add map and child vectors for any HBase column families and/or HBase
+      // columns that are requested (in order to avoid later creation of dummy
+      // NullableIntVectors for them).
+      final Set<Map.Entry<byte[], NavigableSet<byte []>>> familiesEntries =
+          hbaseScan.getFamilyMap().entrySet();
+      for (Map.Entry<byte[], NavigableSet<byte []>> familyEntry : familiesEntries) {
+        final String familyName = new String(familyEntry.getKey(),
+                                             StandardCharsets.UTF_8);
+        final MapVector familyVector = getOrCreateFamilyVector(familyName, false);
+        final Set<byte []> children = familyEntry.getValue();
+        if (null != children) {
+          for (byte[] childNameBytes : children) {
+            final String childName = new String(childNameBytes,
+                                                StandardCharsets.UTF_8);
+            getOrCreateColumnVector(familyVector, childName);
+          }
+        }
+      }
       resultScanner = hTable.getScanner(hbaseScan);
     } catch (SchemaChangeException | IOException e) {
       throw new ExecutionSetupException(e);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
index 79db8b6..6414f8b 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
@@ -24,7 +24,7 @@ public class HBaseRecordReaderTest extends BaseHBaseTest {
   @Test
   public void testLocalDistributed() throws Exception {
     String planName = "/hbase/hbase_scan_screen_physical.json";
-    runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 7);
+    runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 8);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
index 2063503..4ecb4da 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java
@@ -161,12 +161,14 @@ public class HBaseTestsSuite {
   }
 
   private static void createTestTables() throws Exception {
+    // TODO(DRILL-3954):  Change number of regions from 1 to multiple for other
+    // tables and remaining problems not addressed by DRILL-2288 fixes.
     /*
      * We are seeing some issues with (Drill) Filter operator if a group scan span
      * multiple fragments. Hence the number of regions in the HBase table is set to 1.
      * Will revert to multiple region once the issue is resolved.
      */
-    TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 1);
+    TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 2);
     TestTableGenerator.generateHBaseDataset3(admin, TEST_TABLE_3, 1);
     TestTableGenerator.generateHBaseDatasetCompositeKeyDate(admin, TEST_TABLE_COMPOSITE_DATE, 1);
     TestTableGenerator.generateHBaseDatasetCompositeKeyTime(admin, TEST_TABLE_COMPOSITE_TIME, 1);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 05fb0b7..7ef7954 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -18,6 +18,7 @@
 package org.apache.drill.hbase;
 
 import org.apache.drill.PlanTestBase;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestHBaseFilterPushDown extends BaseHBaseTest {
@@ -517,7 +518,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
         + "WHERE\n"
         + "  row_key > 'b4'";
 
-    runHBaseSQLVerifyCount(sql, 3);
+    runHBaseSQLVerifyCount(sql, 4);
 
     final String[] expectedPlan = {".*startRow=b4\\\\x00.*stopRow=,.*"};
     final String[] excludedPlan ={};
@@ -589,7 +590,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
         + "WHERE\n"
         + "  (row_key >= 'b5' OR row_key <= 'a2') AND (t.f.c1 >= '1' OR t.f.c1 is null)";
 
-    runHBaseSQLVerifyCount(sql, 4);
+    runHBaseSQLVerifyCount(sql, 5);
 
     final String[] expectedPlan = {".*startRow=, stopRow=, filter=FilterList OR.*GREATER_OR_EQUAL, b5.*LESS_OR_EQUAL, a2.*"};
     final String[] excludedPlan ={};
@@ -623,7 +624,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
         + "WHERE\n"
         + "  convert_from(row_key, 'UTF8') > 'b4'";
 
-    runHBaseSQLVerifyCount(sql, 3);
+    runHBaseSQLVerifyCount(sql, 4);
 
     final String[] expectedPlan = {".*startRow=b4\\\\x00, stopRow=,.*"};
     final String[] excludedPlan ={};
@@ -755,5 +756,26 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
 
   }
 
+  @Test
+  public void testDummyColumnsAreAvoided() throws Exception {
+    setColumnWidth(10);
+    // Key aspects:
+    // - HBase columns c2 and c3 are referenced in the query
+    // - column c2 appears in rows in one region but not in rows in a second
+    //   region, and c3 appears only in the second region
+    // - a downstream operation (e.g., sorting) doesn't handle schema changes
+    final String sql = "SELECT\n"
+        + "  row_key, \n"
+        + "  t.f .c2, t.f .c3, \n"
+        + "  t.f2.c2, t.f2.c3 \n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` t\n"
+        + "WHERE\n"
+        + "  row_key = 'a3' OR row_key = 'b7' \n"
+        + "ORDER BY row_key";
+
+    runHBaseSQLVerifyCount(sql, 2);
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
index b27b2a0..befe1d8 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java
@@ -28,7 +28,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
         + "row_key\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
-        , 7);
+        , 8);
   }
 
   @Test
@@ -45,10 +45,14 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
   public void testRowKeyAndColumnPushDown() throws Exception{
     setColumnWidths(new int[] {8, 9, 6, 2, 6});
     runHBaseSQLVerifyCount("SELECT\n"
-        + "row_key, t.f.c1*31 as `t.f.c1*31`, t.f.c2 as `t.f.c2`, 5 as `5`, 'abc' as `'abc'`\n"
+        // Note:  Can't currently use period in column alias (not even with
+        // qualified identifier) because Drill internals don't currently encode
+        // names sufficiently.
+        + "row_key, t.f.c1 * 31 as `t dot f dot c1 * 31`, "
+        + "t.f.c2 as `t dot f dot c2`, 5 as `5`, 'abc' as `'abc'`\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` t"
-        , 7);
+        , 8);
   }
 
   @Test
@@ -58,7 +62,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest {
         + "row_key, f, f2\n"
         + "FROM\n"
         + "  hbase.`[TABLE_NAME]` tableName"
-        , 7);
+        , 8);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
index e738bba..77e9d64 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java
@@ -118,6 +118,12 @@ public class TestTableGenerator {
     p.add("f".getBytes(), "c8".getBytes(), "5".getBytes());
     p.add("f2".getBytes(), "c9".getBytes(), "6".getBytes());
     table.put(p);
+
+    p = new Put("b7".getBytes());
+    p.add("f".getBytes(), "c1".getBytes(), "1".getBytes());
+    p.add("f".getBytes(), "c2".getBytes(), "2".getBytes());
+    table.put(p);
+
     table.flushCommits();
     table.close();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
index 2da7141..7ab5dce 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
@@ -72,6 +72,17 @@ abstract class AbstractFieldWriter extends AbstractBaseWriter implements FieldWr
     fail("${name}");
   }
 
+  /**
+   * This implementation returns {@code false}.
+   * <p>  
+   *   Must be overridden by map writers.
+   * </p>  
+   */
+  @Override
+  public boolean isEmptyMap() {
+    return false;
+  }
+
   @Override
   public MapWriter map() {
     fail("Map");

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/codegen/templates/BaseWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
index da27e66..8a9ea56 100644
--- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java
@@ -38,6 +38,17 @@ package org.apache.drill.exec.vector.complex.writer;
 
     MaterializedField getField();
 
+    /**
+     * Whether this writer is a map writer and is empty (has no children).
+     * 
+     * <p>
+     *   Intended only for use in determining whether to add dummy vector to
+     *   avoid empty (zero-column) schema, as in JsonReader.
+     * </p>
+     * 
+     */
+    boolean isEmptyMap();
+
     <#list vv.types as type><#list type.minor as minor>
     <#assign lowerName = minor.class?uncap_first />
     <#if lowerName == "int" ><#assign lowerName = "integer" /></#if>

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index 27ffbdd..93f2edb 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -70,6 +70,11 @@ public class ${mode}MapWriter extends AbstractFieldWriter {
   }
 
   @Override
+  public boolean isEmptyMap() {
+    return 0 == container.size();
+  }
+
+  @Override
   public MaterializedField getField() {
       return container.getField();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 1ac4f7b..dbb5e00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -67,9 +67,13 @@ public class ScanBatch implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScanBatch.class);
 
-  private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
-
+  /** Main collection of fields' value vectors. */
   private final VectorContainer container = new VectorContainer();
+
+  /** Fields' value vectors indexed by fields' keys. */
+  private final Map<MaterializedField.Key, ValueVector> fieldVectorMap =
+      Maps.newHashMap();
+
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
@@ -85,8 +89,12 @@ public class ScanBatch implements CloseableRecordBatch {
   private boolean done = false;
   private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private boolean hasReadNonEmptyFile = false;
-  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, OperatorContext oContext,
-                   Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
+
+
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
+                   OperatorContext oContext, Iterator<RecordReader> readers,
+                   List<String[]> partitionColumns,
+                   List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
     this.context = context;
     this.readers = readers;
     if (!readers.hasNext()) {
@@ -123,7 +131,8 @@ public class ScanBatch implements CloseableRecordBatch {
     addPartitionVectors();
   }
 
-  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers)
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
+                   Iterator<RecordReader> readers)
       throws ExecutionSetupException {
     this(subScanConfig, context,
         context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
@@ -183,18 +192,27 @@ public class ScanBatch implements CloseableRecordBatch {
       while ((recordCount = currentReader.next()) == 0) {
         try {
           if (!readers.hasNext()) {
+            // We're on the last reader, and it has no (more) rows.
             currentReader.close();
             releaseAssets();
-            done = true;
+            done = true;  // have any future call to next() return NONE
+
             if (mutator.isNewSchema()) {
+              // This last reader has a new schema (e.g., we have a zero-row
+              // file or other source).  (Note that some sources have a non-
+              // null/non-trivial schema even when there are no rows.)
+
               container.buildSchema(SelectionVectorMode.NONE);
               schema = container.getSchema();
+
+              return IterOutcome.OK_NEW_SCHEMA;
             }
             return IterOutcome.NONE;
           }
+          // At this point, the reader that hit its end is not the last reader.
 
           // If all the files we have read so far are just empty, the schema is not useful
-          if(!hasReadNonEmptyFile) {
+          if (! hasReadNonEmptyFile) {
             container.clear();
             for (ValueVector v : fieldVectorMap.values()) {
               v.clear();
@@ -221,6 +239,7 @@ public class ScanBatch implements CloseableRecordBatch {
           return IterOutcome.STOP;
         }
       }
+      // At this point, the current reader has read 1 or more rows.
 
       hasReadNonEmptyFile = true;
       populatePartitionVectors();
@@ -264,7 +283,7 @@ public class ScanBatch implements CloseableRecordBatch {
       for (int i : selectedPartitionColumns) {
         final MaterializedField field =
             MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i),
-                Types.optional(MinorType.VARCHAR));
+                                     Types.optional(MinorType.VARCHAR));
         final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
         partitionVectors.add(v);
       }
@@ -313,19 +332,26 @@ public class ScanBatch implements CloseableRecordBatch {
   }
 
   private class Mutator implements OutputMutator {
-    private boolean schemaChange = true;
+    /** Whether schema has changed since last inquiry (via #isNewSchema}).  Is
+     *  true before first inquiry. */
+    private boolean schemaChanged = true;
+
 
+    @SuppressWarnings("unchecked")
     @Override
-    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      // Check if the field exists
+    public <T extends ValueVector> T addField(MaterializedField field,
+                                              Class<T> clazz) throws SchemaChangeException {
+      // Check if the field exists.
       ValueVector v = fieldVectorMap.get(field.key());
       if (v == null || v.getClass() != clazz) {
-        // Field does not exist add it to the map and the output container
+        // Field does not exist--add it to the map and the output container.
         v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
         if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(String.format(
-              "The class that was provided %s does not correspond to the expected vector type of %s.",
-              clazz.getSimpleName(), v.getClass().getSimpleName()));
+          throw new SchemaChangeException(
+              String.format(
+                  "The class that was provided, %s, does not correspond to the "
+                  + "expected vector type of %s.",
+                  clazz.getSimpleName(), v.getClass().getSimpleName()));
         }
 
         final ValueVector old = fieldVectorMap.put(field.key(), v);
@@ -335,8 +361,8 @@ public class ScanBatch implements CloseableRecordBatch {
         }
 
         container.add(v);
-        // Adding new vectors to the container mark that the schema has changed
-        schemaChange = true;
+        // Added new vectors to the container--mark that the schema has changed.
+        schemaChanged = true;
       }
 
       return clazz.cast(v);
@@ -349,11 +375,21 @@ public class ScanBatch implements CloseableRecordBatch {
       }
     }
 
+    /**
+     * Reports whether schema has changed (field was added or re-added) since
+     * last call to {@link #isNewSchema}.  Returns true at first call.
+     */
     @Override
     public boolean isNewSchema() {
-      // Check if top level schema has changed, second condition checks if one of the deeper map schema has changed
-      if (schemaChange || callBack.getSchemaChange()) {
-        schemaChange = false;
+      // Check if top-level schema or any of the deeper map schemas has changed.
+
+      // Note:  Callback's getSchemaChangedAndReset() must get called in order
+      // to reset it and avoid false reports of schema changes in future.  (Be
+      // careful with short-circuit OR (||) operator.)
+
+      final boolean deeperSchemaChanged = callBack.getSchemaChangedAndReset();
+      if (schemaChanged || deeperSchemaChanged) {
+        schemaChanged = false;
         return true;
       }
       return false;
@@ -392,6 +428,8 @@ public class ScanBatch implements CloseableRecordBatch {
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+    throw new UnsupportedOperationException(
+        String.format("You should not call getOutgoingContainer() for class %s",
+                      this.getClass().getCanonicalName()));
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 3ef6bfe..aca7549 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -159,6 +159,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   @Override
   public IterOutcome innerNext() {
+    recordCount = 0;
     if (state == BatchState.DONE) {
       return IterOutcome.NONE;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 3061f99..629a3e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -462,6 +462,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
   @Override
   public IterOutcome innerNext() {
+    recordCount = 0;
     container.zeroVectors();
 
     // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index ab01db4..ce7c5ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -124,6 +124,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
   @Override
   public IterOutcome innerNext() {
+    recordCount = 0;
     if (hasRemainder) {
       handleRemainder();
       return IterOutcome.OK;

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 445568b..357269d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -103,7 +103,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
   public IterOutcome innerNext() {
     try {
       IterOutcome upstream = unionAllInput.nextBatch();
-      logger.debug("Upstream of Union-All: ", upstream.toString());
+      logger.debug("Upstream of Union-All: {}", upstream);
       switch(upstream) {
         case NONE:
         case OUT_OF_MEMORY:
@@ -306,28 +306,36 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
           case OUT_OF_MEMORY:
             return iterLeft;
 
-          case NONE:
-            throw new SchemaChangeException("The left input of Union-All should not come from an empty data source");
-
           default:
-            throw new IllegalStateException(String.format("Unknown state %s.", iterLeft));
+            throw new IllegalStateException(
+                String.format("Unexpected state %s.", iterLeft));
         }
 
         IterOutcome iterRight = rightSide.nextBatch();
         switch(iterRight) {
           case OK_NEW_SCHEMA:
             // Unless there is no record batch on the left side of the inputs,
-            // always start processing from the left side
+            // always start processing from the left side.
             unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-            inferOutputFields();
-            break;
 
-          case NONE:
-            // If the right input side comes from an empty data source,
-            // use the left input side's schema directly
-            unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-            inferOutputFieldsFromLeftSide();
-            rightIsFinish = true;
+            // If the record count of the first batch from right input is zero,
+            // there are two possibilities:
+            // 1. The right side is an empty input (e.g., file).
+            // 2. There will be more records carried by later batches.
+            if (rightSide.getRecordBatch().getRecordCount() == 0) {
+              iterRight = rightSide.nextBatch();
+
+              if (iterRight == IterOutcome.NONE) {
+                // Case 1: The right side was an empty input.
+                inferOutputFieldsFromLeftSide();
+                rightIsFinish = true;
+              } else {
+                // Case 2: There are more records carried by the latter batches.
+                inferOutputFields();
+              }
+            } else {
+              inferOutputFields();
+            }
             break;
 
           case STOP:
@@ -335,7 +343,8 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
             return iterRight;
 
           default:
-            throw new IllegalStateException(String.format("Unknown state %s.", iterRight));
+            throw new IllegalStateException(
+                String.format("Unexpected state %s.", iterRight));
         }
 
         upstream = IterOutcome.OK_NEW_SCHEMA;
@@ -387,7 +396,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
               return upstream;
 
             default:
-              throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
+              throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome));
           }
         } else {
           IterOutcome iterOutcome = leftSide.nextBatch();
@@ -535,4 +544,4 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index caabfce..fafa14e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -150,6 +150,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
 
   @Override
   public IterOutcome next() {
+    batchLoader.resetRecordCount();
     stats.startProcessing();
     try{
       RawFragmentBatch batch;

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index ed7da9b..01c3c92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -33,36 +33,113 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.VectorValidator;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.*;
+
+
 public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
 
   static final boolean VALIDATE_VECTORS = false;
 
-  private IterOutcome state = IterOutcome.NOT_YET;
+  /** For logging/debuggability only. */
+  private static volatile int instanceCount;
+
+  /** For logging/debuggability only. */
+  private final int instNum;
+  {
+    instNum = ++instanceCount;
+  }
+
+  /**
+   * The upstream batch, calls to which and return values from which are
+   * checked by this validator.
+   */
   private final RecordBatch incoming;
-  private boolean first = true;
+
+  /** Incoming batch's type (simple class name); for logging/debuggability
+   *  only. */
+  private final String batchTypeName;
+
+  /** Exception state of incoming batch; last value thrown by its next()
+   *  method. */
+  private Throwable exceptionState = null;
+
+  /** Main state of incoming batch; last value returned by its next() method. */
+  private IterOutcome batchState = null;
+
+  /** Last schema retrieved after OK_NEW_SCHEMA or OK from next().  Null if none
+   *  yet. Currently for logging/debuggability only. */
+  private BatchSchema lastSchema = null;
+
+  /** Last schema retrieved after OK_NEW_SCHEMA from next().  Null if none yet.
+   *  Currently for logging/debuggability only. */
+  private BatchSchema lastNewSchema = null;
+
+  /**
+   * {@link IterOutcome} return value sequence validation state.
+   * (Only needs enough to validate returns of OK.)
+   */
+  private enum ValidationState {
+    /** Initial state:  Have not gotten any OK_NEW_SCHEMA yet and not
+     *  terminated.  OK is not allowed yet. */
+    INITIAL_NO_SCHEMA,
+    /** Have gotten OK_NEW_SCHEMA already and not terminated.  OK is allowed
+     *  now. */
+    HAVE_SCHEMA,
+    /** Terminal state:  Have seen NONE or STOP.  Nothing more is allowed. */
+    TERMINAL
+  }
+
+  /** High-level IterOutcome sequence state. */
+  private ValidationState validationState = ValidationState.INITIAL_NO_SCHEMA;
+
 
   public IteratorValidatorBatchIterator(RecordBatch incoming) {
     this.incoming = incoming;
+    batchTypeName = incoming.getClass().getSimpleName();
+
+    // (Log construction and close() at same level to bracket instance's activity.)
+    logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName);
   }
 
-  private void validateReadState() {
-    switch (state) {
+  @Override
+  public String toString() {
+    return
+        super.toString()
+        + "["
+        + "instNum = " + instNum
+        + ", validationState = " + validationState
+        + ", batchState = " + batchState
+        + ", ... "
+        + "; incoming = " + incoming
+        + "]";
+  }
+
+  private void validateReadState(String operation) {
+    if (batchState == null) {
+      throw new IllegalStateException(
+          String.format(
+              "Batch data read operation (%s) attempted before first next() call"
+              + " on batch [#%d, %s].",
+              operation, instNum, batchTypeName));
+    }
+    switch (batchState) {
     case OK:
     case OK_NEW_SCHEMA:
       return;
     default:
       throw new IllegalStateException(
-          String
-              .format(
-                  "You tried to do a batch data read operation when you were in a state of %s.  You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.",
-                  state.name()));
+          String.format(
+              "Batch data read operation (%s) attempted when last next() call"
+              + " on batch [#%d, %s] returned %s (not %s or %s).",
+              operation, instNum, batchTypeName, batchState, OK, OK_NEW_SCHEMA));
     }
   }
 
   @Override
   public Iterator<VectorWrapper<?>> iterator() {
-    validateReadState();
+    validateReadState("iterator()");
     return incoming.iterator();
   }
 
@@ -78,7 +155,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
 
   @Override
   public int getRecordCount() {
-    validateReadState();
+    validateReadState("getRecordCount()");
     return incoming.getRecordCount();
   }
 
@@ -89,19 +166,19 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
 
   @Override
   public SelectionVector2 getSelectionVector2() {
-    validateReadState();
+    validateReadState("getSelectionVector2()");
     return incoming.getSelectionVector2();
   }
 
   @Override
   public SelectionVector4 getSelectionVector4() {
-    validateReadState();
+    validateReadState("getSelectionVector4()");
     return incoming.getSelectionVector4();
   }
 
   @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
-    validateReadState();
+    validateReadState("getValueVectorId(SchemaPath)");
     return incoming.getValueVectorId(path);
   }
 
@@ -113,48 +190,161 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
 
   @Override
   public IterOutcome next() {
-    if (state == IterOutcome.NONE ) {
-      throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
-    }
-    state = incoming.next();
-    if (first) {
-      first = !first;
-    }
+    logger.trace( "[#{}; on {}]: next() called.", instNum, batchTypeName);
+    final IterOutcome prevBatchState = batchState;
+    try {
 
-    if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
-      BatchSchema schema = incoming.getSchema();
-      if (schema == null) {
-        return state;
+      // Check whether next() should even have been called in current state.
+      if (null != exceptionState) {
+        throw new IllegalStateException(
+            String.format(
+                "next() [on #%d; %s] called again after it threw %s (after"
+                + " returning %s).  Caller should not have called next() again.",
+                instNum, batchTypeName, exceptionState, batchState));
       }
-
-      if (schema.getFieldCount() == 0) {
-        throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed.");
+      // (Note:  This could use validationState.)
+      if (batchState == NONE || batchState == STOP) {
+        throw new IllegalStateException(
+            String.format(
+                "next() [on #%d, %s] called again after it returned %s."
+                + "  Caller should not have called next() again.",
+                instNum, batchTypeName, batchState));
       }
-      if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
-        throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d",  incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE));
+
+      // Now get result from upstream next().
+      batchState = incoming.next();
+
+      logger.trace("[#{}; on {}]: incoming next() return: ({} ->) {}",
+                   instNum, batchTypeName, prevBatchState, batchState);
+
+      // Check state transition and update high-level state.
+      switch (batchState) {
+        case OK_NEW_SCHEMA:
+          // OK_NEW_SCHEMA is allowed at any time, except if terminated (checked
+          // above).
+          // OK_NEW_SCHEMA moves to have-seen-schema state.
+          validationState = ValidationState.HAVE_SCHEMA;
+          break;
+        case OK:
+          // OK is allowed as long as OK_NEW_SCHEMA was seen, except if terminated
+          // (checked above).
+          if (validationState != ValidationState.HAVE_SCHEMA) {
+            throw new IllegalStateException(
+                String.format(
+                    "next() returned %s without first returning %s [#%d, %s]",
+                    batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
+          }
+          // OK doesn't change high-level state.
+          break;
+        case NONE:
+          // NONE is allowed as long as OK_NEW_SCHEMA was seen, except if
+          // already terminated (checked above).
+          if (validationState != ValidationState.HAVE_SCHEMA) {
+            throw new IllegalStateException(
+                String.format(
+                    "next() returned %s without first returning %s [#%d, %s]",
+                    batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
+          }
+          // NONE moves to terminal high-level state.
+          validationState = ValidationState.TERMINAL;
+          break;
+        case STOP:
+          // STOP is allowed at any time, except if already terminated (checked
+          // above).
+          // STOP moves to terminal high-level state.
+          validationState = ValidationState.TERMINAL;
+          break;
+        case NOT_YET:
+        case OUT_OF_MEMORY:
+          // NOT_YET and OUT_OF_MEMORY are allowed at any time, except if
+          // terminated (checked above).
+          // NOT_YET and OUT_OF_MEMORY OK don't change high-level state.
+          break;
+        default:
+          throw new AssertionError(
+              "Unhandled new " + IterOutcome.class.getSimpleName() + " value "
+              + batchState);
+          //break;
       }
 
-      if (VALIDATE_VECTORS) {
-        VectorValidator.validate(incoming);
+      // Validate schema when available.
+      if (batchState == OK || batchState == OK_NEW_SCHEMA) {
+        final BatchSchema prevLastSchema = lastSchema;
+        final BatchSchema prevLastNewSchema = lastNewSchema;
+
+        lastSchema = incoming.getSchema();
+        if (batchState == OK_NEW_SCHEMA) {
+          lastNewSchema = lastSchema;
+        }
+
+        if (logger.isTraceEnabled()) {
+          logger.trace("[#{}; on {}]: incoming next() return: #records = {}, "
+                       + "\n  schema:"
+                       + "\n    {}, "
+                       + "\n  prev. new ({}):"
+                       + "\n    {}",
+                       instNum, batchTypeName, incoming.getRecordCount(),
+                       lastSchema,
+                       lastSchema.equals(prevLastNewSchema) ? "equal" : "not equal",
+                       prevLastNewSchema);
+          }
+
+        if (lastSchema == null) {
+          throw new IllegalStateException(
+              String.format(
+                  "Incoming batch [#%d, %s] has a null schema. This is not allowed.",
+                  instNum, batchTypeName));
+        }
+        if (lastSchema.getFieldCount() == 0) {
+          throw new IllegalStateException(
+              String.format(
+                  "Incoming batch [#%d, %s] has an empty schema. This is not allowed.",
+                  instNum, batchTypeName));
+        }
+        if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
+          throw new IllegalStateException(
+              String.format(
+                  "Incoming batch [#%d, %s] has size %d, which is beyond the"
+                  + " limit of %d",
+                  instNum, batchTypeName, incoming.getRecordCount(), MAX_BATCH_SIZE
+                  ));
+        }
+
+        if (VALIDATE_VECTORS) {
+          VectorValidator.validate(incoming);
+        }
       }
-    }
 
-    return state;
+      return batchState;
+    }
+    catch (RuntimeException | Error e) {
+      exceptionState = e;
+      logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}",
+                   instNum, batchTypeName, prevBatchState, exceptionState,
+                   exceptionState);
+      throw e;
+    }
   }
 
   @Override
   public WritableBatch getWritableBatch() {
-    validateReadState();
+    validateReadState("getWritableBatch()");
     return incoming.getWritableBatch();
   }
 
   @Override
   public void close() {
+    // (Log construction and close() calls at same logging level to bracket
+    // instance's activity.)
+    logger.trace( "[#{}; on {}]: close() called, state = {} / {}.",
+                  instNum, batchTypeName, batchState, exceptionState);
   }
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+    throw new UnsupportedOperationException(
+        String.format("You should not call getOutgoingContainer() for class %s",
+                      this.getClass().getCanonicalName()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index aaa6f9e..f06c397 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -74,12 +74,18 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   protected static enum BatchState {
-    BUILD_SCHEMA, // Need to build schema and return
-    FIRST, // This is still the first data batch
-    NOT_FIRST, // The first data batch has already been returned
-    STOP, // The query most likely failed, we need to propagate STOP to the root
-    OUT_OF_MEMORY, // Out of Memory while building the Schema...Ouch!
-    DONE // All work is done, no more data to be sent
+    /** Need to build schema and return. */
+    BUILD_SCHEMA,
+    /** This is still the first data batch. */
+    FIRST,
+    /** The first data batch has already been returned. */
+    NOT_FIRST,
+    /** The query most likely failed, we need to propagate STOP to the root. */
+    STOP,
+    /** Out of Memory while building the Schema...Ouch! */
+    OUT_OF_MEMORY,
+    /** All work is done, no more data to be sent. */
+    DONE
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index e84057b..4f91317 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -104,7 +104,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
       }
 
       // Check if schema has changed
-      if (callBack.getSchemaChange()) {
+      if (callBack.getSchemaChangedAndReset()) {
         return IterOutcome.OK_NEW_SCHEMA;
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 6f10a1c..8229e58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -23,60 +23,214 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 /**
- * A record batch contains a set of field values for a particular range of records. In the case of a record batch
- * composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors do not
- * change unless the next() IterOutcome is a *_NEW_SCHEMA type.
- *
- * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids
- * provided utilizing getValueVectorId();
+ * A record batch contains a set of field values for a particular range of
+ * records.
+ * <p>
+ *   In the case of a record batch composed of ValueVectors, ideally a batch
+ *   fits within L2 cache (~256kB per core).  The set of value vectors does
+ *   not change except during a call to {@link #next()} that returns
+ *   {@link IterOutcome#OK_NEW_SCHEMA} value.
+ * </p>
+ * <p>
+ *   A key thing to know is that the Iterator provided by a record batch must
+ *   align with the rank positions of the field IDs provided using
+ *   {@link getValueVectorId}.
+ * </p>
  */
 public interface RecordBatch extends VectorAccessible {
 
-  /* max batch size, limited by 2-byte-length in SV2 : 65536 = 2^16 */
+  /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */
   public static final int MAX_BATCH_SIZE = 65536;
 
   /**
-   * Describes the outcome of a RecordBatch being incremented forward.
+   * Describes the outcome of incrementing RecordBatch forward by a call to
+   * {@link #next()}.
+   * <p>
+   *   Key characteristics of the return value sequence:
+   * </p>
+   * <ul>
+   *   <li>
+   *     {@code OK_NEW_SCHEMA} always appears unless {@code STOP} appears.  (A
+   *     batch returns {@code OK_NEW_SCHEMA} before returning {@code NONE} even
+   *     if the batch has zero rows.)
+   *   </li>
+   *   <li>{@code OK_NEW_SCHEMA} always appears before {@code OK} appears.</li>
+   *   <li>
+   *     The last value is always {@code NONE} or {@code STOP}, and {@code NONE}
+   *     and {@code STOP} appear only as the last value.
+   *   </li>
+   * </ul>
+   * <p>
+   *  <strong>Details</strong>:
+   * </p>
+   * <p>
+   *   For normal completion, the basic sequence of return values from calls to
+   *   {@code next()} on a {@code RecordBatch} is:
+   * </p>
+   * <ol>
+   *   <li>
+   *     an {@link #OK_NEW_SCHEMA} value followed by zero or more {@link #OK}
+   *     values,
+   *   </li>
+   *   <li>
+   *     zero or more subsequences each having an {@code OK_NEW_SCHEMA} value
+   *     followed by zero or more {@code OK} values, and then
+   *   </li>
+   *   <li>
+   *     a {@link #NONE} value.
+   *   </li>
+   * </ol>
+   * <p>
+   *   In addition to that basic sequence, {@link #NOT_YET} and
+   *   {@link #OUT_OF_MEMORY} values can appear anywhere in the subsequence
+   *   before the terminal value ({@code NONE} or {@code STOP}).
+   * </p>
+   * <p>
+   *   For abnormal termination, the sequence is truncated (before the
+   *   {@code NONE}) and ends with {@link #STOP}.  That is, the sequence begins
+   *   with a subsequence that is some prefix of a normal-completion sequence
+   *   and that does not contain {@code NONE}, and ends with {@code STOP}.
+   * </p>
+   * <p>
+   *   (The normal-completion return sequence is matched by the following
+   *   regular-expression-style grammar:
+   *   <pre>
+   *     ( ( NOT_YET | OUT_OF_MEMORY )*  OK_NEW_SCHEMA
+   *       ( NOT_YET | OUT_OF_MEMORY )*  OK )*
+   *     )+
+   *     ( NOT_YET | OUT_OF_+MEMORY )*  NONE
+   *   </pre>
+   *   )
+   * </p>
    */
   public static enum IterOutcome {
-    NONE, // No more records were found.
-    OK, // A new range of records have been provided.
-    OK_NEW_SCHEMA, // A full collection of records
-    STOP, // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext
-          // to understand the current state of things.
-    NOT_YET, // used by batches that haven't received incoming data yet.
-    OUT_OF_MEMORY // an upstream operator was unable to allocate memory. A batch receiving this should release memory if it can
-  }
+    /**
+     * Normal completion of batch.
+     * <p>
+     *   The call to {@link #next()}
+     *   read no records,
+     *   the batch has and will have no more results to return,
+     *   and {@code next()} must not be called again.
+     * </p>
+     * <p>
+     *   This value will be returned only after {@link #OK_NEW_SCHEMA} has been
+     *   returned at least once (not necessarily <em>immediately</em> after).
+     * </p>
+     */
+    NONE,
+
+    /**
+     * Zero or more records with same schema.
+     * <p>
+     *   The call to {@link #next()}
+     *   read zero or more records,
+     *   the schema has not changed since the last time {@code OK_NEW_SCHEMA}
+     *     was returned,
+     *   and the batch will have more results to return (at least completion or
+     *     abnormal termination ({@code NONE} or {@code STOP})).
+     *     ({@code next()} should be called again.)
+     * </p>
+     * <p>
+     *   This will be returned only after {@link #OK_NEW_SCHEMA} has been
+     *   returned at least once (not necessarily <em>immediately</em> after).
+     * </p>
+     */
+    OK,
+
+    /**
+     * New schema, maybe with records.
+     * <p>
+     *   The call to {@link #next()}
+     *   changed the schema and vector structures
+     *   and read zero or more records,
+     *   and the batch will have more results to return (at least completion or
+     *     abnormal termination ({@code NONE} or {@code STOP})).
+     *     ({@code next()} should be called again.)
+     * </p>
+     */
+    OK_NEW_SCHEMA,
 
-  public static enum SetupOutcome {
-    OK, OK_NEW_SCHEMA, FAILED
+    /**
+     * Non-completion (abnormal) termination.
+     * <p>
+     *   The call to {@link #next()}
+     *   reports that the query has terminated other than by normal completion,
+     *   and that the caller must not call any of the schema-access or
+     *   data-access methods nor call {@code next()} again.
+     * </p>
+     * <p>
+     *   The caller can consume its QueryContext to understand the current state
+     *   of things.
+     * </p>
+     */
+    STOP,
+
+    /**
+     * No data yet.
+     * <p>
+     *   The call to {@link #next()}
+     *   read no data,
+     *   and the batch will have more results to return in the future (at least
+     *     completion or abnormal termination ({@code NONE} or {@code STOP})).
+     *   The caller should call {@code next()} again, but should do so later
+     *     (including by returning {@code NOT_YET} to its caller).
+     * </p>
+     * <p>
+     *   Normally, the caller should perform any locally available work while
+     *   waiting for incoming data from the callee, for example, doing partial
+     *   sorts on already received data while waiting for additional data to
+     *   sort.
+     * </p>
+     * <p>
+     *   Used by batches that haven't received incoming data yet.
+     * </p>
+     */
+    NOT_YET,
+
+    /**
+     * Out of memory (not fatal).
+     * <p>
+     *   The call to {@link #next()},
+     *   including upstream operators, was unable to allocate memory
+     *   and did not read any records,
+     *   and the batch will have more results to return (at least completion or
+     *     abnormal termination ({@code NONE} or {@code STOP})).
+     *   The caller should release memory if it can (including by returning
+     *     {@code OUT_OF_MEMORY} to its caller) and call {@code next()} again.
+     * </p>
+     */
+    OUT_OF_MEMORY
   }
 
   /**
-   * Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query
-   * level information.
-   *
-   * @return
+   * Gets the FragmentContext of the current query fragment.  Useful for
+   * reporting failure information or other query-level information.
    */
   public FragmentContext getContext();
 
   /**
-   * Provide the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA IterOutcome is provided.
-   *
-   * @return
+   * Gets the current schema of this record batch.
+   * <p>
+   *   May be called only when the most recent call to {@link #next}, if any,
+   *   returned {@link #OK_NEW_SCHEMA} or {@link #OK}.
+   * </p>
+   * <p>
+   *   The schema changes when and only when {@link #next} returns
+   *   {@link #OK_NEW_SCHEMA}.
+   * </p>
    */
+  @Override
   public BatchSchema getSchema();
 
   /**
-   * Provide the number of records that are within this record count
-   *
-   * @return
+   * Gets the number of records that are within this record.
    */
+  @Override
   public int getRecordCount();
 
   /**
-   * Inform child nodes that this query should be terminated. Child nodes should utilize the QueryContext to determine
-   * what has happened.
+   * Informs child nodes that this query should be terminated.  Child nodes
+   * should use the QueryContext to determine what has happened.
    */
   public void kill(boolean sendUpstream);
 
@@ -88,32 +242,44 @@ public interface RecordBatch extends VectorAccessible {
   public VectorContainer getOutgoingContainer();
 
   /**
-   * Get the value vector type and id for the given schema path. The TypedFieldId should store a fieldId which is the
-   * same as the ordinal position of the field within the Iterator provided this classes implementation of
-   * Iterable<ValueVector>.
+   * Gets the value vector type and ID for the given schema path.  The
+   * TypedFieldId should store a fieldId which is the same as the ordinal
+   * position of the field within the Iterator provided this class's
+   * implementation of Iterable<ValueVector>.
    *
    * @param path
    *          The path where the vector should be located.
    * @return The local field id associated with this vector. If no field matches this path, this will return a null
    *         TypedFieldId
    */
+  @Override
   public abstract TypedFieldId getValueVectorId(SchemaPath path);
+
   @Override
   public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
 
   /**
-   * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
-   * IterOutcome.NONE, the consumer should no longer next(). Behavior at this point is undetermined and likely to throw
-   * an exception.
+   * Updates the data in each Field reading interface for the next range of
+   * records.
+   * <p>
+   *   Once a RecordBatch's {@code next()} has returned {@link IterOutcome#NONE}
+   *   or {@link IterOutcome#STOP}, the consumer should no longer call
+   *   {@code next()}.  Behavior at this point is undefined and likely to
+   *   throw an exception.
+   * </p>
+   * <p>
+   *   See {@link IterOutcome} for the protocol (possible sequences of return
+   *   values).
+   * </p>
+   *
    *
    * @return An IterOutcome describing the result of the iteration.
    */
   public IterOutcome next();
 
   /**
-   * Get a writable version of this batch. Takes over owernship of existing buffers.
-   *
-   * @return
+   * Gets a writable version of this batch.  Takes over ownership of existing
+   * buffers.
    */
   public WritableBatch getWritableBatch();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 55ae309..ed86358 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -40,6 +40,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * Holds record batch loaded from record batch message.
+ */
 public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{
   private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class);
 
@@ -48,6 +51,10 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
   private int valueCount;
   private BatchSchema schema;
 
+
+  /**
+   * Constructs a loader using the given allocator for vector buffer allocation.
+   */
   public RecordBatchLoader(BufferAllocator allocator) {
     this.allocator = Preconditions.checkNotNull(allocator);
   }
@@ -72,6 +79,11 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     valueCount = def.getRecordCount();
     boolean schemaChanged = schema == null;
 
+    // Load vectors from the batch buffer, while tracking added and/or removed
+    // vectors (relative to the previous call) in order to determine whether the
+    // the schema has changed since the previous call.
+
+    // Set up to recognize previous fields that no longer exist.
     final Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
     for(final VectorWrapper<?> wrapper : container) {
       final ValueVector vector = wrapper.getValueVector();
@@ -87,15 +99,18 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
         ValueVector vector = oldFields.remove(fieldDef);
 
         if (vector == null) {
+          // Field did not exist previously--is schema change.
           schemaChanged = true;
           vector = TypeHelper.getNewVector(fieldDef, allocator);
         } else if (!vector.getField().getType().equals(fieldDef.getType())) {
+          // Field had different type before--is schema change.
           // clear previous vector
           vector.clear();
           schemaChanged = true;
           vector = TypeHelper.getNewVector(fieldDef, allocator);
         }
 
+        // Load the vector.
         if (field.getValueCount() == 0) {
           AllocationHelper.allocate(vector, 0, 0, 0);
         } else {
@@ -179,10 +194,23 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     return schema;
   }
 
+  public void resetRecordCount() {
+    valueCount = 0;
+  }
+
+  /**
+   * Clears this loader, which clears the internal vector container (see
+   * {@link VectorContainer#clear}) and resets the record count to zero.
+   */
   public void clear() {
     container.clear();
+    resetRecordCount();
   }
 
+  /**
+   * Sorts vectors into canonical order (by field name).  Updates schema and
+   * internal vector container.
+   */
   public void canonicalize() {
     //logger.debug( "RecordBatchLoader : before schema " + schema);
     container = VectorContainer.canonicalize(container);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index ef22d52..815e2d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -55,6 +55,16 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     this.oContext = oContext;
   }
 
+  @Override
+  public String toString() {
+    return super.toString()
+        + "[recordCount = " + recordCount
+        + ", schemaChanged = " + schemaChanged
+        + ", schema = " + schema
+        + ", wrappers = " + wrappers
+        + ", ...]";
+  }
+
   /**
    * Get the OperatorContext.
    *
@@ -164,6 +174,9 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     return vc;
   }
 
+  /**
+   * Sorts vectors into canonical order (by field name) in new VectorContainer.
+   */
   public static VectorContainer canonicalize(VectorContainer original) {
     VectorContainer vc = new VectorContainer();
     List<VectorWrapper<?>> canonicalWrappers = new ArrayList<VectorWrapper<?>>(original.wrappers);
@@ -328,6 +341,9 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
     return recordCount;
   }
 
+  /**
+   * Clears the contained vectors.  (See {@link ValueVector#clear}).
+   */
   public void zeroVectors() {
     for (VectorWrapper<?> w : wrappers) {
       w.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 8ca3ec8..41285c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -39,6 +39,14 @@ public abstract class AbstractRecordReader implements RecordReader {
   private boolean isStarQuery = false;
   private boolean isSkipQuery = false;
 
+  @Override
+  public String toString() {
+    return super.toString()
+        + "[columns = " + columns
+        + ", isStarQuery = " + isStarQuery
+        + ", isSkipQuery = " + isSkipQuery + "]";
+  }
+
   protected final void setColumns(Collection<SchemaPath> projected) {
     assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : COL_EMPTY_ERROR;
     if (projected instanceof ColumnList) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index c2ab0d0..f1b55e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -44,7 +44,8 @@ public interface RecordReader extends AutoCloseable {
   void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
 
   /**
-   * Increment record reader forward, writing into the provided output batch.
+   * Increments this record reader forward, writing via the provided output
+   * mutator into the output batch.
    *
    * @return The number of additional records added to the output.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 015fcf6..72f7d84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -46,8 +46,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
-public class
-        JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
+public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
   private static final boolean IS_COMPRESSIBLE = true;
   private static final String DEFAULT_NAME = "json";

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 0e3c908..8160f1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -66,7 +66,7 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param fragmentContext
    * @param inputPath
    * @param fileSystem
-   * @param columns
+   * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
   public JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final DrillFileSystem fileSystem,
@@ -79,7 +79,7 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param fragmentContext
    * @param embeddedContent
    * @param fileSystem
-   * @param columns
+   * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
   public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent,
@@ -114,6 +114,14 @@ public class JSONRecordReader extends AbstractRecordReader {
   }
 
   @Override
+  public String toString() {
+    return super.toString()
+        + "[hadoopPath = " + hadoopPath
+        + ", recordCount = " + recordCount
+        + ", runningRecordCount = " + runningRecordCount + ", ...]";
+  }
+
+  @Override
   public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
     try{
       if (hadoopPath != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index efbd30d..9f4115b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -37,6 +37,7 @@ public class VectorUtil {
   public static void showVectorAccessibleContent(VectorAccessible va, final String delimiter) {
 
     int rows = va.getRecordCount();
+    System.out.println(rows + " row(s):");
     List<String> columns = Lists.newArrayList();
     for (VectorWrapper<?> vw : va) {
       columns.add(vw.getValueVector().getField().getPath().getAsUnescapedPath());
@@ -138,6 +139,7 @@ public class VectorUtil {
     }
 
     int rows = va.getRecordCount();
+    System.out.println(rows + " row(s):");
     for (int row = 0; row < rows; row++) {
       // header, every 50 rows.
       if (row%50 == 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 7b3ab41..eb5dbcd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -47,6 +47,11 @@ public abstract class BaseValueVector implements ValueVector {
   }
 
   @Override
+  public String toString() {
+    return super.toString() + "[field = " + field + ", ...]";
+  }
+
+  @Override
   public void clear() {
     getMutator().reset();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
index de05131..4c2491c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
@@ -20,16 +20,32 @@ package org.apache.drill.exec.vector;
 
 import org.apache.drill.exec.util.CallBack;
 
+
 public class SchemaChangeCallBack implements CallBack {
-  private boolean schemaChange = false;
+  private boolean schemaChanged = false;
+
+  /**
+   * Constructs a schema-change callback with the schema-changed state set to
+   * {@code false}.
+   */
+  public SchemaChangeCallBack() {
+  }
 
+  /**
+   * Sets the schema-changed state to {@code true}.
+   */
+  @Override
   public void doWork() {
-    schemaChange = true;
+    schemaChanged = true;
   }
 
-  public boolean getSchemaChange() {
-    final boolean current = schemaChange;
-    schemaChange = false;
+  /**
+   * Returns the value of schema-changed state, <strong>resetting</strong> the
+   * schema-changed state to {@code false}.
+   */
+  public boolean getSchemaChangedAndReset() {
+    final boolean current = schemaChanged;
+    schemaChanged = false;
     return current;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 2c93c31..35df691 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -189,8 +189,8 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
         Preconditions.checkNotNull(vector, "vector cannot be null")
     );
     if (old != null && old != vector) {
-      logger.debug("Field [%s] mutated from [%s] to [%s]", name, old.getClass().getSimpleName(),
-          vector.getClass().getSimpleName());
+      logger.debug("Field [{}] mutated from [{}] to [{}]", name, old.getClass().getSimpleName(),
+                   vector.getClass().getSimpleName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 8b4b858..048358c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -309,7 +309,14 @@ public class MapVector extends AbstractMapVector {
       Map<String, Object> vv = new JsonStringHashMap<>();
       for (String child:getChildFieldNames()) {
         ValueVector v = getChild(child);
-        if (v != null) {
+        // TODO(DRILL-4001):  Resolve this hack:
+        // The index/value count check in the following if statement is a hack
+        // to work around the current fact that RecordBatchLoader.load and
+        // MapVector.load leave child vectors with a length of zero (as opposed
+        // to matching the lengths of siblings and the parent map vector)
+        // because they don't remove (or set the lengths of) vectors from
+        // previous batches that aren't in the current batch.
+        if (v != null && index < v.getAccessor().getValueCount()) {
           Object value = v.getAccessor().getObject(index);
           if (value != null) {
             vv.put(child, value);

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java
index 1857479..dfaf5de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java
@@ -59,6 +59,15 @@ class FieldSelection {
     this.mode = mode;
   }
 
+  @Override
+  public String toString() {
+    return
+        super.toString()
+        + "[mode = " + mode
+        + ", children = " + children
+        + ", childrenInsensitive = " + childrenInsensitive + "]";
+  }
+
   /**
    * Create a new tree that has all leaves fixed to support full depth validity.
    */


Mime
View raw message