hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x..@apache.org
Subject [2/2] hive git commit: HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, via Chao Sun) This closes #116
Date Sat, 10 Dec 2016 02:51:05 GMT
HIVE-15112: Implement Parquet vectorization reader for Struct type (Ferdinand Xu, via Chao Sun)
This closes #116


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9a524ada
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9a524ada
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9a524ada

Branch: refs/heads/master
Commit: 9a524ada2eafda2f4853a12d469f7ca48e57f38c
Parents: e8bf725
Author: Ferdinand Xu <cheng.a.xu@intel.com>
Authored: Sat Dec 10 03:14:48 2016 +0800
Committer: Ferdinand Xu <cheng.a.xu@intel.com>
Committed: Sat Dec 10 03:14:55 2016 +0800

----------------------------------------------------------------------
 .../parquet/vector/VectorizedColumnReader.java  | 566 +--------------
 .../vector/VectorizedParquetRecordReader.java   |  84 ++-
 .../vector/VectorizedPrimitiveColumnReader.java | 589 ++++++++++++++++
 .../vector/VectorizedStructColumnReader.java    |  59 ++
 .../io/parquet/TestVectorizedColumnReader.java  | 418 ++---------
 .../parquet/TestVectorizedColumnReaderBase.java | 694 +++++++++++++++++++
 ...ectorizedDictionaryEncodingColumnReader.java |  85 +++
 7 files changed, 1563 insertions(+), 932 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9a524ada/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
