drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [10/12] drill git commit: DRILL-5356: Refactor Parquet Record Reader
Date Sat, 03 Jun 2017 04:46:05 GMT
DRILL-5356: Refactor Parquet Record Reader

The Parquet reader is Drill's premier data source and has worked very well
for many years. As with any piece of code, it has grown in complexity over
that time and has become hard to understand and maintain.

While working on another project, we found that Parquet is accidentally creating
"low density" batches: record batches with little actual data compared to
the amount of memory allocated. We'd like to fix that.

However, the current complexity of the reader code creates a barrier to
making improvements: the code is so complex that it is often better to
leave bugs unfixed, or risk spending large amounts of time struggling to
make small changes.

This commit offers to help revitalize the Parquet reader. Functionality is
identical to the code in master; but code has been pulled apart into
various classes each of which focuses on one part of the task: building
up a schema, keeping track of read state, a strategy for reading various
combinations of records, etc. The idea is that it is easier to understand
several small, focused classes than one huge, complex class. Indeed, the
idea of small, focused classes is common in the industry; it is nothing new.

Unit tests pass with the change. Since no logic has changed (we only moved
lines of code), that is a good indication that everything still works.

Also includes fixes based on review comments.

closes #789


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/676ea889
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/676ea889
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/676ea889

Branch: refs/heads/master
Commit: 676ea889bb69e9e0a733cab29665236d066bd1ab
Parents: 9ab91ff
Author: Paul Rogers <progers@maprtech.com>
Authored: Wed Mar 15 13:49:07 2017 -0700
Committer: Jinfeng Ni <jni@apache.org>
Committed: Fri Jun 2 21:43:14 2017 -0700

