drill-commits mailing list archives

From s..@apache.org
Subject [5/5] drill git commit: DRILL-2262: selecting columns of certain datatypes from a dictionary encoded parquet file created by drill fails
Date Sat, 07 Mar 2015 03:18:19 GMT
DRILL-2262: selecting columns of certain datatypes from a dictionary encoded parquet file created by drill fails


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fe94970e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fe94970e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fe94970e

Branch: refs/heads/master
Commit: fe94970e1fbfd6ceacbfc7f2187c5495d497e791
Parents: deba946
Author: adeneche <adeneche@gmail.com>
Authored: Thu Feb 26 19:55:08 2015 -0800
Committer: Steven Phillips <sphillips@maprtech.com>
Committed: Fri Mar 6 18:24:38 2015 -0800

----------------------------------------------------------------------
 .../columnreaders/ColumnReaderFactory.java      |  88 ++++++++++----
 .../NullableFixedByteAlignedReaders.java        |  94 +++++++++++++++
 .../ParquetFixedWidthDictionaryReaders.java     | 114 ++++++++++++++++++-
 .../columnreaders/TestColumnReaderFactory.java  |  84 ++++++++++++++
 .../parquet/decimal_dictionary.parquet          | Bin 0 -> 602 bytes
 .../parquet/decimal_nodictionary.parquet        | Bin 0 -> 558 bytes
 .../resources/parquet/time_dictionary.parquet   | Bin 0 -> 618 bytes
 .../resources/parquet/time_nodictionary.parquet | Bin 0 -> 576 bytes
 8 files changed, 356 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index cbdc0b0..70b2342 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -19,25 +19,33 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+
 import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.Decimal18Vector;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.Decimal9Vector;
 import org.apache.drill.exec.vector.Float4Vector;
 import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableDecimal18Vector;
 import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
 import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal9Vector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.TimeStampVector;
+import org.apache.drill.exec.vector.TimeVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
-
 import parquet.column.ColumnDescriptor;
 import parquet.column.Encoding;
 import parquet.format.ConvertedType;
@@ -83,9 +91,29 @@ public class ColumnReaderFactory {
         if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
           switch (columnChunkMetaData.getType()) {
             case INT32:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+              if (convertedType == null) {
+                return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+              }
+              switch (convertedType) {
+                case DECIMAL:
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal9Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal9Vector) v, schemaElement);
+                case TIME_MILLIS:
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+                default:
+                  throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32");
+              }
             case INT64:
-              return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+              if (convertedType == null) {
+                return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+              }
+              switch (convertedType) {
+                case DECIMAL:
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal18Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal18Vector) v, schemaElement);
+                case TIMESTAMP_MILLIS:
+                  return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+                default:
+                  throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT64");
+              }
             case FLOAT:
               return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
             case DOUBLE:
@@ -169,28 +197,42 @@ public class ColumnReaderFactory {
                                                              boolean fixedLength,
                                                              ValueVector valueVec,
                                                              SchemaElement schemaElement) throws ExecutionSetupException {
+    ConvertedType convertedType = schemaElement.getConverted_type();
+
     if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
-      return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-          fixedLength, valueVec, schemaElement);
+      return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
     } else {
-      if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
-        return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-            fixedLength, (NullableBigIntVector)valueVec, schemaElement);
-      }
-      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
-        return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-            fixedLength, (NullableIntVector)valueVec, schemaElement);
-      }
-      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
-        return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-            fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
-      }
-      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
-        return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
-            fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
-      }
-      else{
-        throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name());
+      switch (columnDescriptor.getType()) {
+        case INT32:
+          if (convertedType == null) {
+            return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableIntVector) valueVec, schemaElement);
+          }
+          switch (convertedType) {
+            case DECIMAL:
+              return new NullableFixedByteAlignedReaders.NullableDictionaryDecimal9Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableDecimal9Vector) valueVec, schemaElement);
+            case TIME_MILLIS:
+              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)valueVec, schemaElement);
+            default:
+              throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT32");
+          }
+        case INT64:
+          if (convertedType == null) {
+            return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+          }
+          switch (convertedType) {
+            case DECIMAL:
+              return new NullableFixedByteAlignedReaders.NullableDictionaryDecimal18Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableDecimal18Vector)valueVec, schemaElement);
+            case TIMESTAMP_MILLIS:
+              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
+            default:
+              throw new ExecutionSetupException("Unsupported nullable converted type " + convertedType + " for primitive type INT64");
+          }
+        case FLOAT:
+          return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+        case DOUBLE:
+          return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+        default:
+          throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name());
       }
     }
   }
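
A note on the dispatch above: the factory now keys on both the Parquet primitive type and the converted (logical) type before choosing a reader, instead of assuming a plain INT32/INT64. The following standalone sketch illustrates that two-level selection with hypothetical enum and reader names; it is not Drill's actual API, only the shape of the logic.

// Two-level dispatch sketch: primitive type first, then converted type.
// All names here (PrimitiveTypeName, ConvertedType, pickReader) are illustrative only.
public class ReaderDispatchSketch {
  enum PrimitiveTypeName { INT32, INT64, FLOAT, DOUBLE }
  enum ConvertedType { DECIMAL, TIME_MILLIS, TIMESTAMP_MILLIS }

  static String pickReader(PrimitiveTypeName primitive, ConvertedType converted) {
    switch (primitive) {
      case INT32:
        if (converted == null) {
          return "DictionaryIntReader";                             // plain INT32
        }
        switch (converted) {
          case DECIMAL:      return "DictionaryDecimal9Reader";     // 32-bit decimal
          case TIME_MILLIS:  return "DictionaryTimeReader";         // time-of-day in millis
          default: throw new IllegalArgumentException("Unsupported converted type " + converted + " for INT32");
        }
      case INT64:
        if (converted == null) {
          return "DictionaryBigIntReader";                          // plain INT64
        }
        switch (converted) {
          case DECIMAL:          return "DictionaryDecimal18Reader"; // 64-bit decimal
          case TIMESTAMP_MILLIS: return "DictionaryTimeStampReader"; // epoch millis
          default: throw new IllegalArgumentException("Unsupported converted type " + converted + " for INT64");
        }
      case FLOAT:  return "DictionaryFloat4Reader";
      case DOUBLE: return "DictionaryFloat8Reader";
      default: throw new IllegalArgumentException("Unsupported primitive type " + primitive);
    }
  }

  public static void main(String[] args) {
    System.out.println(pickReader(PrimitiveTypeName.INT32, ConvertedType.TIME_MILLIS)); // DictionaryTimeReader
    System.out.println(pickReader(PrimitiveTypeName.INT64, null));                      // DictionaryBigIntReader
  }
}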

http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 707bc9c..8087118 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -34,6 +34,10 @@ import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableDecimal9Vector;
+import org.apache.drill.exec.vector.NullableDecimal18Vector;
+import org.apache.drill.exec.vector.NullableTimeVector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.joda.time.DateTimeUtils;
 
@@ -84,6 +88,52 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
+  static class NullableDictionaryDecimal9Reader extends NullableColumnReader<NullableDecimal9Vector> {
+
+    NullableDictionaryDecimal9Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal9Vector v,
+                                SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger());
+        }
+      }
+    }
+  }
+
+  static class NullableDictionaryTimeReader extends NullableColumnReader<NullableTimeVector> {
+
+    NullableDictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                     ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeVector v,
+                                     SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger());
+        }
+      }
+    }
+  }
+
   static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
 
     NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
@@ -107,6 +157,50 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
+  static class NullableDictionaryTimeStampReader extends NullableColumnReader<NullableTimeStampVector> {
+
+    NullableDictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableTimeStampVector v,
+                                   SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong());
+        }
+      }
+    }
+  }
+  static class NullableDictionaryDecimal18Reader extends NullableColumnReader<NullableDecimal18Vector> {
+
+    NullableDictionaryDecimal18Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableDecimal18Vector v,
+                                   SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong());
+        }
+      }
+    }
+  }
   static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
 
     NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
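
The new nullable readers above all follow the same readField() pattern: when the page is dictionary encoded, values come from the dictionary value reader, otherwise from the plain value reader, and each value is written into the vector at valuesReadInCurrentPass + i. A minimal standalone sketch of that pattern follows; ValuesReader and IntVectorStub are stand-ins for illustration, not Drill or Parquet classes.

// Simplified sketch of the readField() pattern used by the nullable dictionary readers.
import java.util.Arrays;

public class NullableDictionaryReadSketch {
  interface ValuesReader { int readInteger(); }

  static class IntVectorStub {
    final int[] values = new int[16];
    void setSafe(int index, int value) { values[index] = value; }   // mirrors Mutator.setSafe()
  }

  // Copies recordsToRead values into the vector, choosing the dictionary-backed
  // reader when the page is dictionary encoded, otherwise the plain reader.
  static void readField(boolean usingDictionary, ValuesReader dictionaryValueReader,
                        ValuesReader valueReader, IntVectorStub vector,
                        int valuesReadInCurrentPass, int recordsToRead) {
    ValuesReader source = usingDictionary ? dictionaryValueReader : valueReader;
    for (int i = 0; i < recordsToRead; i++) {
      vector.setSafe(valuesReadInCurrentPass + i, source.readInteger());
    }
  }

  public static void main(String[] args) {
    int[] dictionaryDecoded = {7, 7, 42, 7};
    int[] cursor = {0};
    ValuesReader dict = () -> dictionaryDecoded[cursor[0]++];
    IntVectorStub v = new IntVectorStub();
    readField(true, dict, null, v, 0, dictionaryDecoded.length);
    System.out.println(Arrays.toString(Arrays.copyOf(v.values, 4)));  // [7, 7, 42, 7]
  }
}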

http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index 02b583a..15f4d28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -18,11 +18,15 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+
 import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.Decimal18Vector;
+import org.apache.drill.exec.vector.Decimal9Vector;
 import org.apache.drill.exec.vector.Float4Vector;
 import org.apache.drill.exec.vector.Float8Vector;
 import org.apache.drill.exec.vector.IntVector;
-
+import org.apache.drill.exec.vector.TimeStampVector;
+import org.apache.drill.exec.vector.TimeVector;
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -55,6 +59,58 @@ public class ParquetFixedWidthDictionaryReaders {
     }
   }
 
+  static class DictionaryDecimal9Reader extends FixedByteAlignedReader {
+
+    Decimal9Vector castedVector;
+
+    DictionaryDecimal9Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal9Vector v,
+                        SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      if (usingDictionary) {
+        for (int i = 0; i < recordsReadInThisIteration; i++){
+          castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+        }
+      }
+    }
+  }
+
+  static class DictionaryTimeReader extends FixedByteAlignedReader {
+
+    TimeVector castedVector;
+
+    DictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                        ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v,
+                        SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      if (usingDictionary) {
+        for (int i = 0; i < recordsReadInThisIteration; i++){
+          castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+        }
+      }
+    }
+  }
+
   static class DictionaryBigIntReader extends FixedByteAlignedReader {
 
     BigIntVector castedVector;
@@ -83,6 +139,62 @@ public class ParquetFixedWidthDictionaryReaders {
     }
   }
 
+  static class DictionaryDecimal18Reader extends FixedByteAlignedReader {
+
+    Decimal18Vector castedVector;
+
+    DictionaryDecimal18Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                           ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal18Vector v,
+                           SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      for (int i = 0; i < recordsReadInThisIteration; i++){
+        try {
+          castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+        } catch ( Exception ex) {
+          throw ex;
+        }
+      }
+    }
+  }
+
+  static class DictionaryTimeStampReader extends FixedByteAlignedReader {
+
+    TimeStampVector castedVector;
+
+    DictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                           ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
+                           SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      castedVector = v;
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+
+      recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+        - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+      for (int i = 0; i < recordsReadInThisIteration; i++){
+        try {
+          castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+        } catch ( Exception ex) {
+          throw ex;
+        }
+      }
+    }
+  }
+
   static class DictionaryFloat4Reader extends FixedByteAlignedReader {
 
     Float4Vector castedVector;
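
The fixed-width dictionary readers above size each iteration as the smaller of the values remaining in the current page and the records still needed for the pass. A small self-contained sketch of that arithmetic follows; the names are illustrative stand-ins for the fields used in the readers.

// Arithmetic behind recordsReadInThisIteration in the fixed-width dictionary readers:
// never read more than what remains in the current page, and never more than the
// current pass still needs.
public class BatchSizeSketch {
  static long recordsToReadThisIteration(long pageValueCount, long valuesReadFromPage,
                                         long recordsToReadInThisPass, long valuesReadInCurrentPass) {
    long remainingInPage = pageValueCount - valuesReadFromPage;
    long remainingInPass = recordsToReadInThisPass - valuesReadInCurrentPass;
    return Math.min(remainingInPage, remainingInPass);
  }

  public static void main(String[] args) {
    // Page holds 1000 values, 900 already consumed; the pass wants 4096 and has read 3800 so far.
    System.out.println(recordsToReadThisIteration(1000, 900, 4096, 3800));  // 100 -> bounded by the page
    // Same page, but the pass only needs 50 more records.
    System.out.println(recordsToReadThisIteration(1000, 900, 3850, 3800));  // 50 -> bounded by the pass
  }
}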

http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestColumnReaderFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestColumnReaderFactory.java
new file mode 100644
index 0000000..9ae6b78
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestColumnReaderFactory.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestColumnReaderFactory extends BaseTestQuery {
+
+  /**
+   * check if Time and TimeStamp are read correctly with dictionary encoding.
+   */
+  @Test
+  public void testTimeAndTimeStampWithDictionary() throws Exception {
+    // the file 'time_dictionary.parquet' uses a PLAIN_DICTIONARY encoding and contains 4 columns:
+    // time_opt: INT32/TIME_MILLIS/OPTIONAL
+    // time_req: INT32/TIME_MILLIS/REQUIRED
+    // timestampt_opt: INT64/TIMESTAMP_MILLIS/OPTIONAL
+    // timestampt_req: INT64/TIMESTAMP_MILLIS/REQUIRED
+
+    // query parquet file. We shouldn't get any exception
+    testNoResult("SELECT * FROM cp.`parquet/time_dictionary.parquet`");
+  }
+
+  /**
+   * check if Time and TimeStamp are read correctly with plain encoding.
+   */
+  @Test
+  public void testTimeAndTimeStampWithNoDictionary() throws Exception {
+    // the file 'time_nodictionary.parquet' uses a PLAIN encoding and contains 4 columns:
+    // time_opt: INT32/TIME_MILLIS/OPTIONAL
+    // time_req: INT32/TIME_MILLIS/REQUIRED
+    // timestampt_opt: INT64/TIMESTAMP_MILLIS/OPTIONAL
+    // timestampt_req: INT64/TIMESTAMP_MILLIS/REQUIRED
+
+    // query parquet file. We shouldn't get any exception
+    testNoResult("SELECT * FROM cp.`parquet/time_nodictionary.parquet`");
+  }
+
+  /**
+   * check if Decimal9 and Decimal18 are read correctly with dictionary encoding.
+   */
+  @Test
+  public void testDecimal9AndDecimal18WithDictionary() throws Exception {
+    // the file 'decimal_dictionary.parquet' uses a PLAIN_DICTIONARY encoding and contains 4 columns:
+    // d9_opt: INT32/DECIMAL9/OPTIONAL
+    // d9_req: INT32/DECIMAL9/REQUIRED
+    // d18_opt: INT64/DECIMAL18/OPTIONAL
+    // d18_req: INT64/DECIMAL18/REQUIRED
+
+    // query parquet file. We shouldn't get any exception
+    testNoResult("SELECT * FROM cp.`parquet/decimal_dictionary.parquet`");
+  }
+
+  /**
+   * check if Decimal9 and Decimal18 are read correctly with plain encoding.
+   */
+  @Test
+  public void testDecimal9AndDecimal18WithNoDictionary() throws Exception {
+    // the file 'decimal_nodictionary.parquet' uses a PLAIN encoding and contains 4 columns:
+    // d9_opt: INT32/DECIMAL9/OPTIONAL
+    // d9_req: INT32/DECIMAL9/REQUIRED
+    // d18_opt: INT64/DECIMAL18/OPTIONAL
+    // d18_req: INT64/DECIMAL18/REQUIRED
+
+    // query parquet file. We shouldn't get any exception
+    testNoResult("SELECT * FROM cp.`parquet/decimal_nodictionary.parquet`");
+  }
+}
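
As a possible follow-up (not part of this commit), the same fixture files could be exercised column by column rather than with SELECT *. The sketch below reuses the testNoResult() helper and the column names documented in the comments above; the test class itself is hypothetical.

// Hypothetical additional test, projecting individual dictionary-encoded columns
// from the new fixture files instead of SELECT *.
package org.apache.drill.exec.store.parquet.columnreaders;

import org.apache.drill.BaseTestQuery;
import org.junit.Test;

public class TestDictionaryColumnProjection extends BaseTestQuery {

  @Test
  public void testProjectSingleDictionaryColumns() throws Exception {
    // project one column of each affected type; queries should not throw
    testNoResult("SELECT time_opt FROM cp.`parquet/time_dictionary.parquet`");
    testNoResult("SELECT timestampt_req FROM cp.`parquet/time_dictionary.parquet`");
    testNoResult("SELECT d9_opt FROM cp.`parquet/decimal_dictionary.parquet`");
    testNoResult("SELECT d18_req FROM cp.`parquet/decimal_dictionary.parquet`");
  }
}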

http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/test/resources/parquet/decimal_dictionary.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_dictionary.parquet b/exec/java-exec/src/test/resources/parquet/decimal_dictionary.parquet
new file mode 100644
index 0000000..bf1bd9c
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/decimal_dictionary.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/test/resources/parquet/decimal_nodictionary.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/decimal_nodictionary.parquet b/exec/java-exec/src/test/resources/parquet/decimal_nodictionary.parquet
new file mode 100644
index 0000000..b9d24c5
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/decimal_nodictionary.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/test/resources/parquet/time_dictionary.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/time_dictionary.parquet b/exec/java-exec/src/test/resources/parquet/time_dictionary.parquet
new file mode 100644
index 0000000..5b75154
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/time_dictionary.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/fe94970e/exec/java-exec/src/test/resources/parquet/time_nodictionary.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/time_nodictionary.parquet b/exec/java-exec/src/test/resources/parquet/time_nodictionary.parquet
new file mode 100644
index 0000000..f78ff17
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/time_nodictionary.parquet differ