index 5a9c7f9..e3be982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java
@@ -1,9 +1,13 @@
 /**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -11,561 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.sql.Timestamp;
-import java.util.Arrays;
-
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-/**
- * It's column level Parquet reader which is used to read a batch of records for a column,
- * part of the code is referred from Apache Spark and Apache Parquet.
- */
-public class VectorizedColumnReader {
-
-  private static final Logger LOG = LoggerFactory.getLogger(VectorizedColumnReader.class);
-
-  private boolean skipTimestampConversion = false;
-
-  /**
-   * Total number of values read.
-   */
-  private long valuesRead;
 
+public interface VectorizedColumnReader {
   /**
-   * value that indicates the end of the current page. That is,
-   * if valuesRead == endOfPageValueCount, we are at the end of the page.
+   * Reads the specified number of records of the given type into the column vector.
+   *
+   * @param total      number of records to read into the column vector
+   * @param column     column vector where the reader will read data into
+   * @param columnType the type of column vector
+   * @throws IOException
    */
-  private long endOfPageValueCount;
-
-  /**
-   * The dictionary, if this column has dictionary encoding.
-   */
-  private final Dictionary dictionary;
-
-  /**
-   * If true, the current page is dictionary encoded.
-   */
-  private boolean isCurrentPageDictionaryEncoded;
-
-  /**
-   * Maximum definition level for this column.
-   */
-  private final int maxDefLevel;
-
-  private int definitionLevel;
-  private int repetitionLevel;
-
-  /**
-   * Repetition/Definition/Value readers.
-   */
-  private IntIterator repetitionLevelColumn;
-  private IntIterator definitionLevelColumn;
-  private ValuesReader dataColumn;
-
-  /**
-   * Total values in the current page.
-   */
-  private int pageValueCount;
-
-  private final PageReader pageReader;
-  private final ColumnDescriptor descriptor;
-  private final Type type;
-
-  public VectorizedColumnReader(
-    ColumnDescriptor descriptor,
-    PageReader pageReader,
-    boolean skipTimestampConversion,
-    Type type) throws IOException {
-    this.descriptor = descriptor;
-    this.type = type;
-    this.pageReader = pageReader;
-    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
-    this.skipTimestampConversion = skipTimestampConversion;
-
-    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
-    if (dictionaryPage != null) {
-      try {
-        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
-        this.isCurrentPageDictionaryEncoded = true;
-      } catch (IOException e) {
-        throw new IOException("could not decode the dictionary for " + descriptor, e);
-      }
-    } else {
-      this.dictionary = null;
-      this.isCurrentPageDictionaryEncoded = false;
-    }
-  }
-
   void readBatch(
     int total,
     ColumnVector column,
-    TypeInfo columnType) throws IOException {
-
-    int rowId = 0;
-    while (total > 0) {
-      // Compute the number of values we want to read in this page.
-      int leftInPage = (int) (endOfPageValueCount - valuesRead);
-      if (leftInPage == 0) {
-        readPage();
-        leftInPage = (int) (endOfPageValueCount - valuesRead);
-      }
-
-      int num = Math.min(total, leftInPage);
-      if (isCurrentPageDictionaryEncoded) {
-        LongColumnVector dictionaryIds = new LongColumnVector();
-        // Read and decode dictionary ids.
-        readDictionaryIDs(num, dictionaryIds, rowId);
-        decodeDictionaryIds(rowId, num, column, dictionaryIds);
-      } else {
-        // assign values in vector
-        PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType;
-        switch (primitiveColumnType.getPrimitiveCategory()) {
-        case INT:
-        case BYTE:
-        case SHORT:
-          readIntegers(num, (LongColumnVector) column, rowId);
-          break;
-        case DATE:
-        case INTERVAL_YEAR_MONTH:
-        case LONG:
-          readLongs(num, (LongColumnVector) column, rowId);
-          break;
-        case BOOLEAN:
-          readBooleans(num, (LongColumnVector) column, rowId);
-          break;
-        case DOUBLE:
-          readDoubles(num, (DoubleColumnVector) column, rowId);
-          break;
-        case BINARY:
-        case STRING:
-        case CHAR:
-        case VARCHAR:
-          readBinaries(num, (BytesColumnVector) column, rowId);
-          break;
-        case FLOAT:
-          readFloats(num, (DoubleColumnVector) column, rowId);
-          break;
-        case DECIMAL:
-          readDecimal(num, (DecimalColumnVector) column, rowId);
-          break;
-        case INTERVAL_DAY_TIME:
-        case TIMESTAMP:
-        default:
-          throw new IOException(
-            "Unsupported type category: " + primitiveColumnType.getPrimitiveCategory());
-        }
-      }
-      rowId += num;
-      total -= num;
-    }
-  }
-
-  private void readDictionaryIDs(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readValueDictionaryId();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readIntegers(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readInteger();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readDoubles(
-    int total,
-    DoubleColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readDouble();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readBooleans(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readLongs(
-    int total,
-    LongColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readLong();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readFloats(
-    int total,
-    DoubleColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId] = dataColumn.readFloat();
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readDecimal(
-    int total,
-    DecimalColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
-    c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
-        c.isNull[rowId] = false;
-        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  private void readBinaries(
-    int total,
-    BytesColumnVector c,
-    int rowId) throws IOException {
-    int left = total;
-    while (left > 0) {
-      readRepetitionAndDefinitionLevels();
-      if (definitionLevel >= maxDefLevel) {
-        c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
-        c.isNull[rowId] = false;
-        // TODO figure out a better way to set repeat for Binary type
-        c.isRepeating = false;
-      } else {
-        c.isNull[rowId] = true;
-        c.isRepeating = false;
-        c.noNulls = false;
-      }
-      rowId++;
-      left--;
-    }
-  }
-
-  /**
-   * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
-   */
-  private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
-                                   LongColumnVector dictionaryIds) {
-    System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
-    if (column.noNulls) {
-      column.noNulls = dictionaryIds.noNulls;
-    }
-    column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
-
-    switch (descriptor.getType()) {
-    case INT32:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((LongColumnVector) column).vector[i] =
-          dictionary.decodeToInt((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case INT64:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((LongColumnVector) column).vector[i] =
-          dictionary.decodeToLong((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case FLOAT:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((DoubleColumnVector) column).vector[i] =
-          dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case DOUBLE:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((DoubleColumnVector) column).vector[i] =
-          dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
-      }
-      break;
-    case INT96:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
-        buf.order(ByteOrder.LITTLE_ENDIAN);
-        long timeOfDayNanos = buf.getLong();
-        int julianDay = buf.getInt();
-        NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
-        Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
-        ((TimestampColumnVector) column).set(i, ts);
-      }
-      break;
-    case BINARY:
-    case FIXED_LEN_BYTE_ARRAY:
-      for (int i = rowId; i < rowId + num; ++i) {
-        ((BytesColumnVector) column)
-          .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
-      }
-      break;
-    default:
-      throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
-    }
-  }
-
-  private void readRepetitionAndDefinitionLevels() {
-    repetitionLevel = repetitionLevelColumn.nextInt();
-    definitionLevel = definitionLevelColumn.nextInt();
-    valuesRead++;
-  }
-
-  private void readPage() throws IOException {
-    DataPage page = pageReader.readPage();
-    // TODO: Why is this a visitor?
-    page.accept(new DataPage.Visitor<Void>() {
-      @Override
-      public Void visit(DataPageV1 dataPageV1) {
-        readPageV1(dataPageV1);
-        return null;
-      }
-
-      @Override
-      public Void visit(DataPageV2 dataPageV2) {
-        readPageV2(dataPageV2);
-        return null;
-      }
-    });
-  }
-
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
-    this.pageValueCount = valueCount;
-    this.endOfPageValueCount = valuesRead + pageValueCount;
-    if (dataEncoding.usesDictionary()) {
-      this.dataColumn = null;
-      if (dictionary == null) {
-        throw new IOException(
-          "could not read page in col " + descriptor +
-            " as the dictionary was missing for encoding " + dataEncoding);
-      }
-      dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
-      this.isCurrentPageDictionaryEncoded = true;
-    } else {
-      if (dataEncoding != Encoding.PLAIN) {
-        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
-      }
-      dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
-      this.isCurrentPageDictionaryEncoded = false;
-    }
-
-    try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
-    } catch (IOException e) {
-      throw new IOException("could not read page in col " + descriptor, e);
-    }
-  }
-
-  private void readPageV1(DataPageV1 page) {
-    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
-    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
-    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
-    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
-    try {
-      byte[] bytes = page.getBytes().toByteArray();
-      LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
-      LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      LOG.debug("reading definition levels at " + next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      LOG.debug("reading data at " + next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
-    }
-  }
-
-  private void readPageV2(DataPageV2 page) {
-    this.pageValueCount = page.getValueCount();
-    this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
-      page.getRepetitionLevels());
-    this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
-    try {
-      LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
-      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
-    }
-  }
-
-  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
-    try {
-      if (maxLevel == 0) {
-        return new NullIntIterator();
-      }
-      return new RLEIntIterator(
-        new RunLengthBitPackingHybridDecoder(
-          BytesUtils.getWidthFromMaxInt(maxLevel),
-          new ByteArrayInputStream(bytes.toByteArray())));
-    } catch (IOException e) {
-      throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
-    }
-  }
-
-  /**
-   * Utility classes to abstract over different way to read ints with different encodings.
-   * TODO: remove this layer of abstraction?
-   */
-  abstract static class IntIterator {
-    abstract int nextInt();
-  }
-
-  protected static final class ValuesReaderIntIterator extends IntIterator {
-    ValuesReader delegate;
-
-    public ValuesReaderIntIterator(ValuesReader delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      return delegate.readInteger();
-    }
-  }
-
-  protected static final class RLEIntIterator extends IntIterator {
-    RunLengthBitPackingHybridDecoder delegate;
-
-    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override
-    int nextInt() {
-      try {
-        return delegate.readInt();
-      } catch (IOException e) {
-        throw new ParquetDecodingException(e);
-      }
-    }
-  }
-
-  protected static final class NullIntIterator extends IntIterator {
-    @Override
-    int nextInt() { return 0; }
-  }
+    TypeInfo columnType) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9a524ada/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index f94c49a..699de59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -35,6 +37,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputSplit;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
@@ -68,6 +71,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private List<String> columnNamesList;
   private List<TypeInfo> columnTypesList;
   private VectorizedRowBatchCtx rbCtx;
+  private List<Integer> indexColumnsWanted;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -198,7 +202,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         columnTypesList);
     }
 
-    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
     if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
       requestedSchema =
         DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
@@ -279,11 +283,81 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     List<Type> types = requestedSchema.getFields();
     columnReaders = new VectorizedColumnReader[columns.size()];
-    for (int i = 0; i < columns.size(); ++i) {
-      columnReaders[i] =
-        new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)),
-          skipTimestampConversion, types.get(i));
+
+    if (!ColumnProjectionUtils.isReadAllColumns(jobConf) && !indexColumnsWanted.isEmpty()) {
+      for (int i = 0; i < types.size(); ++i) {
+        columnReaders[i] =
+          buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i),
+            pages, requestedSchema.getColumns(), skipTimestampConversion, 0);
+      }
+    } else {
+      for (int i = 0; i < types.size(); ++i) {
+        columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages,
+          requestedSchema.getColumns(), skipTimestampConversion, 0);
+      }
     }