----------------------------------------------------------------------
 .../exec/planner/physical/PlannerSettings.java  |   2 +-
 .../exec/store/parquet/ParquetReaderStats.java  |  70 ++-
 .../store/parquet/ParquetReaderUtility.java     |   2 +-
 .../parquet/columnreaders/BatchReader.java      | 169 +++++++
 .../parquet/columnreaders/ColumnReader.java     |   6 +-
 .../columnreaders/FixedWidthRepeatedReader.java |   2 +-
 .../columnreaders/ParquetColumnMetadata.java    | 154 ++++++
 .../columnreaders/ParquetRecordReader.java      | 494 +++----------------
 .../parquet/columnreaders/ParquetSchema.java    | 265 ++++++++++
 .../store/parquet/columnreaders/ReadState.java  | 192 +++++++
 .../parquet2/DrillParquetGroupConverter.java    |   4 +-
 .../store/parquet/ParquetInternalsTest.java     | 158 ++++++
 .../test/resources/parquet/expected/bogus.csv   |  20 +
 .../resources/parquet/expected/fixedWidth.csv   |  20 +
 .../resources/parquet/expected/mixedWidth.csv   |  20 +
 .../test/resources/parquet/expected/star.csv    |  20 +
 .../parquet/expected/variableWidth.csv          |  20 +
 17 files changed, 1168 insertions(+), 450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 53d67c0..648adb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -76,7 +76,7 @@ public class PlannerSettings implements Context{
   public static final OptionValidator HASH_JOIN_SWAP = new BooleanValidator("planner.enable_hashjoin_swap", true);
   public static final OptionValidator HASH_JOIN_SWAP_MARGIN_FACTOR = new RangeDoubleValidator("planner.join.hash_join_swap_margin_factor", 0, 100, 10d);
   public static final String ENABLE_DECIMAL_DATA_TYPE_KEY = "planner.enable_decimal_data_type";
-  public static final OptionValidator ENABLE_DECIMAL_DATA_TYPE = new BooleanValidator(ENABLE_DECIMAL_DATA_TYPE_KEY, false);
+  public static final BooleanValidator ENABLE_DECIMAL_DATA_TYPE = new BooleanValidator(ENABLE_DECIMAL_DATA_TYPE_KEY, false);
   public static final OptionValidator HEP_OPT = new BooleanValidator("planner.enable_hep_opt", true);
   public static final OptionValidator HEP_PARTITION_PRUNING = new BooleanValidator("planner.enable_hep_partition_pruning", true);
   public static final OptionValidator PLANNER_MEMORY_LIMIT = new RangeLongValidator("planner.memory_limit",

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
index b1dc0be..6a7b967 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,10 @@ package org.apache.drill.exec.store.parquet;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader.Metric;
+import org.apache.hadoop.fs.Path;
+
 public class ParquetReaderStats {
 
   public AtomicLong numDictPageLoads = new AtomicLong();
@@ -48,6 +52,66 @@ public class ParquetReaderStats {
   public ParquetReaderStats() {
   }
 
-}
-
+  public void logStats(org.slf4j.Logger logger, Path hadoopPath) {
+    logger.trace(
+        "ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+        hadoopPath,
+        numDictPageLoads,
+        numDataPageLoads,
+        numDataPagesDecoded,
+        numDictPagesDecompressed,
+        numDataPagesDecompressed,
+        totalDictPageReadBytes,
+        totalDataPageReadBytes,
+        totalDictDecompressedBytes,
+        totalDataDecompressedBytes,
+        timeDictPageLoads,
+        timeDataPageLoads,
+        timeDataPageDecode,
+        timeDictPageDecode,
+        timeDictPagesDecompressed,
+        timeDataPagesDecompressed,
+        timeDiskScanWait,
+        timeDiskScan,
+        timeFixedColumnRead,
+        timeVarColumnRead
+    );
+  }
 
+  public void update(OperatorStats stats){
+    stats.addLongStat(Metric.NUM_DICT_PAGE_LOADS,
+        numDictPageLoads.longValue());
+    stats.addLongStat(Metric.NUM_DATA_PAGE_lOADS, numDataPageLoads.longValue());
+    stats.addLongStat(Metric.NUM_DATA_PAGES_DECODED, numDataPagesDecoded.longValue());
+    stats.addLongStat(Metric.NUM_DICT_PAGES_DECOMPRESSED,
+        numDictPagesDecompressed.longValue());
+    stats.addLongStat(Metric.NUM_DATA_PAGES_DECOMPRESSED,
+        numDataPagesDecompressed.longValue());
+    stats.addLongStat(Metric.TOTAL_DICT_PAGE_READ_BYTES,
+        totalDictPageReadBytes.longValue());
+    stats.addLongStat(Metric.TOTAL_DATA_PAGE_READ_BYTES,
+        totalDataPageReadBytes.longValue());
+    stats.addLongStat(Metric.TOTAL_DICT_DECOMPRESSED_BYTES,
+        totalDictDecompressedBytes.longValue());
+    stats.addLongStat(Metric.TOTAL_DATA_DECOMPRESSED_BYTES,
+        totalDataDecompressedBytes.longValue());
+    stats.addLongStat(Metric.TIME_DICT_PAGE_LOADS,
+        timeDictPageLoads.longValue());
+    stats.addLongStat(Metric.TIME_DATA_PAGE_LOADS,
+        timeDataPageLoads.longValue());
+    stats.addLongStat(Metric.TIME_DATA_PAGE_DECODE,
+        timeDataPageDecode.longValue());
+    stats.addLongStat(Metric.TIME_DICT_PAGE_DECODE,
+        timeDictPageDecode.longValue());
+    stats.addLongStat(Metric.TIME_DICT_PAGES_DECOMPRESSED,
+        timeDictPagesDecompressed.longValue());
+    stats.addLongStat(Metric.TIME_DATA_PAGES_DECOMPRESSED,
+        timeDataPagesDecompressed.longValue());
+    stats.addLongStat(Metric.TIME_DISK_SCAN_WAIT,
+        timeDiskScanWait.longValue());
+    stats.addLongStat(Metric.TIME_DISK_SCAN, timeDiskScan.longValue());
+    stats.addLongStat(Metric.TIME_FIXEDCOLUMN_READ, timeFixedColumnRead.longValue());
+    stats.addLongStat(Metric.TIME_VARCOLUMN_READ, timeVarColumnRead.longValue());
+    stats.addLongStat(Metric.TIME_PROCESS, timeProcess.longValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 4247d41..7d7c13b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -108,7 +108,7 @@ public class ParquetReaderUtility {
   }
 
   public static void checkDecimalTypeEnabled(OptionManager options) {
-    if (options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val == false) {
+    if (! options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE)) {
       throw UserException.unsupportedError()
         .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG)
         .build(logger);

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
new file mode 100644
index 0000000..651c813
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Base strategy for reading a batch of Parquet records.
+ */
+public abstract class BatchReader {
+
+  protected final ReadState readState;
+
+  public BatchReader(ReadState readState) {
+    this.readState = readState;
+  }
+
+  public int readBatch() throws Exception {
+    ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
+    long recordsToRead = Math.min(getReadCount(firstColumnStatus), readState.getRecordsToRead());
+    int readCount = readRecords(firstColumnStatus, recordsToRead);
+    readState.fillNullVectors(readCount);
+    return readCount;
+  }
+
+  protected abstract long getReadCount(ColumnReader<?> firstColumnStatus);
+
+  protected abstract int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception;
+
+  protected void readAllFixedFields(long recordsToRead) throws Exception {
+    Stopwatch timer = Stopwatch.createStarted();
+    if(readState.useAsyncColReader()){
+      readAllFixedFieldsParallel(recordsToRead);
+    } else {
+      readAllFixedFieldsSerial(recordsToRead);
+    }
+    readState.parquetReaderStats().timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+  }
+
+  protected void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
+    for (ColumnReader<?> crs : readState.getColumnReaders()) {
+      crs.processPages(recordsToRead);
+    }
+  }
+
+  protected void readAllFixedFieldsParallel(long recordsToRead) throws Exception {
+    ArrayList<Future<Long>> futures = Lists.newArrayList();
+    for (ColumnReader<?> crs : readState.getColumnReaders()) {
+      Future<Long> f = crs.processPagesAsync(recordsToRead);
+      futures.add(f);
+    }
+    Exception exception = null;
+    for(Future<Long> f: futures){
+      if (exception != null) {
+        f.cancel(true);
+      } else {
+        try {
+          f.get();
+        } catch (Exception e) {
+          f.cancel(true);
+          exception = e;
+        }
+      }
+    }
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  /**
+   * Strategy for reading mock records. Mock records appear to occur in the case
+   * in which the query has SELECT a, b, but the Parquet file has only c, d.
+   * A mock scan reads dummy columns for all records to ensure that the batch
+   * contains a record for each Parquet record, but with no data per record.
+   * (This explanation is reverse-engineered from the code and may be wrong.
+   * Caveat emptor!)
+   */
+
+  public static class MockBatchReader extends BatchReader {
+
+    public MockBatchReader(ReadState readState) {
+      super(readState);
+    }
+
+    @Override
+    protected long getReadCount(ColumnReader<?> firstColumnStatus) {
+      if (readState.recordsRead() == readState.schema().getGroupRecordCount()) {
+        return 0;
+      }
+      return Math.min(ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH,
+                      readState.schema().getGroupRecordCount() - readState.recordsRead());
+    }
+
+    @Override
+    protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) {
+      readState.updateCounts((int) recordsToRead);
+      return (int) recordsToRead;
+    }
+  }
+
+  /**
+   * Strategy for reading a record batch when all columns are
+   * fixed-width.
+   */
+
+  public static class FixedWidthReader extends BatchReader {
+
+    public FixedWidthReader(ReadState readState) {
+      super(readState);
+    }
+
+    @Override
+    protected long getReadCount(ColumnReader<?> firstColumnStatus) {
+      return Math.min(readState.schema().getRecordsPerBatch(),
+                      firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
+    }
+
+    @Override
+    protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
+      readAllFixedFields(recordsToRead);
+      return firstColumnStatus.getRecordsReadInCurrentPass();
+    }
+  }
+
+  /**
+   * Strategy for reading a record batch when at least one column is
+   * variable width.
+   */
+
+  public static class VariableWidthReader extends BatchReader {
+
+    public VariableWidthReader(ReadState readState) {
+      super(readState);
+    }
+
+    @Override
+    protected long getReadCount(ColumnReader<?> firstColumnStatus) {
+      return ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
+    }
+
+    @Override
+    protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
+      long fixedRecordsToRead = readState.varLengthReader().readFields(recordsToRead);
+      readAllFixedFields(fixedRecordsToRead);
+      return firstColumnStatus.getRecordsReadInCurrentPass();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 5eaf286..98e1d78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -101,11 +101,9 @@ public abstract class ColumnReader<V extends ValueVector> {
     }
     if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
       if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        // Here "bits" means "bytes"
         dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
       } else {
-        // While here, "bits" means "bits"
-        dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
+        dataTypeLengthInBits = ParquetColumnMetadata.getTypeLengthInBits(columnDescriptor.getType());
       }
     }
     if(threadPool == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 6db7110..fa21dfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
new file mode 100644
index 0000000..bbdf246
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Represents a single column read from the Parquet file by the record reader.
+ */
+
+public class ParquetColumnMetadata {
+
+  ColumnDescriptor column;
+  private SchemaElement se;
+  MaterializedField field;
+  int length;
+  private MajorType type;
+  ColumnChunkMetaData columnChunkMetaData;
+  private ValueVector vector;
+
+  public ParquetColumnMetadata(ColumnDescriptor column) {
+    this.column = column;
+  }
+
+  public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
+    se = schemaElements.get(column.getPath()[0]);
+    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
+        getDataMode(column), se, options);
+    field = MaterializedField.create(toFieldName(column.getPath()), type);
+    length = getDataTypeLength();
+  }
+
+  private String toFieldName(String[] paths) {
+    return SchemaPath.getCompoundPath(paths).getAsUnescapedPath();
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+    if (isRepeated()) {
+      return DataMode.REPEATED;
+    } else if (column.getMaxDefinitionLevel() == 0) {
+      return TypeProtos.DataMode.REQUIRED;
+    } else {
+      return TypeProtos.DataMode.OPTIONAL;
+    }
+  }
+
+  /**
+   * @param type
+   * @param type a fixed length type from the parquet library enum
+   * @return the length in pageDataByteArray of the type
+   */
+  public static int getTypeLengthInBits(PrimitiveTypeName type) {
+    switch (type) {
+      case INT64:   return 64;
+      case INT32:   return 32;
+      case BOOLEAN: return 1;
+      case FLOAT:   return 32;
+      case DOUBLE:  return 64;
+      case INT96:   return 96;
+      // binary and fixed length byte array
+      default:
+        throw new IllegalStateException("Length cannot be determined for type " + type);
+    }
+  }
+
+  public static final int UNDEFINED_LENGTH = -1;
+
+  /**
+   * Returns data type length for a given {@link ColumnDescriptor} and its corresponding
+   * {@link SchemaElement}. Neither is enough information alone as the max
+   * repetition level (indicating if it is an array type) is in the ColumnDescriptor and
+   * the length of a fixed width field is stored at the schema level.
+   *
+   * @return the length if fixed width, else <tt>UNDEFINED_LENGTH</tt> (-1)
+   */
+  private int getDataTypeLength() {
+    if (! isFixedLength()) {
+      return UNDEFINED_LENGTH;
+    } else if (isRepeated()) {
+      return UNDEFINED_LENGTH;
+    } else if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+      return se.getType_length() * 8;
+    } else {
+      return getTypeLengthInBits(column.getType());
+    }
+  }
+
+  public boolean isFixedLength( ) {
+    return column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
+  }
+
+  public boolean isRepeated() {
+    return column.getMaxRepetitionLevel() > 0;
+  }
+
+  ValueVector buildVector(OutputMutator output) throws SchemaChangeException {
+    Class<? extends ValueVector> vectorClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+    vector = output.addField(field, vectorClass);
+    return vector;
+  }
+
+  ColumnReader<?> makeFixedWidthReader(ParquetRecordReader reader, int recordsPerBatch) throws Exception {
+    return ColumnReaderFactory.createFixedColumnReader(reader, true,
+        column, columnChunkMetaData, recordsPerBatch, vector, se);
+  }
+
+  @SuppressWarnings("resource")
+  FixedWidthRepeatedReader makeRepeatedFixedWidthReader(ParquetRecordReader reader, int recordsPerBatch) throws Exception {
+    final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
+    ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(reader, true,
+        column, columnChunkMetaData, recordsPerBatch,
+        repeatedVector.getDataVector(), se);
+    return new FixedWidthRepeatedReader(reader, dataReader,
+        getTypeLengthInBits(column.getType()), UNDEFINED_LENGTH, column, columnChunkMetaData, false, repeatedVector, se);
+  }
+
+  VarLengthValuesColumn<?> makeVariableWidthReader(ParquetRecordReader reader) throws ExecutionSetupException {
+    return ColumnReaderFactory.getReader(reader, UNDEFINED_LENGTH, column, columnChunkMetaData, false, vector, se);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 93c1214..cb75cfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,50 +17,31 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.PrimitiveType;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 
 public class ParquetRecordReader extends AbstractRecordReader {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
@@ -69,11 +50,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private static final int NUMBER_OF_VECTORS = 1;
   private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
   private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
-  private static final char DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH = 32*1024; // 32K
-  private static final int DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH = 64*1024 - 1; // 64K - 1, max SV2 can address
-  private static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1;
+  static final char DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH = 32*1024; // 32K
+  static final int DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH = 64*1024 - 1; // 64K - 1, max SV2 can address
+  static final int NUM_RECORDS_TO_READ_NOT_SPECIFIED = -1;
 
-  // When no column is required by the downstrea operator, ask SCAN to return a DEFAULT column. If such column does not exist,
+  // When no column is required by the downstream operator, ask SCAN to return a DEFAULT column. If such column does not exist,
   // it will return as a nullable-int column. If that column happens to exist, return that column.
   protected static final List<SchemaPath> DEFAULT_COLS_TO_READ = ImmutableList.of(SchemaPath.getSimplePath("_DEFAULT_COL_TO_READ_"));
 
@@ -85,37 +66,23 @@ public class ParquetRecordReader extends AbstractRecordReader {
   // used for clearing the first n bits of a byte
   public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
 
-  private int bitWidthAllFixedFields;
-  private boolean allFieldsFixedLength;
-  private int recordsPerBatch;
   private OperatorContext operatorContext;
 
-  private List<ColumnReader<?>> columnStatuses;
   private FileSystem fileSystem;
   private final long batchSize;
   private long numRecordsToRead; // number of records to read
 
   Path hadoopPath;
-  private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
-  // This is a parallel list to the columns list above, it is used to determine the subset of the project
-  // pushdown columns that do not appear in this file
-  private boolean[] columnsFound;
-  // For columns not found in the file, we need to return a schema element with the correct number of values
-  // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
-  // that need only have their value count set at the end of each call to next(), as the values default to null.
-  private List<NullableIntVector> nullFilledVectors;
-  // Keeps track of the number of records returned in the case where only columns outside of the file were selected.
-  // No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of
-  // records specified in the row group metadata
-  long mockRecordsRead;
 
   private final CodecFactory codecFactory;
   int rowGroupIndex;
-  long totalRecordsRead;
   private final FragmentContext fragmentContext;
   ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
+  ParquetSchema schema;
+  ReadState readState;
+
   public boolean useAsyncColReader;
   public boolean useAsyncPageReader;
   public boolean useBufferedReader;
@@ -127,8 +94,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
   @SuppressWarnings("unused")
   private String name;
 
-
   public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
+  private BatchReader batchReader;
 
   public enum Metric implements MetricDef {
     NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
@@ -203,13 +170,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
     this.footer = footer;
     this.dateCorruptionStatus = dateCorruptionStatus;
     this.fragmentContext = fragmentContext;
-    // Callers can pass -1 if they want to read all rows.
-    if (numRecordsToRead == NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
-      this.numRecordsToRead = footer.getBlocks().get(rowGroupIndex).getRowCount();
-    } else {
-      assert (numRecordsToRead >= 0);
-      this.numRecordsToRead = Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
-    }
+    this.numRecordsToRead = numRecordsToRead;
     useAsyncColReader =
         fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
     useAsyncPageReader =
@@ -255,50 +216,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   public int getBitWidthAllFixedFields() {
-    return bitWidthAllFixedFields;
+    return schema.getBitWidthAllFixedFields();
   }
 
   public long getBatchSize() {
     return batchSize;
   }
 
-  /**
-   * @param type a fixed length type from the parquet library enum
-   * @return the length in pageDataByteArray of the type
-   */
-  public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
-    switch (type) {
-      case INT64:   return 64;
-      case INT32:   return 32;
-      case BOOLEAN: return 1;
-      case FLOAT:   return 32;
-      case DOUBLE:  return 64;
-      case INT96:   return 96;
-      // binary and fixed length byte array
-      default:
-        throw new IllegalStateException("Length cannot be determined for type " + type);
-    }
-  }
-
-  private boolean fieldSelected(MaterializedField field) {
-    // TODO - not sure if this is how we want to represent this
-    // for now it makes the existing tests pass, simply selecting
-    // all available data if no columns are provided
-    if (isStarQuery()) {
-      return true;
-    }
-
-    int i = 0;
-    for (SchemaPath expr : getColumns()) {
-      if ( field.getPath().equalsIgnoreCase(expr.getAsUnescapedPath())) {
-        columnsFound[i] = true;
-        return true;
-      }
-      i++;
-    }
-    return false;
-  }
-
   public OperatorContext getOperatorContext() {
     return operatorContext;
   }
@@ -308,163 +232,50 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
+   * Prepare the Parquet reader. First determine the set of columns to read (the schema
+   * for this read.) Then, create a state object to track the read across calls to
+   * the reader <tt>next()</tt> method. Finally, create one of three readers to
+   * read batches depending on whether this scan is for only fixed-width fields,
+   * contains at least one variable-width field, or is a "mock" scan consisting
+   * only of null fields (fields in the SELECT clause but not in the Parquet file.)
    */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
-    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-      if (column.getMaxRepetitionLevel() > 0) {
-        return -1;
-      }
-      if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        return se.getType_length() * 8;
-      } else {
-        return getTypeLengthInBits(column.getType());
-      }
-    } else {
-      return -1;
-    }
-  }
 
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
-    if (!isStarQuery()) {
-      columnsFound = new boolean[getColumns().size()];
-      nullFilledVectors = new ArrayList<>();
-    }
-    columnStatuses = new ArrayList<>();
-    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
-    allFieldsFixedLength = true;
-    ColumnDescriptor column;
-    ColumnChunkMetaData columnChunkMetaData;
-    int columnsToScan = 0;
-    mockRecordsRead = 0;
-
-    MaterializedField field;
+    schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
 
     logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
         hadoopPath.toUri().getPath());
-    totalRecordsRead = 0;
-
-    // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
-    // store a map from column name to converted types if they are non-null
-    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
-
-    // loop to add up the length of the fixed width columns and build the schema
-    for (int i = 0; i < columns.size(); ++i) {
-      column = columns.get(i);
-      SchemaElement se = schemaElements.get(column.getPath()[0]);
-      MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
-          getDataMode(column), se, fragmentContext.getOptions());
-      field = MaterializedField.create(toFieldName(column.getPath()), mt);
-      if ( ! fieldSelected(field)) {
-        continue;
-      }
-      columnsToScan++;
-      int dataTypeLength = getDataTypeLength(column, se);
-      if (dataTypeLength == -1) {
-        allFieldsFixedLength = false;
-      } else {
-        bitWidthAllFixedFields += dataTypeLength;
-      }
-    }
-
-    if (columnsToScan != 0  && allFieldsFixedLength) {
-      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
-          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
-    }
-    else {
-      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-    }
 
     try {
-      ValueVector vector;
-      SchemaElement schemaElement;
-      final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>();
-      // initialize all of the column read status objects
-      boolean fieldFixedLength;
-      // the column chunk meta-data is not guaranteed to be in the same order as the columns in the schema
-      // a map is constructed for fast access to the correct columnChunkMetadata to correspond
-      // to an element in the schema
-      Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>();
-      BlockMetaData rowGroupMetadata = footer.getBlocks().get(rowGroupIndex);
-
-      int colChunkIndex = 0;
-      for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
-        columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()), colChunkIndex);
-        colChunkIndex++;
-      }
-      for (int i = 0; i < columns.size(); ++i) {
-        column = columns.get(i);
-        columnChunkMetaData = rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
-        schemaElement = schemaElements.get(column.getPath()[0]);
-        MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(),
-            getDataMode(column), schemaElement, fragmentContext.getOptions());
-        field = MaterializedField.create(toFieldName(column.getPath()), type);
-        // the field was not requested to be read
-        if ( ! fieldSelected(field)) {
-          continue;
-        }
-
-        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
-        vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
-        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-          if (column.getMaxRepetitionLevel() > 0) {
-            final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
-            ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
-                column, columnChunkMetaData, recordsPerBatch,
-                repeatedVector.getDataVector(), schemaElement);
-            varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
-                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement));
-          }
-          else {
-
-           ColumnReader<?> cr = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
-                column, columnChunkMetaData, recordsPerBatch, vector,
-                schemaElement) ;
-            columnStatuses.add(cr);
-          }
-        } else {
-          // create a reader and add it to the appropriate list
-          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement));
-        }
-      }
-      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
-
-      if (!isStarQuery()) {
-        List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns());
-        SchemaPath col;
-        for (int i = 0; i < columnsFound.length; i++) {
-          col = projectedColumns.get(i);
-          assert col!=null;
-          if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
-            nullFilledVectors.add((NullableIntVector)output.addField(MaterializedField.create(col.getAsUnescapedPath(),
-                    Types.optional(TypeProtos.MinorType.INT)),
-                (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL)));
-
-          }
-        }
-      }
+      schema.buildSchema(batchSize);
+      readState = new ReadState(schema, parquetReaderStats, numRecordsToRead, useAsyncColReader);
+      readState.buildReader(this, output);
     } catch (Exception e) {
-      handleAndRaise("Failure in setting up reader", e);
+      throw handleException("Failure in setting up reader", e);
+    }
+
+    ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
+    if (firstColumnStatus == null) {
+      batchReader = new BatchReader.MockBatchReader(readState);
+    } else if (schema.allFieldsFixedLength()) {
+      batchReader = new BatchReader.FixedWidthReader(readState);
+    } else {
+      batchReader = new BatchReader.VariableWidthReader(readState);
     }
   }
 
