drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [04/15] drill git commit: DRILL-4373: Drill and Hive have incompatible timestamp representations in parquet - added sys/sess option "store.parquet.int96_as_timestamp"; - added int96 to timestamp converter for both readers; - added unit tests;
Date Tue, 01 Nov 2016 20:29:49 GMT
DRILL-4373: Drill and Hive have incompatible timestamp representations in parquet - added sys/sess
option "store.parquet.int96_as_timestamp"; - added int96 to timestamp converter for both readers;
- added unit tests;

This closes #600


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7e7214b4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7e7214b4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7e7214b4

Branch: refs/heads/master
Commit: 7e7214b40784668d1599f265067f789aedb6cf86
Parents: 4a82bc1
Author: Vitalii Diravka <vitalii.diravka@gmail.com>
Authored: Fri Sep 2 21:43:50 2016 +0000
Committer: Parth Chandra <parthc@apache.org>
Committed: Tue Nov 1 10:43:06 2016 -0700

----------------------------------------------------------------------
 ...onvertHiveParquetScanToDrillParquetScan.java |   3 +-
 .../exec/store/hive/HiveStoragePlugin.java      |   2 +
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../impl/conv/ConvertFromImpalaTimestamp.java   |  35 ++++++---
 .../server/options/SystemOptionManager.java     |   1 +
 .../store/parquet/ParquetReaderUtility.java     |  39 ++++++++--
 .../columnreaders/ColumnReaderFactory.java      |  15 +++-
 .../NullableFixedByteAlignedReaders.java        |  28 +++++++
 .../columnreaders/ParquetRecordReader.java      |   4 +
 .../ParquetToDrillTypeConverter.java            |   7 +-
 .../parquet2/DrillParquetGroupConverter.java    |  32 +++++++-
 .../test/java/org/apache/drill/TestBuilder.java |  11 +++
 .../physical/impl/writer/TestParquetWriter.java |  76 +++++++++++++++++--
 .../parquet/int96_dict_change/000000_0          | Bin 0 -> 270 bytes
 .../parquet/int96_dict_change/000000_1          | Bin 0 -> 312 bytes
 .../testInt96DictChange/q1.tsv                  |  12 +++
 16 files changed, 240 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index c43664c..228308f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -68,7 +68,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
   public static final ConvertHiveParquetScanToDrillParquetScan INSTANCE = new ConvertHiveParquetScanToDrillParquetScan();
 
   private static final DrillSqlOperator INT96_TO_TIMESTAMP =
-      new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA", 1, true);
+      new DrillSqlOperator("convert_fromTIMESTAMP_IMPALA_LOCALTIMEZONE", 1, true);
 
   private static final DrillSqlOperator RTRIM = new DrillSqlOperator("RTRIM", 1, true);
 