+
     totalCountLoadedSoFar += pages.getRowCount();
   }
+
+  private List<ColumnDescriptor> getAllColumnDescriptorByType(
+    int depth,
+    Type type,
+    List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+    List<ColumnDescriptor> res = new ArrayList<>();
+    for (ColumnDescriptor descriptor : columns) {
+      if (depth >= descriptor.getPath().length) {
+        throw new InvalidSchemaException("Corrupted Parquet schema");
+      }
+      if (type.getName().equals(descriptor.getPath()[depth])) {
+        res.add(descriptor);
+      }
+    }
+    return res;
+  }
+
+  // Build a VectorizedColumnReader from the Hive TypeInfo and the matching Parquet schema type
+  private VectorizedColumnReader buildVectorizedParquetReader(
+    TypeInfo typeInfo,
+    Type type,
+    PageReadStore pages,
+    List<ColumnDescriptor> columnDescriptors,
+    boolean skipTimestampConversion,
+    int depth) throws IOException {
+    List<ColumnDescriptor> descriptors =
+      getAllColumnDescriptorByType(depth, type, columnDescriptors);
+    switch (typeInfo.getCategory()) {
+    case PRIMITIVE:
+      if (columnDescriptors == null || columnDescriptors.isEmpty()) {
+        throw new RuntimeException(
+          "Failed to find related Parquet column descriptor with type " + type);
+      } else {
+        return new VectorizedPrimitiveColumnReader(descriptors.get(0),
+          pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type);
+      }
+    case STRUCT:
+      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+      List<VectorizedColumnReader> fieldReaders = new ArrayList<>();
+      List<TypeInfo> fieldTypes = structTypeInfo.getAllStructFieldTypeInfos();
+      List<Type> types = type.asGroupType().getFields();
+      for (int i = 0; i < fieldTypes.size(); i++) {
+        VectorizedColumnReader r =
+          buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors,
+            skipTimestampConversion, depth + 1);
+        if (r != null) {
+          fieldReaders.add(r);
+        } else {
+          throw new RuntimeException(
+            "Fail to build Parquet vectorized reader based on Hive type " + fieldTypes.get(i)
+              .getTypeName() + " and Parquet type" + types.get(i).toString());
+        }
+      }
+      return new VectorizedStructColumnReader(fieldReaders);
+    case LIST:
+    case MAP:
+    case UNION:
+    default:
+      throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9a524ada/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
new file mode 100644
index 0000000..3d5c6e6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java
@@ -0,0 +1,589 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Timestamp;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * Column-level Parquet reader that reads a batch of values for a single primitive column.
+ * Parts of this code are adapted from Apache Spark and Apache Parquet.
+ */
+public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class);
+
+  private boolean skipTimestampConversion = false;
+
+  /**
+   * Total number of values read.
+   */
+  private long valuesRead;
+
+  /**
+   * value that indicates the end of the current page. That is,
+   * if valuesRead == endOfPageValueCount, we are at the end of the page.
+   */
+  private long endOfPageValueCount;
+
+  /**
+   * The dictionary, if this column has dictionary encoding.
+   */
+  private final Dictionary dictionary;
+
+  /**
+   * If true, the current page is dictionary encoded.
+   */
+  private boolean isCurrentPageDictionaryEncoded;
+
+  /**
+   * Maximum definition level for this column.
+   */
+  private final int maxDefLevel;
+
+  private int definitionLevel;
+  private int repetitionLevel;
+
+  /**
+   * Repetition/Definition/Value readers.
+   */
+  private IntIterator repetitionLevelColumn;
+  private IntIterator definitionLevelColumn;
+  private ValuesReader dataColumn;
+
+  /**
+   * Total values in the current page.
+   */
+  private int pageValueCount;
+
+  private final PageReader pageReader;
+  private final ColumnDescriptor descriptor;
+  private final Type type;
+
+  public VectorizedPrimitiveColumnReader(
+    ColumnDescriptor descriptor,
+    PageReader pageReader,
+    boolean skipTimestampConversion,
+    Type type) throws IOException {
+    this.descriptor = descriptor;
+    this.type = type;
+    this.pageReader = pageReader;
+    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+    this.skipTimestampConversion = skipTimestampConversion;
+
+    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+    if (dictionaryPage != null) {
+      try {
+        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+        this.isCurrentPageDictionaryEncoded = true;
+      } catch (IOException e) {
+        throw new IOException("could not decode the dictionary for " + descriptor, e);
+      }
+    } else {
+      this.dictionary = null;
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+  }
+
+  /**
+   * Fills {@code column} with the next {@code total} values of this column, loading a new
+   * page whenever the current one has been fully consumed.
+   *
+   * @param total number of values to read
+   * @param column vector receiving the values, written from row 0 onwards
+   * @param columnType Hive type of the column; selects the typed read path for pages that
+   *                   are not dictionary encoded
+   * @throws IOException if the values cannot be read or the type is not supported
+   */
+  public void readBatch(
+    int total,
+    ColumnVector column,
+    TypeInfo columnType) throws IOException {
+    int nextRow = 0;
+    int remaining = total;
+    while (remaining > 0) {
+      // Move on to the next page once every value of the current one has been consumed.
+      if ((int) (endOfPageValueCount - valuesRead) == 0) {
+        readPage();
+      }
+      // Never read across a page boundary in a single step.
+      final int batch = Math.min(remaining, (int) (endOfPageValueCount - valuesRead));
+
+      if (!isCurrentPageDictionaryEncoded) {
+        // Non-dictionary pages are decoded straight into the target vector.
+        readBatchHelper(batch, column, columnType, nextRow);
+      } else {
+        // Dictionary pages: gather the ids first, then translate them through the dictionary.
+        LongColumnVector ids = new LongColumnVector();
+        readDictionaryIDs(batch, ids, nextRow);
+        decodeDictionaryIds(nextRow, batch, column, ids);
+      }
+
+      nextRow += batch;
+      remaining -= batch;
+    }
+  }
+
+  /**
+   * Dispatches to the typed reader that matches the Hive primitive category of the column.
+   *
+   * @param num number of values to read
+   * @param column target vector; cast to the vector class the chosen reader expects
+   * @param columnType Hive type of the column, which must be a {@link PrimitiveTypeInfo}
+   * @param rowId first row of {@code column} to write
+   * @throws IOException if the category has no reader here (including TIMESTAMP and
+   *                     INTERVAL_DAY_TIME) or the underlying read fails
+   */
+  private void readBatchHelper(
+    int num,
+    ColumnVector column,
+    TypeInfo columnType,
+    int rowId) throws IOException {
+    switch (((PrimitiveTypeInfo) columnType).getPrimitiveCategory()) {
+    case BOOLEAN:
+      readBooleans(num, (LongColumnVector) column, rowId);
+      return;
+    case BYTE:
+    case SHORT:
+    case INT:
+      readIntegers(num, (LongColumnVector) column, rowId);
+      return;
+    case LONG:
+    case DATE:
+    case INTERVAL_YEAR_MONTH:
+      readLongs(num, (LongColumnVector) column, rowId);
+      return;
+    case FLOAT:
+      readFloats(num, (DoubleColumnVector) column, rowId);
+      return;
+    case DOUBLE:
+      readDoubles(num, (DoubleColumnVector) column, rowId);
+      return;
+    case DECIMAL:
+      readDecimal(num, (DecimalColumnVector) column, rowId);
+      return;
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
+      readBinaries(num, (BytesColumnVector) column, rowId);
+      return;
+    case TIMESTAMP:
+    case INTERVAL_DAY_TIME:
+    default:
+      throw new IOException("Unsupported type: " + type);
+    }
+  }
+
+  private void readDictionaryIDs(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readValueDictionaryId();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readIntegers(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readInteger();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readDoubles(
+    int total,
+    DoubleColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readDouble();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readBooleans(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readBoolean() ? 1 : 0;
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readLongs(
+    int total,
+    LongColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readLong();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readFloats(
+    int total,
+    DoubleColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId] = dataColumn.readFloat();
+        c.isNull[rowId] = false;
+        c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  /**
+   * Reads the next {@code total} decimal values into {@code c}, starting at {@code rowId}.
+   * Precision and scale of the vector are taken from the decimal metadata of the Parquet
+   * type. Positions whose definition level is below the column maximum are flagged as null.
+   *
+   * @param total number of values to read
+   * @param c target decimal vector
+   * @param rowId first row of {@code c} to write
+   * @throws IOException if the underlying read fails
+   */
+  private void readDecimal(
+    int total,
+    DecimalColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    c.precision = (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+    c.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.vector[rowId].set(dataColumn.readBytes().getBytesUnsafe(), c.scale);
+        c.isNull[rowId] = false;
+        // The vector elements are objects, so compare by value: '==' would test reference
+        // identity and could only ever be true for row 0.
+        c.isRepeating = c.isRepeating && c.vector[0].equals(c.vector[rowId]);
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  private void readBinaries(
+    int total,
+    BytesColumnVector c,
+    int rowId) throws IOException {
+    int left = total;
+    while (left > 0) {
+      readRepetitionAndDefinitionLevels();
+      if (definitionLevel >= maxDefLevel) {
+        c.setVal(rowId, dataColumn.readBytes().getBytesUnsafe());
+        c.isNull[rowId] = false;
+        // TODO figure out a better way to set repeat for Binary type
+        c.isRepeating = false;
+      } else {
+        c.isNull[rowId] = true;
+        c.isRepeating = false;
+        c.noNulls = false;
+      }
+      rowId++;
+      left--;
+    }
+  }
+
+  /**
+   * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
+   */
+  private void decodeDictionaryIds(
+    int rowId,
+    int num,
+    ColumnVector column,
+    LongColumnVector dictionaryIds) {
+    System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num);
+    if (column.noNulls) {
+      column.noNulls = dictionaryIds.noNulls;
+    }
+    column.isRepeating = column.isRepeating && dictionaryIds.isRepeating;
+
+    switch (descriptor.getType()) {
+    case INT32:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((LongColumnVector) column).vector[i] =
+          dictionary.decodeToInt((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case INT64:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((LongColumnVector) column).vector[i] =
+          dictionary.decodeToLong((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case FLOAT:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((DoubleColumnVector) column).vector[i] =
+          dictionary.decodeToFloat((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case DOUBLE:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ((DoubleColumnVector) column).vector[i] =
+          dictionary.decodeToDouble((int) dictionaryIds.vector[i]);
+      }
+      break;
+    case INT96:
+      for (int i = rowId; i < rowId + num; ++i) {
+        ByteBuffer buf = dictionary.decodeToBinary((int) dictionaryIds.vector[i]).toByteBuffer();
+        buf.order(ByteOrder.LITTLE_ENDIAN);
+        long timeOfDayNanos = buf.getLong();
+        int julianDay = buf.getInt();
+        NanoTime nt = new NanoTime(julianDay, timeOfDayNanos);
+        Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipTimestampConversion);
+        ((TimestampColumnVector) column).set(i, ts);
+      }
+      break;
+    case BINARY:
+    case FIXED_LEN_BYTE_ARRAY:
+      if (column instanceof BytesColumnVector) {
+        for (int i = rowId; i < rowId + num; ++i) {
+          ((BytesColumnVector) column)
+            .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe());
+        }
+      } else {
+        DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column);
+        decimalColumnVector.precision =
+          (short) type.asPrimitiveType().getDecimalMetadata().getPrecision();
+        decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale();
+        for (int i = rowId; i < rowId + num; ++i) {
+          decimalColumnVector.vector[i]
+            .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(),
+              decimalColumnVector.scale);
+        }
+      }
+      break;
+    default:
+      throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
+    }
+  }
+
+  /**
+   * Advances both level iterators by one position, storing the results in
+   * {@code repetitionLevel} and {@code definitionLevel}, and counts the position as read.
+   */
+  private void readRepetitionAndDefinitionLevels() {
+    repetitionLevel = repetitionLevelColumn.nextInt();
+    definitionLevel = definitionLevelColumn.nextInt();
+    // Counted for null positions as well, so this tracks the position within the page.
+    valuesRead++;
+  }
+
+  /**
+   * Fetches the next data page from the page reader and initialises the level and value
+   * readers for it, dispatching on the page format (V1 or V2).
+   *
+   * @throws IOException if the page reader has no further page to offer
+   */
+  private void readPage() throws IOException {
+    DataPage page = pageReader.readPage();
+    if (page == null) {
+      // The page reader yields null once the column is exhausted; fail with a clear message
+      // instead of a NullPointerException on the call below.
+      throw new IOException(
+        "no more pages to read in col " + descriptor + " after " + valuesRead + " values");
+    }
+    // TODO: Why is this a visitor?
+    page.accept(new DataPage.Visitor<Void>() {
+      @Override
+      public Void visit(DataPageV1 dataPageV1) {
+        readPageV1(dataPageV1);
+        return null;
+      }
+
+      @Override
+      public Void visit(DataPageV2 dataPageV2) {
+        readPageV2(dataPageV2);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Prepares {@code dataColumn} for the values section of a freshly loaded page and updates
+   * the page bookkeeping (value count, end-of-page mark, dictionary flag).
+   *
+   * @param dataEncoding encoding of the values section
+   * @param bytes buffer holding the page data
+   * @param offset position in {@code bytes} where the values section starts
+   * @param valueCount number of values in the page
+   * @throws IOException if the page needs a dictionary that is missing, or the values reader
+   *                     cannot be initialised
+   * @throws UnsupportedOperationException for a non-dictionary encoding other than PLAIN
+   */
+  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) throws IOException {
+    this.pageValueCount = valueCount;
+    this.endOfPageValueCount = valuesRead + pageValueCount;
+
+    if (!dataEncoding.usesDictionary()) {
+      if (dataEncoding != Encoding.PLAIN) {
+        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
+      }
+      this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+      this.isCurrentPageDictionaryEncoded = false;
+    } else {
+      // Drop the previous reader first so it is not left in place if the check below fails.
+      this.dataColumn = null;
+      if (dictionary == null) {
+        throw new IOException(
+          "could not read page in col " + descriptor +
+            " as the dictionary was missing for encoding " + dataEncoding);
+      }
+      this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(descriptor, VALUES, dictionary);
+      this.isCurrentPageDictionaryEncoded = true;
+    }
+
+    try {
+      dataColumn.initFromPage(pageValueCount, bytes, offset);
+    } catch (IOException e) {
+      throw new IOException("could not read page in col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Sets up the repetition level, definition level and value readers for a V1 data page.
+   * All three sections are read from the same byte buffer, each reader starting at the
+   * offset where the previous one ended.
+   *
+   * @param page the V1 data page to read
+   * @throws ParquetDecodingException if any of the readers cannot be initialised
+   */
+  private void readPageV1(DataPageV1 page) {
+    // Record this page's value count before initialising the level readers; they would
+    // otherwise receive the count left over from the previous page (0 for the first page).
+    this.pageValueCount = page.getValueCount();
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+    try {
+      byte[] bytes = page.getBytes().toByteArray();
+      LOG.debug("page size {} bytes and {} records", bytes.length, pageValueCount);
+      LOG.debug("reading repetition levels at 0");
+      rlReader.initFromPage(pageValueCount, bytes, 0);
+      int next = rlReader.getNextOffset();
+      LOG.debug("reading definition levels at {}", next);
+      dlReader.initFromPage(pageValueCount, bytes, next);
+      next = dlReader.getNextOffset();
+      LOG.debug("reading data at {}", next);
+      initDataReader(page.getValueEncoding(), bytes, next, pageValueCount);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  private void readPageV2(DataPageV2 page) {
+    this.pageValueCount = page.getValueCount();
+    this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(),
+      page.getRepetitionLevels());
+    this.definitionLevelColumn = newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    try {
+      LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
+      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read page " + page + " in col " + descriptor, e);
+    }
+  }
+
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    try {
+      if (maxLevel == 0) {
+        return new NullIntIterator();
+      }
+      return new RLEIntIterator(
+        new RunLengthBitPackingHybridDecoder(
+          BytesUtils.getWidthFromMaxInt(maxLevel),
+          new ByteArrayInputStream(bytes.toByteArray())));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("could not read levels in page for col " + descriptor, e);
+    }
+  }
+
+  /**
+   * Utility classes that abstract over the different ways of reading ints, depending on
+   * their encoding.
+   * TODO: remove this layer of abstraction?
+   */
+  abstract static class IntIterator {
+    /** Returns the next int from the underlying source. */
+    abstract int nextInt();
+  }
+
+  /**
+   * {@link IntIterator} backed by a {@link ValuesReader}; each call reads one integer
+   * from the delegate.
+   */
+  protected static final class ValuesReaderIntIterator extends IntIterator {
+    ValuesReader delegate;
+
+    public ValuesReaderIntIterator(ValuesReader delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      return delegate.readInteger();
+    }
+  }
+
+  /**
+   * {@link IntIterator} backed by a {@link RunLengthBitPackingHybridDecoder}. An
+   * {@link IOException} from the decoder is rethrown as a {@link ParquetDecodingException}.
+   */
+  protected static final class RLEIntIterator extends IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
+
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    int nextInt() {
+      try {
+        return delegate.readInt();
+      } catch (IOException e) {
+        // nextInt() declares no checked exception, so wrap it in an unchecked one.
+        throw new ParquetDecodingException(e);
+      }
+    }
+  }
+
+  /**
+   * {@link IntIterator} that always returns 0; newRLEIterator uses it when the maximum
+   * level is 0.
+   */
+  protected static final class NullIntIterator extends IntIterator {
+    @Override
+    int nextInt() { return 0; }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9a524ada/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
new file mode 100644
index 0000000..cc6cb20
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Reads a struct column by delegating every field to its own column reader and then
+ * combining the per-field null and repeat markers into the markers of the struct vector.
+ */
+public class VectorizedStructColumnReader implements VectorizedColumnReader {
+
+  /** One reader per struct field, in the same order as the fields of the struct vector. */
+  private final List<VectorizedColumnReader> fieldReaders;
+
+  /**
+   * @param fieldReaders readers for the struct fields, indexed like the fields of the
+   *                     struct vector passed to {@link #readBatch}
+   */
+  public VectorizedStructColumnReader(List<VectorizedColumnReader> fieldReaders) {
+    this.fieldReaders = fieldReaders;
+  }
+
+  /**
+   * Reads {@code total} values into every field vector of the struct.
+   *
+   * <p>A struct row is flagged null only when the row is null in every field; the struct
+   * vector reports noNulls only when every field does, and can stay repeating only while
+   * every field is repeating.
+   *
+   * @param total number of values to read per field
+   * @param column target vector, which must be a {@link StructColumnVector}
+   * @param columnType Hive type of the column, which must be a {@link StructTypeInfo}
+   * @throws IOException if a field reader fails
+   */
+  @Override
+  public void readBatch(
+    int total,
+    ColumnVector column,
+    TypeInfo columnType) throws IOException {
+    final StructColumnVector struct = (StructColumnVector) column;
+    final StructTypeInfo structType = (StructTypeInfo) columnType;
+    final ColumnVector[] fields = struct.fields;
+
+    for (int i = 0; i < fields.length; i++) {
+      final ColumnVector field = fields[i];
+      fieldReaders.get(i).readBatch(total, field, structType.getAllStructFieldTypeInfos().get(i));
+      struct.isRepeating = struct.isRepeating && field.isRepeating;
+
+      if (i == 0) {
+        // The first field seeds the struct-level markers.
+        for (int row = 0; row < field.isNull.length; row++) {
+          struct.isNull[row] = field.isNull[row];
+        }
+        struct.noNulls = field.noNulls;
+      } else {
+        // Later fields can only narrow them.
+        for (int row = 0; row < field.isNull.length; row++) {
+          struct.isNull[row] &= field.isNull[row];
+        }
+        struct.noNulls = struct.noNulls && field.noNulls;
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9a524ada/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 276ff19..d4b4140 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -1,9 +1,13 @@
 /**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,416 +18,74 @@
 
 package org.apache.hadoop.hive.ql.io.parquet;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
-import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Random;
 
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.TestCase.assertFalse;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
-import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
-import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
-import static org.junit.Assert.assertEquals;
-
-public class TestVectorizedColumnReader {
-
-  private static final int nElements = 2500;
-  protected static final Configuration conf = new Configuration();
-  protected static final Path file =
-    new Path("target/test/TestParquetVectorReader/testParquetFile");
-  private static String[] uniqueStrs = new String[nElements];
-  private static boolean[] isNulls = new boolean[nElements];
-  private static Random random = new Random();
-  protected static final MessageType schema = parseMessageType(
-    "message test { "
-      + "required int32 int32_field; "
-      + "required int64 int64_field; "
-      + "required int96 int96_field; "
-      + "required double double_field; "
-      + "required float float_field; "
-      + "required boolean boolean_field; "
-      + "required fixed_len_byte_array(3) flba_field; "
-      + "optional fixed_len_byte_array(1) some_null_field; "
-      + "optional fixed_len_byte_array(1) all_null_field; "
-      + "optional binary binary_field; "
-      + "optional binary binary_field_non_repeating; "
-      + "} ");
-
-  @AfterClass
-  public static void cleanup() throws IOException {
-    FileSystem fs = file.getFileSystem(conf);
-    if (fs.exists(file)) {
-      fs.delete(file, true);
-    }
-  }
+public class TestVectorizedColumnReader extends TestVectorizedColumnReaderBase {
+  static boolean isDictionaryEncoding = false;
 
   @BeforeClass
-  public static void prepareFile() throws IOException {
-    cleanup();
-
-    boolean dictionaryEnabled = true;
-    boolean validating = false;
-    GroupWriteSupport.setSchema(schema, conf);
-    SimpleGroupFactory f = new SimpleGroupFactory(schema);
-    ParquetWriter<Group> writer = new ParquetWriter<Group>(
-      file,
-      new GroupWriteSupport(),
-      GZIP, 1024*1024, 1024, 1024*1024,
-      dictionaryEnabled, validating, PARQUET_1_0, conf);
-    writeData(f, writer);
-  }
-
-  protected static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer) throws IOException {
-    initialStrings(uniqueStrs);
-    for (int i = 0; i < nElements; i++) {
-      Group group = f.newGroup()
-        .append("int32_field", i)
-        .append("int64_field", (long) 2 * i)
-        .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes()))
-        .append("double_field", i * 1.0)
-        .append("float_field", ((float) (i * 2.0)))
-        .append("boolean_field", i % 5 == 0)
-        .append("flba_field", "abc");
-
-      if (i % 2 == 1) {
-        group.append("some_null_field", "x");
-      }
-
-      if (i % 13 != 1) {
-        int binaryLen = i % 10;
-        group.append("binary_field",
-          Binary.fromString(new String(new char[binaryLen]).replace("\0", "x")));
-      }
-
-      if (uniqueStrs[i] != null) {
-        group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i]));
-      }
-      writer.write(group);
-    }
-    writer.close();
+  public static void setup() throws IOException {
+    removeFile();
+    writeData(initWriterFromFile(), isDictionaryEncoding);
   }
 
-  private static String getRandomStr() {
-    int len = random.nextInt(10);
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < len; i++) {
-      sb.append((char) ('a' + random.nextInt(25)));
-    }
-    return sb.toString();
-  }
-
-  public static void initialStrings(String[] uniqueStrs) {
-    for (int i = 0; i < uniqueStrs.length; i++) {
-      String str = getRandomStr();
-      if (!str.isEmpty()) {
-        uniqueStrs[i] = str;
-        isNulls[i] = false;
-      }else{
-        isNulls[i] = true;
-      }
-    }
+  @AfterClass
+  public static void cleanup() throws IOException {
+    removeFile();
   }
 
-  private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
-    throws IOException, InterruptedException, HiveException {
-    conf.set(PARQUET_READ_SCHEMA, schemaString);
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
-    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
-
-    Job vectorJob = new Job(conf, "read vector");
-    ParquetInputFormat.setInputPaths(vectorJob, file);
-    ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
-    InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
-    initialVectorizedRowBatchCtx(conf);
-    return new VectorizedParquetRecordReader(split, new JobConf(conf));
+  @Test
+  public void testIntRead() throws Exception {
+    intRead(isDictionaryEncoding);
   }
 
-  private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
-    MapWork mapWork = new MapWork();
-    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
-    rbCtx.init(createStructObjectInspector(conf), new String[0]);
-    mapWork.setVectorMode(true);
-    mapWork.setVectorizedRowBatchCtx(rbCtx);
-    Utilities.setMapWork(conf, mapWork);
+  @Test
+  public void testLongRead() throws Exception {
+    longRead(isDictionaryEncoding);
   }
 
-  private StructObjectInspector createStructObjectInspector(Configuration conf) {
-    // Create row related objects
-    String columnNames = conf.get(IOConstants.COLUMNS);
-    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
-    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
-    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
-    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
-    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+  @Test
+  public void testDoubleRead() throws Exception {
+    doubleRead(isDictionaryEncoding);
   }
 
   @Test
-  public void testIntRead() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(IOConstants.COLUMNS,"int32_field");
-    conf.set(IOConstants.COLUMNS_TYPES,"int");
-    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
-    VectorizedParquetRecordReader reader =
-      createParquetReader("message test { required int32 int32_field;}", conf);
-    VectorizedRowBatch previous = reader.createValue();
-    try {
-      long c = 0;
-      while (reader.next(NullWritable.get(), previous)) {
-        LongColumnVector vector = (LongColumnVector) previous.cols[0];
-        assertTrue(vector.noNulls);
-        for (int i = 0; i < vector.vector.length; i++) {
-          if(c == nElements){
-            break;
-          }
-          assertEquals(c, vector.vector[i]);
-          assertFalse(vector.isNull[i]);
-          c++;
-        }
-      }
-      assertEquals(nElements, c);
-    } finally {
-      reader.close();
-    }
+  public void testFloatRead() throws Exception {
+    floatRead(isDictionaryEncoding);
   }
 
   @Test
-  public void testLongRead() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(IOConstants.COLUMNS,"int64_field");
-    conf.set(IOConstants.COLUMNS_TYPES, "bigint");
-    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
-    VectorizedParquetRecordReader reader =
-      createParquetReader("message test { required int64 int64_field;}", conf);
-    VectorizedRowBatch previous = reader.createValue();
-    try {
-      long c = 0;
-      while (reader.next(NullWritable.get(), previous)) {
-        LongColumnVector vector = (LongColumnVector) previous.cols[0];
-        assertTrue(vector.noNulls);
-        for (int i = 0; i < vector.vector.length; i++) {
-          if(c == nElements){
-            break;
-          }
-          assertEquals(2 * c, vector.vector[i]);
-          assertFalse(vector.isNull[i]);
-          c++;
-        }
-      }
-      assertEquals(nElements, c);
-    } finally {
-      reader.close();
-    }
+  public void testBooleanRead() throws Exception {
+    booleanRead();
   }
 
   @Test
-  public void testDoubleRead() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(IOConstants.COLUMNS,"double_field");
-    conf.set(IOConstants.COLUMNS_TYPES, "double");
-    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
-    VectorizedParquetRecordReader reader =
-      createParquetReader("message test { required double double_field;}", conf);
-    VectorizedRowBatch previous = reader.createValue();
-    try {
-      long c = 0;
-      while (reader.next(NullWritable.get(), previous)) {
-        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
-        assertTrue(vector.noNulls);
-        for (int i = 0; i < vector.vector.length; i++) {
-          if(c == nElements){
-            break;
-          }
-          assertEquals(1.0 * c, vector.vector[i], 0);
-          assertFalse(vector.isNull[i]);
-          c++;
-        }
-      }
-      assertEquals(nElements, c);
-    } finally {
-      reader.close();
-    }
+  public void testBinaryRead() throws Exception {
+    binaryRead(isDictionaryEncoding);
   }
 
   @Test
-  public void testFloatRead() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(IOConstants.COLUMNS,"float_field");
-    conf.set(IOConstants.COLUMNS_TYPES, "float");
-    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
-    VectorizedParquetRecordReader reader =
-      createParquetReader("message test { required float float_field;}", conf);
-    VectorizedRowBatch previous = reader.createValue();
-    try {
-      long c = 0;
-      while (reader.next(NullWritable.get(), previous)) {
-        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
-        assertTrue(vector.noNulls);
-        for (int i = 0; i < vector.vector.length; i++) {
-          if(c == nElements){
-            break;
-          }
-          assertEquals((float)2.0 * c, vector.vector[i], 0);
-          assertFalse(vector.isNull[i]);
-          c++;
-        }
-      }
-      assertEquals(nElements, c);
-    } finally {
-      reader.close();
-    }
+  public void testStructRead() throws Exception {
+    structRead(isDictionaryEncoding);
   }
 
   @Test
-  public void testBooleanRead() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(IOConstants.COLUMNS,"boolean_field");
-    conf.set(IOConstants.COLUMNS_TYPES, "boolean");
-    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
-    VectorizedParquetRecordReader reader =
-      createParquetReader("message test { required boolean boolean_field;}", conf);
-    VectorizedRowBatch previous = reader.createValue();
-    try {
-      long c = 0;
-      while (reader.next(NullWritable.get(), previous)) {
-        LongColumnVector vector = (LongColumnVector) previous.cols[0];
-        assertTrue(vector.noNulls);
-        for (int i = 0; i < vector.vector.length; i++) {
-          if(c == nElements){
-            break;
-          }
-          int e = (c % 5 == 0) ? 1 : 0;
-          assertEquals(e, vector.vector[i]);
-          assertFalse(vector.isNull[i]);
-          c++;
-        }
-      }
-      assertEquals(nElements, c);
-    } finally {
-      reader.close();
-    }
+  public void testNestedStructRead() throws Exception {
+    nestedStructRead0(isDictionaryEncoding);
+    nestedStructRead1(isDictionaryEncoding);
   }
 
   @Test
-  public void testBinaryReadDictionaryEncoding() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(IOConstants.COLUMNS,"binary_field");
-    conf.set(IOConstants.COLUMNS_TYPES, "string");
-    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
-    VectorizedParquetRecordReader reader =
-      createParquetReader("message test { required binary binary_field;}", conf);
-    VectorizedRowBatch previous = reader.createValue();
-    int c = 0;
-    try {
-      while (reader.next(NullWritable.get(), previous)) {
-        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
-        boolean noNull = true;
-        for (int i = 0; i < vector.vector.length; i++) {
-          if(c == nElements){
-            break;
-          }
-          if (c % 13 == 1) {
-            assertTrue(vector.isNull[i]);
-          } else {
-            assertFalse(vector.isNull[i]);
-            int binaryLen = c % 10;
-            String expected = new String(new char[binaryLen]).replace("\0", "x");
-            String actual = new String(ArrayUtils
-              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
-            assertEquals("Failed at " + c, expected, actual);
-            noNull = false;
-          }
-          c++;
-        }
-        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
-        assertFalse(vector.isRepeating);
-      }
-      assertEquals(nElements, c);
-    } finally {
-      reader.close();
-    }
+  public void structReadSomeNull() throws Exception {
+    structReadSomeNull(isDictionaryEncoding);
   }
 
   @Test
-  public void testBinaryRead() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(IOConstants.COLUMNS,"binary_field_non_repeating");
-    conf.set(IOConstants.COLUMNS_TYPES, "string");
-    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
-    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
-    VectorizedParquetRecordReader reader =
-      createParquetReader("message test { required binary binary_field_non_repeating;}", conf);
-    VectorizedRowBatch previous = reader.createValue();
-    int c = 0;
-    try {
-      while (reader.next(NullWritable.get(), previous)) {
-        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
-        boolean noNull = true;
-        for (int i = 0; i < vector.vector.length; i++) {
-          if(c == nElements){
-            break;
-          }
-          String actual;
-          assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]);
-          if (!vector.isNull[i]) {
-            actual = new String(ArrayUtils
-              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
-            assertEquals("failed at " + c, uniqueStrs[c], actual);
-          }else{
-            noNull = false;
-          }
-          c++;
-        }
-        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
-        assertFalse(vector.isRepeating);
-      }
-      assertEquals("It doesn't exit at expected position", nElements, c);
-    } finally {
-      reader.close();
-    }
+  public void decimalRead() throws Exception {
+    decimalRead(isDictionaryEncoding);
   }
 }


Mime
View raw message