-  protected void handleAndRaise(String s, Exception e) {
+  protected DrillRuntimeException handleException(String s, Exception e) {
     String message = "Error in parquet record reader.\nMessage: " + s +
       "\nParquet Metadata: " + footer;
-    throw new DrillRuntimeException(message, e);
+    return new DrillRuntimeException(message, e);
   }
 
   @Override
   public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
     try {
+      int recordsPerBatch = schema.getRecordsPerBatch();
       for (final ValueVector v : vectorMap.values()) {
         AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
       }
@@ -473,251 +284,56 @@ public class ParquetRecordReader extends AbstractRecordReader {
     }
   }
 
-
-  private String toFieldName(String[] paths) {
-    return SchemaPath.getCompoundPath(paths).getAsUnescapedPath();
-  }
-
-  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
-    if (column.getMaxRepetitionLevel() > 0 ) {
-      return DataMode.REPEATED;
-    } else if (column.getMaxDefinitionLevel() == 0) {
-      return TypeProtos.DataMode.REQUIRED;
-    } else {
-      return TypeProtos.DataMode.OPTIONAL;
-    }
-  }
-
-  private void resetBatch() {
-    for (final ColumnReader<?> column : columnStatuses) {
-      column.valuesReadInCurrentPass = 0;
-    }
-    for (final VarLengthColumn<?> r : varLengthReader.columns) {
-      r.valuesReadInCurrentPass = 0;
-    }
-  }
-
- public void readAllFixedFields(long recordsToRead) throws IOException {
-   Stopwatch timer = Stopwatch.createStarted();
-   if(useAsyncColReader){
-     readAllFixedFieldsParallel(recordsToRead) ;
-   } else {
-     readAllFixedFieldsSerial(recordsToRead);
-   }
-   parquetReaderStats.timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
- }
-
-  public void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
-    for (ColumnReader<?> crs : columnStatuses) {
-      crs.processPages(recordsToRead);
-    }
-  }
-
-  public void readAllFixedFieldsParallel(long recordsToRead) throws IOException {
-    ArrayList<Future<Long>> futures = Lists.newArrayList();
-    for (ColumnReader<?> crs : columnStatuses) {
-      Future<Long> f = crs.processPagesAsync(recordsToRead);
-      futures.add(f);
-    }
-    Exception exception = null;
-    for(Future<Long> f: futures){
-      if(exception != null) {
-        f.cancel(true);
-      } else {
-        try {
-          f.get();
-        } catch (Exception e) {
-          f.cancel(true);
-          exception = e;
-        }
-      }
-    }
-    if(exception != null){
-      handleAndRaise(null, exception);
-    }
-  }
+  /**
+   * Read the next record batch from the file using the reader and read state
+   * created previously.
+   */
 
   @Override
   public int next() {
-    resetBatch();
-    long recordsToRead = 0;
+    readState.resetBatch();
     Stopwatch timer = Stopwatch.createStarted();
     try {
-      ColumnReader<?> firstColumnStatus;
-      if (columnStatuses.size() > 0) {
-        firstColumnStatus = columnStatuses.iterator().next();
-      }
-      else{
-        if (varLengthReader.columns.size() > 0) {
-          firstColumnStatus = varLengthReader.columns.iterator().next();
-        }
-        else{
-          firstColumnStatus = null;
-        }
-      }
-      // No columns found in the file were selected, simply return a full batch of null records for each column requested
-      if (firstColumnStatus == null) {
-        if (mockRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount()) {
-          parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
-          return 0;
-        }
-        recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
-
-        // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit).
-        recordsToRead = Math.min(recordsToRead, numRecordsToRead);
-
-        for (final ValueVector vv : nullFilledVectors ) {
-          vv.getMutator().setValueCount( (int) recordsToRead);
-        }
-        mockRecordsRead += recordsToRead;
-        totalRecordsRead += recordsToRead;
-        numRecordsToRead -= recordsToRead;
-        parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
-        return (int) recordsToRead;
-      }
-
-      if (allFieldsFixedLength) {
-        recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
-      } else {
-        recordsToRead = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-
-      }
-
-      // Pick the minimum of recordsToRead calculated above and numRecordsToRead (based on rowCount and limit)
-      recordsToRead = Math.min(recordsToRead, numRecordsToRead);
-
-      if (allFieldsFixedLength) {
-        readAllFixedFields(recordsToRead);
-      } else { // variable length columns
-        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead);
-        readAllFixedFields(fixedRecordsToRead);
-      }
-
-      // if we have requested columns that were not found in the file fill their vectors with null
-      // (by simply setting the value counts inside of them, as they start null filled)
-      if (nullFilledVectors != null) {
-        for (final ValueVector vv : nullFilledVectors ) {
-          vv.getMutator().setValueCount(firstColumnStatus.getRecordsReadInCurrentPass());
-        }
-      }
-
-//      logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
-      totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
-      numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass();
-      parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
-
-      return firstColumnStatus.getRecordsReadInCurrentPass();
+      return batchReader.readBatch();
     } catch (Exception e) {
-      handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
-        "\nTotal records read: " + totalRecordsRead +
-        "\nMock records read: " + mockRecordsRead +
-        "\nRecords to read: " + recordsToRead +
+      throw handleException("\nHadoop path: " + hadoopPath.toUri().getPath() +
+        "\nTotal records read: " + readState.recordsRead() +
         "\nRow group index: " + rowGroupIndex +
         "\nRecords in row group: " + footer.getBlocks().get(rowGroupIndex).getRowCount(), e);
+    } finally {
+      parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
     }