@@ -296,6 +296,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
     if (outputType.getSqlTypeName() == SqlTypeName.TIMESTAMP) {
       // TIMESTAMP is stored as INT96 by Hive in ParquetFormat. Use convert_fromTIMESTAMP_IMPALA
UDF to convert
       // INT96 format data to TIMESTAMP
+      // TODO: Remove this conversion once "store.parquet.reader.int96_as_timestamp" is
true by default
       return rb.makeCall(INT96_TO_TIMESTAMP, inputRef);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 8f8fdba..f99a934 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -114,6 +114,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
 
   @Override
   public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext
optimizerRulesContext) {
+    // TODO: Remove implicit use of convert_fromTIMESTAMP_IMPALA function
+    // once "store.parquet.reader.int96_as_timestamp" is true by default
     if(optimizerRulesContext.getPlannerSettings().getOptions()
         .getOption(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS).bool_val) {
       return ImmutableSet.<StoragePluginOptimizerRule>of(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 053311f..21015bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -144,6 +144,8 @@ public interface ExecConstants {
   OptionValidator PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_CHECK_THRESHOLD,
100l, 10l);
   String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader";
   OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER,
false);
+  String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
+  OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP,
false);
 
   OptionValidator COMPILE_SCALAR_REPLACEMENT = new BooleanValidator("exec.compile.scalar_replacement",
false);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
index a57eede..38e0514 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/ConvertFromImpalaTimestamp.java
@@ -28,6 +28,29 @@ import org.apache.drill.exec.expr.holders.VarBinaryHolder;
 public class ConvertFromImpalaTimestamp {
 
 
+  @FunctionTemplate(name = "convert_fromTIMESTAMP_IMPALA_LOCALTIMEZONE", scope = FunctionTemplate.FunctionScope.SIMPLE,
nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class ImpalaTimestampConvertFromWithLocalTimezone implements DrillSimpleFunc
{
+
+    @Param VarBinaryHolder in;
+    @Output TimeStampHolder out;
+
+
+    @Override
+    public void setup() { }
+
+    @Override
+    public void eval() {
+      org.apache.drill.exec.util.ByteBufUtil.checkBufferLength(in.buffer, in.start, in.end,
12);
+
+      in.buffer.readerIndex(in.start);
+      long nanosOfDay = in.buffer.readLong();
+      int julianDay = in.buffer.readInt();
+      long dateTime = (julianDay - org.apache.drill.exec.store.parquet.ParquetReaderUtility.JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH)
*
+          org.joda.time.DateTimeConstants.MILLIS_PER_DAY + (nanosOfDay / org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.NANOS_PER_MILLISECOND);
+      out.value = new org.joda.time.DateTime(dateTime, org.joda.time.chrono.JulianChronology.getInstance()).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
+    }
+  }
+
   @FunctionTemplate(name = "convert_fromTIMESTAMP_IMPALA", scope = FunctionTemplate.FunctionScope.SIMPLE,
nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
   public static class ImpalaTimestampConvertFrom implements DrillSimpleFunc {
 
@@ -45,16 +68,8 @@ public class ConvertFromImpalaTimestamp {
       in.buffer.readerIndex(in.start);
       long nanosOfDay = in.buffer.readLong();
       int julianDay = in.buffer.readInt();
-      /* We use the same implementation as org.joda.time.DateTimeUtils.fromJulianDay but
avoid rounding errors
-         Note we need to subtract half of a day because julian days are recorded as starting
at noon.
-         From Joda :
-              public static final long fromJulianDay(double julianDay) {
-                484            double epochDay = julianDay - 2440587.5d;
-                485            return (long) (epochDay * 86400000d);
-                486        }
-      */
-      long dateTime = (julianDay - 2440588)*86400000L + (nanosOfDay / 1000000);
-      out.value = new org.joda.time.DateTime((long) dateTime, org.joda.time.chrono.JulianChronology.getInstance()).withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis();
+      out.value = (julianDay - org.apache.drill.exec.store.parquet.ParquetReaderUtility.JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH)
*
+          org.joda.time.DateTimeConstants.MILLIS_PER_DAY + (nanosOfDay / org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.NANOS_PER_MILLISECOND);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 71ebd7d..f272c9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -100,6 +100,7 @@ public class SystemOptionManager extends BaseOptionManager implements
AutoClosea
       ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
+      ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
       ExecConstants.ENABLE_UNION_TYPE,
       ExecConstants.TEXT_ESTIMATED_ROW_SIZE,

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 1f6dc1e..470cc00 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -39,6 +39,8 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.OriginalType;
 import org.joda.time.Chronology;
 import org.joda.time.DateTimeConstants;
+import org.apache.parquet.example.data.simple.NanoTime;
+import org.apache.parquet.io.api.Binary;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -76,21 +78,21 @@ public class ParquetReaderUtility {
    * in the data pages themselves to see if they are likely corrupt.
    */
   public enum DateCorruptionStatus {
-    META_SHOWS_CORRUPTION{
+    META_SHOWS_CORRUPTION {
       @Override
-      public String toString(){
+      public String toString() {
         return "It is determined from metadata that the date values are definitely CORRUPT";
       }
     },
     META_SHOWS_NO_CORRUPTION {
       @Override
-      public String toString(){
+      public String toString() {
         return "It is determined from metadata that the date values are definitely CORRECT";
       }
     },
     META_UNCLEAR_TEST_VALUES {
       @Override
-      public String toString(){
+      public String toString() {
         return "Not enough info in metadata, parquet reader will test individual date values";
       }
     }
@@ -152,7 +154,7 @@ public class ParquetReaderUtility {
             OriginalType originalType = columnMetadata.getOriginalType();
             if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue()
&&
                 (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD)
{
-              int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
+              int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
               columnMetadata.setMax(newMinMax);
               columnMetadata.setMin(newMinMax);
             }
@@ -290,4 +292,31 @@ public class ParquetReaderUtility {
     }
     return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
   }
+
+  /**
+   * Utilities for converting from parquet INT96 binary (impala, hive timestamp)
+   * to date time value. This utilizes the Joda library.
+   */
+  public static class NanoTimeUtils {
+
+    public static final long NANOS_PER_MILLISECOND = 1000000;
+
+  /**
+   * @param binaryTimeStampValue
+   *          hive, impala timestamp values with nanoseconds precision
+   *          are stored in parquet Binary as INT96 (12 constant bytes)
+   *
+   * @return  Unix Timestamp - the number of milliseconds since January 1, 1970, 00:00:00
GMT
+   *          represented by @param binaryTimeStampValue .
+   */
+    public static long getDateTimeValueFromBinary(Binary binaryTimeStampValue) {
+      // This method represents binaryTimeStampValue as ByteBuffer, where timestamp is stored
as sum of
+      // julian day number (32-bit) and nanos of day (64-bit)
+      NanoTime nt = NanoTime.fromBinary(binaryTimeStampValue);
+      int julianDay = nt.getJulianDay();
+      long nanosOfDay = nt.getTimeOfDayNanos();
+      return (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+          + nanosOfDay / NANOS_PER_MILLISECOND;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index ea65615..662d5c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.BitVector;
@@ -241,7 +242,12 @@ public class ColumnReaderFactory {
 
     if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
       if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) {
-        return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec,
schemaElement);
+         // TODO: check convertedType once parquet supports TIMESTAMP_NANOS type annotation.
+        if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
{
+          return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec,
schemaElement);
+        } else {
+          return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec,
schemaElement);
+        }
       }else{
         return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement);
       }
@@ -272,7 +278,12 @@ public class ColumnReaderFactory {
               throw new ExecutionSetupException("Unsupported nullable converted type " +
convertedType + " for primitive type INT64");
           }
         case INT96:
-           return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec,
schemaElement);
+          // TODO: check convertedType once parquet supports TIMESTAMP_NANOS type annotation.
+          if (parentReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
{
+            return new NullableFixedByteAlignedReaders.NullableFixedBinaryAsTimeStampReader(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableTimeStampVector) valueVec,
schemaElement);
+          } else {
+            return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec,
schemaElement);
+          }
         case FLOAT:
           return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader,
allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, (NullableFloat4Vector)valueVec,
schemaElement);
         case DOUBLE:

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index df4c1ba..f4fe5ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -46,6 +46,7 @@ import org.apache.parquet.io.api.Binary;
 import org.joda.time.DateTimeConstants;
 
 import io.netty.buffer.DrillBuf;
+import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
 
 public class NullableFixedByteAlignedReaders {
 
@@ -107,6 +108,33 @@ public class NullableFixedByteAlignedReaders {
     }
   }
 
+  /**
+   * Class for reading parquet fixed binary type INT96, which is used for storing hive,
+   * impala timestamp values with nanoseconds precision. So it reads such values as a drill
timestamp.
+   */
+  static class NullableFixedBinaryAsTimeStampReader extends NullableFixedByteAlignedReader<NullableTimeStampVector>
{
+    NullableFixedBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize,
ColumnDescriptor descriptor,
+                              ColumnChunkMetaData columnChunkMetaData, boolean fixedLength,
NullableTimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v,
schemaElement);
+    }
+
+    @Override
+    protected void readField(long recordsToReadInThisPass) {
+      this.bytebuf = pageReader.pageData;
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes();
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue));
+        }
+      } else {
+        for (int i = 0; i < recordsToReadInThisPass; i++) {
+          Binary binaryTimeStampValue = pageReader.valueReader.readBytes();
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue));
+        }
+      }
+    }
+  }
+
   static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector>
{
 
     NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor
descriptor,

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index c51c72c..f095a8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -248,6 +248,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
     return operatorContext;
   }
 