-
-    // this is never reached
-    return 0;
   }
 
   @Override
   public void close() {
-    logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex,
+    long recordsRead = (readState == null) ? 0 : readState.recordsRead();
+    logger.debug("Read {} records out of row group({}) in file '{}'",
+        recordsRead, rowGroupIndex,
         hadoopPath.toUri().getPath());
     // enable this for debugging when it is know that a whole file will be read
     // limit kills upstream operators once it has enough records, so this assert will fail
 //    assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
-    if (columnStatuses != null) {
-      for (final ColumnReader<?> column : columnStatuses) {
-        column.clear();
-      }
-      columnStatuses.clear();
-      columnStatuses = null;
+    if (readState != null) {
+      readState.close();
+      readState = null;
     }
 
     codecFactory.release();
 
-    if (varLengthReader != null) {
-      for (final VarLengthColumn<?> r : varLengthReader.columns) {
-        r.clear();
-      }
-      varLengthReader.columns.clear();
-      varLengthReader = null;
-    }
-
-
-    if(parquetReaderStats != null) {
+    if (parquetReaderStats != null) {
       updateStats();
-      logger.trace(
-          "ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
-          hadoopPath,
-          parquetReaderStats.numDictPageLoads,
-          parquetReaderStats.numDataPageLoads,
-          parquetReaderStats.numDataPagesDecoded,
-          parquetReaderStats.numDictPagesDecompressed,
-          parquetReaderStats.numDataPagesDecompressed,
-          parquetReaderStats.totalDictPageReadBytes,
-          parquetReaderStats.totalDataPageReadBytes,
-          parquetReaderStats.totalDictDecompressedBytes,
-          parquetReaderStats.totalDataDecompressedBytes,
-          parquetReaderStats.timeDictPageLoads,
-          parquetReaderStats.timeDataPageLoads,
-          parquetReaderStats.timeDataPageDecode,
-          parquetReaderStats.timeDictPageDecode,
-          parquetReaderStats.timeDictPagesDecompressed,
-          parquetReaderStats.timeDataPagesDecompressed,
-          parquetReaderStats.timeDiskScanWait,
-          parquetReaderStats.timeDiskScan,
-          parquetReaderStats.timeFixedColumnRead,
-          parquetReaderStats.timeVarColumnRead
-      );
-      parquetReaderStats=null;
+      parquetReaderStats.logStats(logger, hadoopPath);
+      parquetReaderStats = null;
     }
-
   }
 
-  private void updateStats(){
-
-    operatorContext.getStats().addLongStat(Metric.NUM_DICT_PAGE_LOADS,
-        parquetReaderStats.numDictPageLoads.longValue());
-    operatorContext.getStats().addLongStat(Metric.NUM_DATA_PAGE_lOADS, parquetReaderStats.numDataPageLoads.longValue());
-    operatorContext.getStats().addLongStat(Metric.NUM_DATA_PAGES_DECODED, parquetReaderStats.numDataPagesDecoded.longValue());
-    operatorContext.getStats().addLongStat(Metric.NUM_DICT_PAGES_DECOMPRESSED,
-        parquetReaderStats.numDictPagesDecompressed.longValue());
-    operatorContext.getStats().addLongStat(Metric.NUM_DATA_PAGES_DECOMPRESSED,
-        parquetReaderStats.numDataPagesDecompressed.longValue());
-    operatorContext.getStats().addLongStat(Metric.TOTAL_DICT_PAGE_READ_BYTES,
-        parquetReaderStats.totalDictPageReadBytes.longValue());
-    operatorContext.getStats().addLongStat(Metric.TOTAL_DATA_PAGE_READ_BYTES,
-        parquetReaderStats.totalDataPageReadBytes.longValue());
-    operatorContext.getStats().addLongStat(Metric.TOTAL_DICT_DECOMPRESSED_BYTES,
-        parquetReaderStats.totalDictDecompressedBytes.longValue());
-    operatorContext.getStats().addLongStat(Metric.TOTAL_DATA_DECOMPRESSED_BYTES,
-        parquetReaderStats.totalDataDecompressedBytes.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DICT_PAGE_LOADS,
-        parquetReaderStats.timeDictPageLoads.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DATA_PAGE_LOADS,
-        parquetReaderStats.timeDataPageLoads.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DATA_PAGE_DECODE,
-        parquetReaderStats.timeDataPageDecode.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DICT_PAGE_DECODE,
-        parquetReaderStats.timeDictPageDecode.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DICT_PAGES_DECOMPRESSED,
-        parquetReaderStats.timeDictPagesDecompressed.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DATA_PAGES_DECOMPRESSED,
-        parquetReaderStats.timeDataPagesDecompressed.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DISK_SCAN_WAIT,
-        parquetReaderStats.timeDiskScanWait.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_DISK_SCAN, parquetReaderStats.timeDiskScan.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_FIXEDCOLUMN_READ, parquetReaderStats.timeFixedColumnRead.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_VARCOLUMN_READ, parquetReaderStats.timeVarColumnRead.longValue());
-    operatorContext.getStats().addLongStat(Metric.TIME_PROCESS, parquetReaderStats.timeProcess.longValue());
-
+  private void updateStats() {
+    parquetReaderStats.update(operatorContext.getStats());
   }
 
   @Override
   protected List<SchemaPath> getDefaultColumnsToRead() {
     return DEFAULT_COLS_TO_READ;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
new file mode 100644
index 0000000..ab4b1b8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Maps the schema of the Parquet file to the schema that Drill and the
+ * Parquet record reader use.
+ */
+
+public class ParquetSchema {
+  /**
+   * Set of columns specified in the SELECT clause. Will be null for
+   * a SELECT * query.
+   */
+  private final Collection<SchemaPath> selectedCols;
+
+  /**
+   * Parallel array to the selected columns above; it records which project-pushdown columns
+   * were found in this file, and so which are missing. Null for a SELECT * query.
+   */
+  private final boolean[] columnsFound;
+  private final OptionManager options;
+  private final int rowGroupIndex;
+  private final ParquetMetadata footer;
+
+  /**
+   * List of metadata for selected columns. This list does two things.
+   * First, it identifies the Parquet columns we wish to select. Second, it
+   * provides metadata for those columns. Note that null columns (columns
+   * in the SELECT clause but not in the file) appear elsewhere.
+   */
+  private List<ParquetColumnMetadata> selectedColumnMetadata = new ArrayList<>();
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength;
+  private long groupRecordCount;
+  private int recordsPerBatch;
+
+  /**
+   * Build the Parquet schema. The schema can be based on a "SELECT *",
+   * meaning we want all columns defined in the Parquet file. In this case,
+   * the list of selected columns is null. Or, the query can be based on
+   * an explicit list of selected columns. In this case, the
+   * columns need not exist in the Parquet file. If a column does not exist,
+   * the reader returns null for that column. If no selected column exists
+   * in the file, then we return "mock" records: records with only null
+   * values, but repeated for the number of rows in the Parquet file.
+   *
+   * @param options session options
+   * @param rowGroupIndex row group to read
+   * @param footer Parquet file metadata
+   * @param selectedCols columns in the SELECT clause, or null for a SELECT * query
+   */
+
+  public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols) {
+    this.options = options;
+    this.rowGroupIndex = rowGroupIndex;
+    this.selectedCols = selectedCols;
+    this.footer = footer;
+    if (selectedCols == null) {
+      columnsFound = null;
+    } else {
+      columnsFound = new boolean[selectedCols.size()];
+    }
+  }
+
+  /**
+   * Build the schema for this read as a combination of the schema specified in
+   * the Parquet footer and the list of columns selected in the query.
+   *
+   * @param batchSize target size of the batch; it is divided by the total bit width
+   * of the fixed-width fields to obtain the number of rows per batch
+   * @throws Exception if anything goes wrong
+   */
+
+  public void buildSchema(long batchSize) throws Exception {
+    groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+    loadParquetSchema();
+    computeFixedPart();
+
+    if (! selectedColumnMetadata.isEmpty()  && allFieldsFixedLength) {
+      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
+          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
+    }
+    else {
+      recordsPerBatch = ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
+    }
+  }
+
+  /**
+   * Scan the Parquet footer, then map each Parquet column to the list of columns
+   * we want to read. Track those to be read.
+   */
+
+  private void loadParquetSchema() {
+    // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
+    // store a map from column name to converted types if they are non-null
+    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+
+    // loop to add up the length of the fixed width columns and build the schema
+    for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
+      ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
+      columnMetadata.resolveDrillType(schemaElements, options);
+      if (! fieldSelected(columnMetadata.field)) {
+        continue;
+      }
+      selectedColumnMetadata.add(columnMetadata);
+    }
+  }
+
+  /**
+   * Fixed-width fields are the easiest to plan. We know the size of each column,
+   * making it easy to determine the total length of each vector, once we know
+   * the target record count. A special reader is used in the fortunate case
+   * that all fields are fixed width.
+   */
+
+  private void computeFixedPart() {
+    allFieldsFixedLength = true;
+    for (ParquetColumnMetadata colMd : selectedColumnMetadata) {
+      if (colMd.isFixedLength()) {
+        bitWidthAllFixedFields += colMd.length;
+      } else {
+        allFieldsFixedLength = false;
+      }
+    }
+  }
+
+  public boolean isStarQuery() { return selectedCols == null; }
+  public ParquetMetadata footer() { return footer; }
+  public int getBitWidthAllFixedFields() { return bitWidthAllFixedFields; }
+  public int getRecordsPerBatch() { return recordsPerBatch; }
+  public boolean allFieldsFixedLength() { return allFieldsFixedLength; }
+  public List<ParquetColumnMetadata> getColumnMetadata() { return selectedColumnMetadata; }
+
+  /**
+   * Return the row count of the Parquet row group being read.
+   *
+   * @return number of records in the Parquet row group
+   */
+
+  public long getGroupRecordCount() { return groupRecordCount; }
+
+  public BlockMetaData getRowGroupMetadata() {
+    return footer.getBlocks().get(rowGroupIndex);
+  }
+
+  /**
+   * Determine if a Parquet field is selected for the query. It is selected
+   * either if this is a star query (we want all columns), or the column
+   * appears in the select list.
+   *
+   * @param field the Parquet column expressed as a Drill field.
+   * @return true if the column is to be included in the scan, false
+   * if not
+   */
+
+  private boolean fieldSelected(MaterializedField field) {
+    // TODO - not sure if this is how we want to represent this
+    // for now it makes the existing tests pass, simply selecting
+    // all available data if no columns are provided
+    if (isStarQuery()) {
+      return true;
+    }
+
+    int i = 0;
+    for (SchemaPath expr : selectedCols) {
+      if ( field.getPath().equalsIgnoreCase(expr.getAsUnescapedPath())) {
+        columnsFound[i] = true;
+        return true;
+      }
+      i++;
+    }
+    return false;
+  }
+
+  /**
+   * Create "dummy" fields for columns selected in the SELECT clause but not present in the file.
+   * @param output the output container
+   * @param nullFilledVectors list to which the vector of each created column is appended
+   * @throws SchemaChangeException should not occur
+   */
+
+  public void createNonExistentColumns(OutputMutator output, List<NullableIntVector> nullFilledVectors) throws SchemaChangeException {
+    List<SchemaPath> projectedColumns = Lists.newArrayList(selectedCols);
+    for (int i = 0; i < columnsFound.length; i++) {
+      SchemaPath col = projectedColumns.get(i);
+      assert col != null;
+      if ( ! columnsFound[i] && ! col.equals(ParquetRecordReader.STAR_COLUMN)) {
+        nullFilledVectors.add(createMissingColumn(col, output));
+      }
+    }
+  }
+
+  /**
+   * Create a "dummy" column for a missing field. The column is of type optional
+   * int, but will always be null.
+   *
+   * @param col the selected, but non-existent, schema path
+   * @param output the output container
+   * @return the value vector for the field
+   * @throws SchemaChangeException should not occur
+   */
+
+  private NullableIntVector createMissingColumn(SchemaPath col, OutputMutator output) throws SchemaChangeException {
+    MaterializedField field = MaterializedField.create(col.getAsUnescapedPath(),
+                          Types.optional(TypeProtos.MinorType.INT));
+    return (NullableIntVector) output.addField(field,
+              TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL));
+  }
+
+  Map<String, Integer> buildChunkMap(BlockMetaData rowGroupMetadata) {
+    // the column chunk meta-data is not guaranteed to be in the same order as the columns in the schema
+    // a map is constructed for fast access to the correct columnChunkMetadata to correspond
+    // to an element in the schema
+    Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>();
+
+    int colChunkIndex = 0;
+    for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
+      columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()), colChunkIndex);
+      colChunkIndex++;
+    }
+    return columnChunkMetadataPositionsInList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
new file mode 100644
index 0000000..f94edf1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+/**
+ * Internal state for reading from a Parquet file. Tracks information
+ * required from one call of <tt>next()</tt> to the next.
+ * <p>
+ * At present, this is a bit of a muddle as it holds all read state.
+ * As such, this is a snapshot of a refactoring effort. Subsequent passes
+ * will move state into specific readers where possible.
+ */
+
+public class ReadState {
+  private final ParquetSchema schema;
+  private final ParquetReaderStats parquetReaderStats;
+  /**
+   * Reader wrapping the variable-width and repeated columns. Null until
+   * {@link #buildReader(ParquetRecordReader, OutputMutator)} has run and
+   * again after {@link #close()}.
+   */
+  private VarLenBinaryReader varLengthReader;
+  /**
+   * For columns not found in the file, we need to return a schema element with the correct number of values
+   * at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
+   * that need only have their value count set at the end of each call to next(), as the values default to null.
+   * <p>
+   * Left null for star queries, which cannot name a missing column.
+   */
+  private List<NullableIntVector> nullFilledVectors;
+  /**
+   * Readers for the non-repeated, fixed-width columns. Set to null by
+   * {@link #close()}.
+   */
+  private List<ColumnReader<?>> columnReaders = new ArrayList<>();
+  private long numRecordsToRead; // number of records still to be read
+  /**
+   * Keeps track of the number of records read thus far.
+   * <p>
+   * Also keeps track of the number of records returned in the case where only columns outside of the file were selected.
+   * No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of
+   * records specified in the row group metadata.
+   */
+  private long totalRecordsRead;
+  private final boolean useAsyncColReader;
+
+  /**
+   * Create the read state for one row group.
+   *
+   * @param schema the schema of the row group being read
+   * @param parquetReaderStats the statistics object exposed via {@link #parquetReaderStats()}
+   * @param numRecordsToRead the maximum number of records to read, or
+   * {@link ParquetRecordReader#NUM_RECORDS_TO_READ_NOT_SPECIFIED} to read the
+   * whole row group; capped at the row group's record count
+   * @param useAsyncColReader flag exposed via {@link #useAsyncColReader()}
+   */
+
+  public ReadState(ParquetSchema schema, ParquetReaderStats parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) {
+    this.schema = schema;
+    this.parquetReaderStats = parquetReaderStats;
+    this.useAsyncColReader = useAsyncColReader;
+    if (! schema.isStarQuery()) {
+      nullFilledVectors = new ArrayList<>();
+    }
+    // Callers can pass -1 if they want to read all rows.
+    if (numRecordsToRead == ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+      this.numRecordsToRead = schema.getGroupRecordCount();
+    } else {
+      assert (numRecordsToRead >= 0);
+      this.numRecordsToRead = Math.min(numRecordsToRead, schema.getGroupRecordCount());
+    }
+  }
+
+  /**
+   * Create the readers needed to read columns: fixed-length or variable length.
+   * Also creates the null-filled vectors for selected columns that do not
+   * exist in the file (non-star queries only).
+   *
+   * @param reader the record reader passed to each column reader that is created
+   * @param output the output mutator in which the column vectors are created
+   * @throws IllegalStateException if a schema column has no matching column
+   * chunk in the row group metadata
+   * @throws Exception if a vector or a column reader cannot be created
+   */
+
+  @SuppressWarnings("unchecked")
+  public void buildReader(ParquetRecordReader reader, OutputMutator output) throws Exception {
+    final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>();
+    // initialize all of the column read status objects
+    BlockMetaData rowGroupMetadata = schema.getRowGroupMetadata();
+    Map<String, Integer> columnChunkMetadataPositionsInList = schema.buildChunkMap(rowGroupMetadata);
+    for (ParquetColumnMetadata columnMetadata : schema.getColumnMetadata()) {
+      ColumnDescriptor column = columnMetadata.column;
+      String columnPath = Arrays.toString(column.getPath());
+      // Look the index up as an Integer first: passing a missing (null) entry
+      // straight to List.get(int) would fail with an uninformative
+      // NullPointerException during auto-unboxing.
+      Integer chunkIndex = columnChunkMetadataPositionsInList.get(columnPath);
+      if (chunkIndex == null) {
+        throw new IllegalStateException(
+            "No column chunk metadata found in the row group for column " + columnPath);
+      }
+      columnMetadata.columnChunkMetaData = rowGroupMetadata.getColumns().get(chunkIndex);
+      columnMetadata.buildVector(output);
+      if (! columnMetadata.isFixedLength( )) {
+        // create a reader and add it to the appropriate list
+        varLengthColumns.add(columnMetadata.makeVariableWidthReader(reader));
+      } else if (columnMetadata.isRepeated()) {
+        // repeated fixed-width columns are handled alongside the variable-width ones
+        varLengthColumns.add(columnMetadata.makeRepeatedFixedWidthReader(reader, schema.getRecordsPerBatch()));
+      }
+      else {
+        columnReaders.add(columnMetadata.makeFixedWidthReader(reader, schema.getRecordsPerBatch()));
+      }
+    }
+    varLengthReader = new VarLenBinaryReader(reader, varLengthColumns);
+    if (! schema.isStarQuery()) {
+      schema.createNonExistentColumns(output, nullFilledVectors);
+    }
+  }
+
+  /**
+   * Several readers use the first column reader to get information about the whole
+   * record or group (such as row count.)
+   *
+   * @return the first fixed-width column reader if there is one, otherwise the
+   * first variable-length column reader, otherwise null (including when the
+   * readers have not been built yet or have already been closed)
+   */
+
+  public ColumnReader<?> getFirstColumnReader() {
+    if (columnReaders != null && ! columnReaders.isEmpty()) {
+      return columnReaders.get(0);
+    }
+    if (varLengthReader != null && ! varLengthReader.columns.isEmpty()) {
+      return varLengthReader.columns.get(0);
+    }
+    return null;
+  }
+
+  /**
+   * Reset the per-pass value counter of every column reader. Does nothing for
+   * reader groups that have not been built yet or have already been closed.
+   */
+
+  public void resetBatch() {
+    if (columnReaders != null) {
+      for (final ColumnReader<?> column : columnReaders) {
+        column.valuesReadInCurrentPass = 0;
+      }
+    }
+    if (varLengthReader != null) {
+      for (final VarLengthColumn<?> r : varLengthReader.columns) {
+        r.valuesReadInCurrentPass = 0;
+      }
+    }
+  }
+
+  public ParquetSchema schema() { return schema; }
+  public List<ColumnReader<?>> getColumnReaders() { return columnReaders; }
+  public long recordsRead() { return totalRecordsRead; }
+  public VarLenBinaryReader varLengthReader() { return varLengthReader; }
+  public long getRecordsToRead() { return numRecordsToRead; }
+  public boolean useAsyncColReader() { return useAsyncColReader; }
+  public ParquetReaderStats parquetReaderStats() { return parquetReaderStats; }
+
+  /**
+   * When the SELECT clause references columns that do not exist in the Parquet
+   * file, we don't issue an error; instead we simply make up a column and
+   * fill it with nulls. This method does the work of null-filling the made-up
+   * vectors.
+   *
+   * @param readCount the number of rows read in the present record batch,
+   * which is the number of null column values to create
+   */
+
+  public void fillNullVectors(int readCount) {
+
+    // if we have requested columns that were not found in the file fill their vectors with null
+    // (by simply setting the value counts inside of them, as they start null filled)
+
+    if (nullFilledVectors != null) {
+      for (final ValueVector vv : nullFilledVectors ) {
+        vv.getMutator().setValueCount(readCount);
+      }
+    }
+  }
+
+  /**
+   * Record that a batch has been read: adds to the running total and
+   * subtracts from the number of records remaining.
+   *
+   * @param readCount the number of records in the batch just read
+   */
+
+  public void updateCounts(int readCount) {
+    totalRecordsRead += readCount;
+    numRecordsToRead -= readCount;
+  }
+
+  /**
+   * Clear every column reader and drop the references to them. Safe to call
+   * more than once.
+   */
+
+  public void close() {
+    if (columnReaders != null) {
+      for (final ColumnReader<?> column : columnReaders) {
+        column.clear();
+      }
+      columnReaders.clear();
+      columnReaders = null;
+    }
+    if (varLengthReader != null) {
+      for (final VarLengthColumn<? extends ValueVector> r : varLengthReader.columns) {
+        r.clear();
+      }
+      varLengthReader.columns.clear();
+      varLengthReader = null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 79dc740..5c8db91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
@@ -169,6 +170,7 @@ public class DrillParquetGroupConverter extends GroupConverter {
     }
   }
 
+  @SuppressWarnings("resource")
   private PrimitiveConverter getConverterForType(String name, PrimitiveType type) {
 
     switch(type.getPrimitiveTypeName()) {
@@ -236,7 +238,7 @@ public class DrillParquetGroupConverter extends GroupConverter {
             return new DrillFixedBinaryToTimeStampConverter(writer);
           } else {
             VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
-            return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer());
+            return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetColumnMetadata.getTypeLengthInBits(type.getPrimitiveTypeName()) / 8, mutator.getManagedBuffer());
           }
         }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
new file mode 100644
index 0000000..60e466d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Query-level tests of the Parquet reader over the TPC-H sample files on the
+ * class path: fixed-width columns, variable-width columns, a mix of both,
+ * a star query, and a projection that names a column missing from the file.
+ */
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+    FixtureBuilder builder = ClusterFixture.builder()
+      // Set options, etc.
+      ;
+    startCluster(builder);
+  }
+
+  /**
+   * Run a query and compare its result, ignoring row order, against a CSV
+   * baseline file.
+   * <p>
+   * To inspect the actual output while debugging, temporarily call
+   * <tt>client.queryBuilder().sql(sql).printCsv()</tt>.
+   *
+   * @param sql the query to run
+   * @param baselineFile class-path location of the expected CSV output
+   * @param typeMap the expected type of each baseline column
+   * @param columns the baseline column names, in CSV column order
+   * @throws Exception if the query fails or the result differs from the baseline
+   */
+  private void verifyCsvBaseline(String sql, String baselineFile,
+      Map<SchemaPath, TypeProtos.MajorType> typeMap, String... columns) throws Exception {
+    client.testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .csvBaselineFile(baselineFile)
+      .baselineColumns(columns)
+      .baselineTypes(typeMap)
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+    String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity\n" +
+                 "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    typeMap.put(TestBuilder.parsePath("l_orderkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_partkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_suppkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_linenumber"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_quantity"), Types.required(TypeProtos.MinorType.FLOAT8));
+    verifyCsvBaseline(sql, "parquet/expected/fixedWidth.csv", typeMap,
+        "l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity");
+  }
+
+  @Test
+  public void testVariableWidth() throws Exception {
+    String sql = "SELECT s_name, s_address, s_phone, s_comment\n" +
+                 "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    typeMap.put(TestBuilder.parsePath("s_name"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_address"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_phone"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_comment"), Types.required(TypeProtos.MinorType.VARCHAR));
+    verifyCsvBaseline(sql, "parquet/expected/variableWidth.csv", typeMap,
+        "s_name", "s_address", "s_phone", "s_comment");
+  }
+
+  @Test
+  public void testMixedWidth() throws Exception {
+    String sql = "SELECT s_suppkey, s_name, s_address, s_phone, s_acctbal\n" +
+                 "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    typeMap.put(TestBuilder.parsePath("s_suppkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("s_name"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_address"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_phone"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_acctbal"), Types.required(TypeProtos.MinorType.FLOAT8));
+    verifyCsvBaseline(sql, "parquet/expected/mixedWidth.csv", typeMap,
+        "s_suppkey", "s_name", "s_address", "s_phone", "s_acctbal");
+  }
+
+  @Test
+  public void testStar() throws Exception {
+    String sql = "SELECT *\n" +
+                 "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+    typeMap.put(TestBuilder.parsePath("s_suppkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("s_name"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_address"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_nationkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("s_phone"), Types.required(TypeProtos.MinorType.VARCHAR));
+    typeMap.put(TestBuilder.parsePath("s_acctbal"), Types.required(TypeProtos.MinorType.FLOAT8));
+    typeMap.put(TestBuilder.parsePath("s_comment"), Types.required(TypeProtos.MinorType.VARCHAR));
+    verifyCsvBaseline(sql, "parquet/expected/star.csv", typeMap,
+        "s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone", "s_acctbal", "s_comment");
+  }
+
+  @Test
+  public void testMissing() throws Exception {
+    String sql = "SELECT s_suppkey, bogus\n" +
+                 "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+
+    // The "bogus" column does not exist in the file, so every row should carry
+    // a null in that column. Per the original author's note, the CSV baseline
+    // mechanism used by the other tests can't handle nulls, so the values are
+    // not checked here. Until they can be, at least run the query so the test
+    // confirms that selecting a missing column does not fail.
+    // TODO: Once the "row set" fixture is available, use that to verify
+    // that every value of the bogus column is null (the intended baseline is
+    // parquet/expected/bogus.csv, with s_suppkey as required INT and bogus as
+    // optional INT).
+
+    client.queryBuilder().sql(sql).run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/test/resources/parquet/expected/bogus.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/expected/bogus.csv b/exec/java-exec/src/test/resources/parquet/expected/bogus.csv
new file mode 100644
index 0000000..52af180
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/expected/bogus.csv
@@ -0,0 +1,20 @@
+1,
+2,
+3,
+4,
+5,
+6,
+7,
+8,
+9,
+10,
+11,
+12,
+13,
+14,
+15,
+16,
+17,
+18,
+19,
+20,

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/test/resources/parquet/expected/fixedWidth.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/expected/fixedWidth.csv b/exec/java-exec/src/test/resources/parquet/expected/fixedWidth.csv
new file mode 100644
index 0000000..198c9b4
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/expected/fixedWidth.csv
@@ -0,0 +1,20 @@
+1,1552,93,1,17.0
+1,674,75,2,36.0
+1,637,38,3,8.0
+1,22,48,4,28.0
+1,241,23,5,24.0
+1,157,10,6,32.0
+2,1062,33,1,38.0
+3,43,19,1,45.0
+3,191,70,2,49.0
+3,1285,60,3,27.0
+3,294,22,4,2.0
+3,1831,61,5,28.0
+3,622,16,6,26.0
+4,881,81,1,30.0
+5,1086,87,1,15.0
+5,1240,41,2,26.0
+5,376,5,3,50.0
+6,1397,36,1,37.0
+7,1821,51,1,12.0
+7,1453,93,2,9.0

http://git-wip-us.apache.org/repos/asf/drill/blob/676ea889/exec/java-exec/src/test/resources/parquet/expected/mixedWidth.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/expected/mixedWidth.csv b/exec/java-exec/src/test/resources/parquet/expected/mixedWidth.csv
new file mode 100644
index 0000000..8956083
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/expected/mixedWidth.csv
@@ -0,0 +1,20 @@
+1,"Supplier#000000001"," N kD4on9OM Ipw3,gf0JBoQDd7tgrzrddZ","27-918-335-1736",5755.94
+2,"Supplier#000000002","89eJ5ksX3ImxJQBvxObC,","15-679-861-2259",4032.68
+3,"Supplier#000000003","q1,G3Pj6OjIuUYfUoH18BFTKP5aU9bEV3","11-383-516-1199",4192.4
+4,"Supplier#000000004","Bk7ah4CK8SYQTepEmvMkkgMwg","25-843-787-7479",4641.08
+5,"Supplier#000000005","Gcdm2rJRzl5qlTVzc","21-151-690-3663",-283.84
+6,"Supplier#000000006","tQxuVm7s7CnK","24-696-997-4969",1365.79
+7,"Supplier#000000007","s,4TicNGB4uO6PaSqNBUq","33-990-965-2201",6820.35
+8,"Supplier#000000008","9Sq4bBH2FQEmaFOocY45sRTxo6yuoG","27-498-742-3860",7627.85
+9,"Supplier#000000009","1KhUgZegwM3ua7dsYmekYBsK","20-403-398-8662",5302.37
+10,"Supplier#000000010","Saygah3gYWMp72i PY","34-852-489-8585",3891.91
+11,"Supplier#000000011","JfwTs,LZrV, M,9C","28-613-996-1505",3393.08
+12,"Supplier#000000012","aLIW  q0HYd","18-179-925-7181",1432.69
+13,"Supplier#000000013","HK71HQyWoqRWOX8GI FpgAifW,2PoH","13-727-620-7813",9107.22
+14,"Supplier#000000014","EXsnO5pTNj4iZRm","25-656-247-5058",9189.82
+15,"Supplier#000000015","olXVbNBfVzRqgokr1T,Ie","18-453-357-6394",308.56
+16,"Supplier#000000016","YjP5C55zHDXL7LalK27zfQnwejdpin4AMpvh","32-822-502-4215",2972.26
+17,"Supplier#000000017","c2d,ESHRSkK3WYnxpgw6aOqN0q","29-601-884-9219",1687.81
+18,"Supplier#000000018","PGGVE5PWAMwKDZw ","26-729-551-1115",7040.82
+19,"Supplier#000000019","edZT3es,nBFD8lBXTGeTl","34-278-310-2731",6150.38
+20,"Supplier#000000020","iybAE,RmTymrZVYaFZva2SH,j","13-715-945-6730",530.82


Mime
View raw message