+  public FragmentContext getFragmentContext() {
+    return fragmentContext;
+  }
+
   /**
    * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding
    * {@see SchemaElement}. Neither is enough information alone as the max

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index b6d1a72..57c0a66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 
 import org.apache.drill.common.util.CoreDecimalUtility;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.parquet.format.ConvertedType;
@@ -94,7 +95,11 @@ public class ParquetToDrillTypeConverter {
       // TODO - Both of these are not supported by the parquet library yet (7/3/13),
       // but they are declared here for when they are implemented
       case INT96:
-        return TypeProtos.MinorType.VARBINARY;
+        if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
{
+          return TypeProtos.MinorType.TIMESTAMP;
+        } else {
+          return TypeProtos.MinorType.VARBINARY;
+        }
       case FIXED_LEN_BYTE_ARRAY:
         if (convertedType == null) {
           checkArgument(length > 0, "A length greater than zero must be provided for a
FixedBinary type.");

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 48a0bfd..2f2db05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.DateHolder;
@@ -81,6 +82,8 @@ import org.apache.parquet.schema.Type.Repetition;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
+
 public class DrillParquetGroupConverter extends GroupConverter {
 
   private List<Converter> converters;
@@ -226,9 +229,15 @@ public class DrillParquetGroupConverter extends GroupConverter {
         }
       }
       case INT96: {
+        // TODO: replace null with TIMESTAMP_NANOS once parquet supports such a type annotation.
         if (type.getOriginalType() == null) {
-          VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary()
: mapWriter.varBinary(name);
-          return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName())
/ 8, mutator.getManagedBuffer());
+          if (options.getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val)
{
+            TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).timeStamp()
: mapWriter.timeStamp(name);
+            return new DrillFixedBinaryToTimeStampConverter(writer);
+          } else {
+            VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary()
: mapWriter.varBinary(name);
+            return new DrillFixedBinaryToVarbinaryConverter(writer, ParquetRecordReader.getTypeLengthInBits(type.getPrimitiveTypeName())
/ 8, mutator.getManagedBuffer());
+          }
         }
 
       }
@@ -622,4 +631,23 @@ public class DrillParquetGroupConverter extends GroupConverter {
       writer.write(holder);
     }
   }
+
+  /**
+   * Parquet currently supports a fixed binary type INT96 for storing hive, impala timestamp
+   * with nanoseconds precision.
+   */
+  public static class DrillFixedBinaryToTimeStampConverter extends PrimitiveConverter {
+    private TimeStampWriter writer;
+    private TimeStampHolder holder = new TimeStampHolder();
+
+    public DrillFixedBinaryToTimeStampConverter(TimeStampWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      holder.value = getDateTimeValueFromBinary(value);
+      writer.write(holder);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
index 8acf936..a19b30e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java
@@ -190,12 +190,23 @@ public class TestBuilder {
     return this;
   }
 
+  public TestBuilder optionSettingQueriesForBaseline(String queries, Object... args) {
+    this.baselineOptionSettingQueries = String.format(queries, args);
+    return this;
+  }
+
   // list of queries to run before the test query, can be used to set several options
   // list takes the form of a semi-colon separated list
   public TestBuilder optionSettingQueriesForTestQuery(String queries) {
     this.testOptionSettingQueries = queries;
     return this;
   }
+
+  public TestBuilder optionSettingQueriesForTestQuery(String query, Object... args) throws
Exception {
+    this.testOptionSettingQueries = String.format(query, args);
+    return this;
+  }
+
   public TestBuilder approximateEquality() {
     this.approximateEquality = true;
     return this;

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 6890394..cf43339 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -32,7 +32,9 @@ import java.util.Map;
 
 import com.google.common.base.Joiner;
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -739,30 +741,76 @@ public class TestParquetWriter extends BaseTestQuery {
   }
 
  /*
-  Test the reading of an int96 field. Impala encodes timestamps as int96 fields
+    Impala encodes timestamp values as int96 fields. Test the reading of an int96 field with
two converters:
+    the first one converts parquet INT96 into drill VARBINARY, and the second one (which works
while
+    the store.parquet.reader.int96_as_timestamp option is enabled) converts parquet INT96 into
drill TIMESTAMP.
   */
   @Test
   public void testImpalaParquetInt96() throws Exception {
     compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
+    try {
+      test("alter session set %s = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+      compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
+    } finally {
+      test("alter session reset %s", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+    }
   }
 
   /*
-  Test the reading of a binary field where data is in dictionary _and_ non-dictionary encoded
pages
+  Test the reading of a binary field as drill varbinary where data is in dictionary _and_
non-dictionary encoded pages
    */
   @Test
-  public void testImpalaParquetVarBinary_DictChange() throws Exception {
+  public void testImpalaParquetBinaryAsVarBinary_DictChange() throws Exception {
     compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_dict_change.parquet`");
   }
 
   /*
+  Test the reading of a binary field as drill timestamp where data is in dictionary _and_
non-dictionary encoded pages
+   */
+  @Test
+  public void testImpalaParquetBinaryAsTimeStamp_DictChange() throws Exception {
+    final String WORKING_PATH = TestTools.getWorkingPath();
+    final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+    try {
+      testBuilder()
+          .sqlQuery("select int96_ts from dfs_test.`%s/parquet/int96_dict_change`", TEST_RES_PATH)
+          .optionSettingQueriesForTestQuery(
+              "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
+          .ordered()
+          .csvBaselineFile("testframework/testParquetReader/testInt96DictChange/q1.tsv")
+          .baselineTypes(TypeProtos.MinorType.TIMESTAMP)
+          .baselineColumns("int96_ts")
+          .build().run();
+    } finally {
+      test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+    }
+  }
+
+  /*
      Test the conversion from int96 to impala timestamp
    */
   @Test
-  public void testImpalaParquetTimestampAsInt96() throws Exception {
+  public void testTimestampImpalaConvertFrom() throws Exception {
     compareParquetReadersColumnar("convert_from(field_impala_ts, 'TIMESTAMP_IMPALA')", "cp.`parquet/int96_impala_1.parquet`");
   }
 
   /*
+     Test reading parquet Int96 as TimeStamp and comparing the obtained values with the
+     old results (reading the same values as VarBinary and converting them with the
convert_from(..., 'TIMESTAMP_IMPALA') function)
+   */
+  @Test
+  public void testImpalaParquetTimestampInt96AsTimeStamp() throws Exception {
+    try {
+      test("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER);
+      compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
+      test("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER);
+      compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
+    } finally {
+      test("alter session reset `%s`", ExecConstants.PARQUET_NEW_RECORD_READER);
+    }
+  }
+
+  /*
     Test a file with partitions and an int96 column. (Data generated using Hive)
    */
   @Test
@@ -782,7 +830,6 @@ public class TestParquetWriter extends BaseTestQuery {
   Test the conversion from int96 to impala timestamp with hive data including nulls. Validate
against expected values
   */
   @Test
-  @Ignore("relies on particular time zone")
   public void testHiveParquetTimestampAsInt96_basic() throws Exception {
     final String q = "SELECT cast(convert_from(timestamp_field, 'TIMESTAMP_IMPALA') as varchar(19))
 as timestamp_field "
             + "from cp.`parquet/part1/hive_all_types.parquet` ";
@@ -791,7 +838,7 @@ public class TestParquetWriter extends BaseTestQuery {
             .unOrdered()
             .sqlQuery(q)
             .baselineColumns("timestamp_field")
-            .baselineValues("2013-07-05 17:01:00")
+            .baselineValues("2013-07-06 00:01:00")
             .baselineValues((Object)null)
             .go();
   }
@@ -859,5 +906,22 @@ public class TestParquetWriter extends BaseTestQuery {
         "cp.`parquet/last_page_one_null.parquet`");
   }
 
+  private void compareParquetInt96Converters(String selection, String table) throws Exception
{
+    try {
+      testBuilder()
+          .ordered()
+          .sqlQuery("select `%s` from %s", selection, table)
+          .optionSettingQueriesForTestQuery(
+              "alter session set `%s` = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
+          .sqlBaselineQuery("select convert_from(`%1$s`, 'TIMESTAMP_IMPALA') as `%1$s` from
%2$s", selection, table)
+          .optionSettingQueriesForBaseline(
+              "alter session set `%s` = false", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP)
+          .build()
+          .run();
+    } finally {
+      test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0 b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0
new file mode 100644
index 0000000..8517428
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_0
differ

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1 b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1
new file mode 100644
index 0000000..0183b50
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/int96_dict_change/000000_1
differ

http://git-wip-us.apache.org/repos/asf/drill/blob/7e7214b4/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
new file mode 100644
index 0000000..91b9b01
--- /dev/null
+++ b/exec/java-exec/src/test/resources/testframework/testParquetReader/testInt96DictChange/q1.tsv
@@ -0,0 +1,12 @@
+1970-01-01 00:00:01.000
+1971-01-01 00:00:01.000
+1972-01-01 00:00:01.000
+1973-01-01 00:00:01.000
+1974-01-01 00:00:01.000
+2010-01-01 00:00:01.000
+2011-01-01 00:00:01.000
+2012-01-01 00:00:01.000
+2013-01-01 00:00:01.000
+2014-01-01 00:00:01.000
+2015-01-01 00:00:01.000
+2016-01-01 00:00:01.000


Mime
View raw message