spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ues...@apache.org
Subject spark git commit: [SPARK-12297][SQL] Hive compatibility for Parquet Timestamps
Date Mon, 08 May 2017 03:16:11 GMT
Repository: spark
Updated Branches:
  refs/heads/master f53a82072 -> 22691556e


[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps

## What changes were proposed in this pull request?

This change allows timestamps in parquet-based hive table to behave as a "floating time", without a timezone, as timestamps are for other file formats.  If the storage timezone is the same as the session timezone, this conversion is a no-op.  When data is read from a hive table, the table property is *always* respected.  This allows spark to not change behavior when reading old data, but read newly written data correctly (whatever the source of the data is).

Spark inherited the original behavior from Hive, but Hive is also updating behavior to use the same  scheme in HIVE-12767 / HIVE-16231.

The default for Spark remains unchanged; created tables do not include the new table property.

This will only apply to hive tables; nothing is added to parquet metadata to indicate the timezone, so data that is read or written directly from parquet files will never have any conversions applied.

## How was this patch tested?

Added a unit test which creates tables, reads and writes data, under a variety of permutations (different storage timezones, different session timezones, vectorized reading on and off).

Author: Imran Rashid <irashid@cloudera.com>

Closes #16781 from squito/SPARK-12297.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22691556
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22691556
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22691556

Branch: refs/heads/master
Commit: 22691556e5f0dfbac81b8cc9ca0a67c70c1711ca
Parents: f53a820
Author: Imran Rashid <irashid@cloudera.com>
Authored: Mon May 8 12:16:00 2017 +0900
Committer: Takuya UESHIN <ueshin@databricks.com>
Committed: Mon May 8 12:16:00 2017 +0900

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |   4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala |   5 +
 .../parquet/VectorizedColumnReader.java         |  28 +-
 .../parquet/VectorizedParquetRecordReader.java  |   6 +-
 .../spark/sql/execution/command/tables.scala    |   8 +-
 .../datasources/parquet/ParquetFileFormat.scala |   2 +
 .../parquet/ParquetReadSupport.scala            |   3 +-
 .../parquet/ParquetRecordMaterializer.scala     |   9 +-
 .../parquet/ParquetRowConverter.scala           |  53 ++-
 .../parquet/ParquetWriteSupport.scala           |  25 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  11 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  12 +-
 .../hive/ParquetHiveCompatibilitySuite.scala    | 379 ++++++++++++++++++-
 13 files changed, 516 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index cc0cbba..c39017e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -132,10 +132,10 @@ case class CatalogTablePartition(
   /**
    * Given the partition schema, returns a row with that schema holding the partition values.
    */
-  def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
+  def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = {
     val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
     val timeZoneId = caseInsensitiveProperties.getOrElse(
-      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
+      DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
     InternalRow.fromSeq(partitionSchema.map { field =>
       val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
         null

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 6c1592f..bf596fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -498,6 +498,11 @@ object DateTimeUtils {
     false
   }
 
+  lazy val validTimezones = TimeZone.getAvailableIDs().toSet
+  def isValidTimezone(timezoneId: String): Boolean = {
+    validTimezones.contains(timezoneId)
+  }
+
   /**
    * Returns the microseconds since year zero (-17999) from microseconds since epoch.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 9d641b5..dabbc2b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
+import java.util.TimeZone;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -30,6 +32,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -90,11 +93,30 @@ public class VectorizedColumnReader {
 
   private final PageReader pageReader;
   private final ColumnDescriptor descriptor;
+  private final TimeZone storageTz;
+  private final TimeZone sessionTz;
 
-  public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
+  public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
+                                Configuration conf)
       throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
+    // If the table has a timezone property, apply the correct conversions.  See SPARK-12297.
+    // The conf is sometimes null in tests.
+    String sessionTzString =
+        conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key());
+    if (sessionTzString == null || sessionTzString.isEmpty()) {
+      sessionTz = DateTimeUtils.defaultTimeZone();
+    } else {
+      sessionTz = TimeZone.getTimeZone(sessionTzString);
+    }
+    String storageTzString =
+        conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
+    if (storageTzString == null || storageTzString.isEmpty()) {
+      storageTz = sessionTz;
+    } else {
+      storageTz = TimeZone.getTimeZone(storageTzString);
+    }
     this.maxDefLevel = descriptor.getMaxDefinitionLevel();
 
     DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -289,7 +311,7 @@ public class VectorizedColumnReader {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
             if (!column.isNullAt(i)) {
               Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
-              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, sessionTz, storageTz));
             }
           }
         } else {
@@ -422,7 +444,7 @@ public class VectorizedColumnReader {
         if (defColumn.readInteger() == maxDefLevel) {
           column.putLong(rowId + i,
               // Read 12 bytes for INT96
-              ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
+              ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), sessionTz, storageTz));
         } else {
           column.putNull(rowId + i);
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 51bdf0f..d8974dd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -95,6 +96,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private boolean returnColumnarBatch;
 
+  private Configuration conf;
+
   /**
    * The default config on whether columnarBatch should be offheap.
    */
@@ -107,6 +110,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException, UnsupportedOperationException {
     super.initialize(inputSplit, taskAttemptContext);
+    this.conf = taskAttemptContext.getConfiguration();
     initializeInternal();
   }
 
@@ -277,7 +281,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i),
-          pages.getPageReader(columns.get(i)));
+          pages.getPageReader(columns.get(i)), conf);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ebf03e1..5843c5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 import scala.util.Try
 
-import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -37,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -74,6 +73,10 @@ case class CreateTableLikeCommand(
       sourceTableDesc.provider
     }
 
+    val properties = sourceTableDesc.properties.filter { case (k, _) =>
+      k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    }
+
     // If the location is specified, we create an external table internally.
     // Otherwise create a managed table.
     val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL
@@ -86,6 +89,7 @@ case class CreateTableLikeCommand(
           locationUri = location.map(CatalogUtils.stringToURI(_))),
         schema = sourceTableDesc.schema,
         provider = newProvider,
+        properties = properties,
         partitionColumnNames = sourceTableDesc.partitionColumnNames,
         bucketSpec = sourceTableDesc.bucketSpec)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2f3a2c6..8113768 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -632,4 +632,6 @@ object ParquetFileFormat extends Logging {
         Failure(cause)
     }.toOption
   }
+
+  val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index f1a35dd..bf395a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -95,7 +95,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
-      new ParquetSchemaConverter(conf))
+      new ParquetSchemaConverter(conf),
+      conf)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
index 4e49a0d..df04199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
 import org.apache.parquet.schema.MessageType
 
@@ -29,13 +30,17 @@ import org.apache.spark.sql.types.StructType
  * @param parquetSchema Parquet schema of the records to be read
  * @param catalystSchema Catalyst schema of the rows to be constructed
  * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
+ * @param hadoopConf hadoop Configuration for passing extra params for parquet conversion
  */
 private[parquet] class ParquetRecordMaterializer(
-    parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
+    parquetSchema: MessageType,
+    catalystSchema: StructType,
+    schemaConverter: ParquetSchemaConverter,
+    hadoopConf: Configuration)
   extends RecordMaterializer[UnsafeRow] {
 
   private val rootConverter =
-    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
+    new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater)
 
   override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 32e6c60..d52ff62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
+import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
@@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -117,12 +120,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
  *        types should have been expanded.
+ * @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class ParquetRowConverter(
     schemaConverter: ParquetSchemaConverter,
     parquetType: GroupType,
     catalystType: StructType,
+    hadoopConf: Configuration,
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
@@ -261,18 +266,18 @@ private[parquet] class ParquetRowConverter(
 
       case TimestampType =>
         // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+        // If the table has a timezone property, apply the correct conversions.  See SPARK-12297.
+        val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
+        val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
+          .getOrElse(DateTimeUtils.defaultTimeZone())
+        val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
+        val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
         new ParquetPrimitiveConverter(updater) {
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {
-            assert(
-              value.length() == 12,
-              "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
-              s"but got a ${value.length()}-byte binary.")
-
-            val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
-            val timeOfDayNanos = buf.getLong
-            val julianDay = buf.getInt
-            updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
+            val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, sessionTz = sessionTz,
+              storageTz = storageTz)
+            updater.setLong(timestamp)
           }
         }
 
@@ -302,7 +307,7 @@ private[parquet] class ParquetRowConverter(
 
       case t: StructType =>
         new ParquetRowConverter(
-          schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
+          schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater {
             override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
           })
 
@@ -651,6 +656,7 @@ private[parquet] class ParquetRowConverter(
 }
 
 private[parquet] object ParquetRowConverter {
+
   def binaryToUnscaledLong(binary: Binary): Long = {
     // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
     // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
@@ -673,12 +679,35 @@ private[parquet] object ParquetRowConverter {
     unscaled
   }
 
-  def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
+  /**
+   * Converts an int96 to a SQLTimestamp, given both the storage timezone and the local timezone.
+   * The timestamp is really meant to be interpreted as a "floating time", but since we
+   * actually store it as micros since epoch, why we have to apply a conversion when timezones
+   * change.
+   *
+   * @param binary a parquet Binary which holds one int96
+   * @param sessionTz the session timezone.  This will be used to determine how to display the time,
+    *                  and compute functions on the timestamp which involve a timezone, eg. extract
+    *                  the hour.
+   * @param storageTz the timezone which was used to store the timestamp.  This should come from the
+    *                  timestamp table property, or else assume its the same as the sessionTz
+   * @return a timestamp (millis since epoch) which will render correctly in the sessionTz
+   */
+  def binaryToSQLTimestamp(
+      binary: Binary,
+      sessionTz: TimeZone,
+      storageTz: TimeZone): SQLTimestamp = {
     assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
       s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
     val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
     val timeOfDayNanos = buffer.getLong
     val julianDay = buffer.getInt
-    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+    val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
+    // avoid expensive time logic if possible.
+    if (sessionTz.getID() != storageTz.getID()) {
+      DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz)
+    } else {
+      utcEpochMicros
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 38b0e33..679ed8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.{ByteBuffer, ByteOrder}
 import java.util
+import java.util.TimeZone
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
@@ -75,6 +76,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   // Reusable byte array used to write decimal values
   private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
 
+  private var storageTz: TimeZone = _
+  private var sessionTz: TimeZone = _
+
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
@@ -91,6 +95,19 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
 
 
     this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
+    // If the table has a timezone property, apply the correct conversions.  See SPARK-12297.
+    val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
+    sessionTz = if (sessionTzString == null || sessionTzString == "") {
+      TimeZone.getDefault()
+    } else {
+      TimeZone.getTimeZone(sessionTzString)
+    }
+    val storageTzString = configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
+    storageTz = if (storageTzString == null || storageTzString == "") {
+      sessionTz
+    } else {
+      TimeZone.getTimeZone(storageTzString)
+    }
 
     val messageType = new ParquetSchemaConverter(configuration).convert(schema)
     val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
@@ -178,7 +195,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
 
           // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
           // precision.  Nanosecond parts of timestamp values read from INT96 are simply stripped.
-          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+          val rawMicros = row.getLong(ordinal)
+          val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) {
+            rawMicros
+          } else {
+            DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz)
+          }
+          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(adjustedMicros)
           val buf = ByteBuffer.wrap(timestampBuffer)
           buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
           recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ba48fac..8fef467 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -39,9 +39,10 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -224,6 +225,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       throw new TableAlreadyExistsException(db = db, table = table)
     }
 
+    val tableTz = tableDefinition.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
+    tableTz.foreach { tz =>
+      if (!DateTimeUtils.isValidTimezone(tz)) {
+        throw new AnalysisException(s"Cannot set" +
+          s" ${ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY} to invalid timezone $tz")
+      }
+    }
+
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6b98066..e0b565c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
 
@@ -174,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             // We don't support hive bucketed tables, only ones we write out.
             bucketSpec = None,
             fileFormat = fileFormat,
-            options = options)(sparkSession = sparkSession)
+            options = options ++ getStorageTzOptions(relation))(sparkSession = sparkSession)
           val created = LogicalRelation(fsRelation, updatedTable)
           tableRelationCache.put(tableIdentifier, created)
           created
@@ -201,7 +202,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 userSpecifiedSchema = Option(dataSchema),
                 // We don't support hive bucketed tables, only ones we write out.
                 bucketSpec = None,
-                options = options,
+                options = options ++ getStorageTzOptions(relation),
                 className = fileType).resolveRelation(),
               table = updatedTable)
 
@@ -222,6 +223,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(output = newOutput)
   }
 
+  private def getStorageTzOptions(relation: CatalogRelation): Map[String, String] = {
+    // We add the table timezone to the relation options, which automatically gets injected into the
+    // hadoopConf for the Parquet Converters
+    val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> _).toMap
+  }
+
   private def inferIfNeeded(
       relation: CatalogRelation,
       options: Map[String, String],

http://git-wip-us.apache.org/repos/asf/spark/blob/22691556/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 05b6059..2bfd63d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,12 +17,22 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+import java.net.URLDecoder
 import java.sql.Timestamp
+import java.util.TimeZone
 
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompatibilityTest, ParquetFileFormat}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
 
 class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton {
   /**
@@ -141,4 +151,369 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
       Row(Seq(Row(1))),
       "ARRAY<STRUCT<array_element: INT>>")
   }
+
+  val testTimezones = Seq(
+    "UTC" -> "UTC",
+    "LA" -> "America/Los_Angeles",
+    "Berlin" -> "Europe/Berlin"
+  )
+  // Check creating parquet tables with timestamps, writing data into them, and reading it back out
+  // under a variety of conditions:
+  // * tables with explicit tz and those without
+  // * altering table properties directly
+  // * variety of timezones, local & non-local
+  val sessionTimezones = testTimezones.map(_._2).map(Some(_)) ++ Seq(None)
+  sessionTimezones.foreach { sessionTzOpt =>
+    val sparkSession = spark.newSession()
+    sessionTzOpt.foreach { tz => sparkSession.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, tz) }
+    testCreateWriteRead(sparkSession, "no_tz", None, sessionTzOpt)
+    val localTz = TimeZone.getDefault.getID()
+    testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt)
+    // check with a variety of timezones.  The unit tests currently are configured to always use
+    // America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone.
+    testTimezones.foreach { case (tableName, zone) =>
+      if (zone != localTz) {
+        testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt)
+      }
+    }
+  }
+
+  private def testCreateWriteRead(
+      sparkSession: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]): Unit = {
+    testCreateAlterTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
+    testWriteTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
+    testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt)
+  }
+
+  private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = {
+    val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
+    assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz)
+  }
+
+  private def testCreateAlterTablesWithTimezone(
+      spark: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]): Unit = {
+    test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " +
+      s"sessionTzOpt = $sessionTzOpt") {
+      val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+      withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") {
+        // If we ever add a property to set the table timezone by default, defaultTz would change
+        val defaultTz = None
+        // check that created tables have correct TBLPROPERTIES
+        val tblProperties = explicitTz.map {
+          tz => s"""TBLPROPERTIES ($key="$tz")"""
+        }.getOrElse("")
+        spark.sql(
+          s"""CREATE TABLE $baseTable (
+                |  x int
+                | )
+                | STORED AS PARQUET
+                | $tblProperties
+            """.stripMargin)
+        val expectedTableTz = explicitTz.orElse(defaultTz)
+        checkHasTz(spark, baseTable, expectedTableTz)
+        spark.sql(
+          s"""CREATE TABLE partitioned_$baseTable (
+                |  x int
+                | )
+                | PARTITIONED BY (y int)
+                | STORED AS PARQUET
+                | $tblProperties
+            """.stripMargin)
+        checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz)
+        spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable")
+        checkHasTz(spark, s"like_$baseTable", expectedTableTz)
+        spark.sql(
+          s"""CREATE TABLE select_$baseTable
+                | STORED AS PARQUET
+                | AS
+                | SELECT * from $baseTable
+            """.stripMargin)
+        checkHasTz(spark, s"select_$baseTable", defaultTz)
+
+        // check alter table, setting, unsetting, resetting the property
+        spark.sql(
+          s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""")
+        checkHasTz(spark, baseTable, Some("America/Los_Angeles"))
+        spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""")
+        checkHasTz(spark, baseTable, Some("UTC"))
+        spark.sql(s"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""")
+        checkHasTz(spark, baseTable, None)
+        explicitTz.foreach { tz =>
+          spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""")
+          checkHasTz(spark, baseTable, expectedTableTz)
+        }
+      }
+    }
+  }
+
+  val desiredTimestampStrings = Seq(
+    "2015-12-31 22:49:59.123",
+    "2015-12-31 23:50:59.123",
+    "2016-01-01 00:39:59.123",
+    "2016-01-01 01:29:59.123"
+  )
+  // We don't want to mess with timezones inside the tests themselves, since we use a shared
+  // spark context, and then we might be prone to issues from lazy vals for timezones.  Instead,
+  // we manually adjust the timezone just to determine what the desired millis (since epoch, in utc)
+  // is for various "wall-clock" times in different timezones, and then we can compare against those
+  // in our tests.
+  val timestampTimezoneToMillis = {
+    val originalTz = TimeZone.getDefault
+    try {
+      desiredTimestampStrings.flatMap { timestampString =>
+        Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map { tzId =>
+          TimeZone.setDefault(TimeZone.getTimeZone(tzId))
+          val timestamp = Timestamp.valueOf(timestampString)
+          (timestampString, tzId) -> timestamp.getTime()
+        }
+      }.toMap
+    } finally {
+      TimeZone.setDefault(originalTz)
+    }
+  }
+
+  private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = {
+    import spark.implicits._
+    val df = desiredTimestampStrings.toDF("display")
+    // this will get the millis corresponding to the display time given the current *session*
+    // timezone.
+    df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)]
+  }
+
+  private def testWriteTablesWithTimezone(
+      spark: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]) : Unit = {
+    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    test(s"SPARK-12297: Write to Parquet tables with Timestamps; explicitTz = $explicitTz; " +
+        s"sessionTzOpt = $sessionTzOpt") {
+
+      withTable(s"saveAsTable_$baseTable", s"insert_$baseTable", s"partitioned_ts_$baseTable") {
+        val sessionTzId = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
+        // check that created tables have correct TBLPROPERTIES
+        val tblProperties = explicitTz.map {
+          tz => s"""TBLPROPERTIES ($key="$tz")"""
+        }.getOrElse("")
+
+        val rawData = createRawData(spark)
+        // Check writing data out.
+        // We write data into our tables, and then check the raw parquet files to see whether
+        // the correct conversion was applied.
+        rawData.write.saveAsTable(s"saveAsTable_$baseTable")
+        checkHasTz(spark, s"saveAsTable_$baseTable", None)
+        spark.sql(
+          s"""CREATE TABLE insert_$baseTable (
+                |  display string,
+                |  ts timestamp
+                | )
+                | STORED AS PARQUET
+                | $tblProperties
+               """.stripMargin)
+        checkHasTz(spark, s"insert_$baseTable", explicitTz)
+        rawData.write.insertInto(s"insert_$baseTable")
+        // no matter what, roundtripping via the table should leave the data unchanged
+        val readFromTable = spark.table(s"insert_$baseTable").collect()
+          .map { row => (row.getAs[String](0), row.getAs[Timestamp](1)).toString() }.sorted
+        assert(readFromTable === rawData.collect().map(_.toString()).sorted)
+
+        // Now we load the raw parquet data on disk, and check if it was adjusted correctly.
+        // Note that we only store the timezone in the table property, so when we read the
+        // data this way, we're bypassing all of the conversion logic, and reading the raw
+        // values in the parquet file.
+        val onDiskLocation = spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier(s"insert_$baseTable")).location.getPath
+        // we test reading the data back with and without the vectorized reader, to make sure we
+        // haven't broken reading parquet from non-hive tables, with both readers.
+        Seq(false, true).foreach { vectorized =>
+          spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
+          val readFromDisk = spark.read.parquet(onDiskLocation).collect()
+          val storageTzId = explicitTz.getOrElse(sessionTzId)
+          readFromDisk.foreach { row =>
+            val displayTime = row.getAs[String](0)
+            val millis = row.getAs[Timestamp](1).getTime()
+            val expectedMillis = timestampTimezoneToMillis((displayTime, storageTzId))
+            assert(expectedMillis === millis, s"Display time '$displayTime' was stored " +
+              s"incorrectly with sessionTz = ${sessionTzOpt}; Got $millis, expected " +
+              s"$expectedMillis (delta = ${millis - expectedMillis})")
+          }
+        }
+
+        // check tables partitioned by timestamps.  We don't compare the "raw" data in this case,
+        // since they are adjusted even when we bypass the hive table.
+        rawData.write.partitionBy("ts").saveAsTable(s"partitioned_ts_$baseTable")
+        val partitionDiskLocation = spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier(s"partitioned_ts_$baseTable")).location.getPath
+        // no matter what mix of timezones we use, the dirs should specify the value with the
+        // same time we use for display.
+        val parts = new File(partitionDiskLocation).list().collect {
+          case name if name.startsWith("ts=") => URLDecoder.decode(name.stripPrefix("ts="))
+        }.toSet
+        assert(parts === desiredTimestampStrings.toSet)
+      }
+    }
+  }
+
+  private def testReadTablesWithTimezone(
+      spark: SparkSession,
+      baseTable: String,
+      explicitTz: Option[String],
+      sessionTzOpt: Option[String]): Unit = {
+    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " +
+      s"sessionTzOpt = $sessionTzOpt") {
+      withTable(s"external_$baseTable", s"partitioned_$baseTable") {
+        // we intentionally save this data directly, without creating a table, so we can
+        // see that the data is read back differently depending on table properties.
+        // we'll save with adjusted millis, so that it should be the correct millis after reading
+        // back.
+        val rawData = createRawData(spark)
+        // to avoid closing over entire class
+        val timestampTimezoneToMillis = this.timestampTimezoneToMillis
+        import spark.implicits._
+        val adjustedRawData = (explicitTz match {
+          case Some(tzId) =>
+            rawData.map { case (displayTime, _) =>
+              val storageMillis = timestampTimezoneToMillis((displayTime, tzId))
+              (displayTime, new Timestamp(storageMillis))
+            }
+          case _ =>
+            rawData
+        }).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts")
+        withTempPath { basePath =>
+          val unpartitionedPath = new File(basePath, "flat")
+          val partitionedPath = new File(basePath, "partitioned")
+          adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath)
+          val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++
+            explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map())
+
+          spark.catalog.createTable(
+            tableName = s"external_$baseTable",
+            source = "parquet",
+            schema = new StructType().add("display", StringType).add("ts", TimestampType),
+            options = options
+          )
+
+          // also write out a partitioned table, to make sure we can access that correctly.
+          // add a column we can partition by (value doesn't particularly matter).
+          val partitionedData = adjustedRawData.withColumn("id", monotonicallyIncreasingId)
+          partitionedData.write.partitionBy("id")
+            .parquet(partitionedPath.getCanonicalPath)
+          // unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use
+          // a "CREATE TABLE" stmt.
+          val tblOpts = explicitTz.map { tz => s"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("")
+          spark.sql(s"""CREATE EXTERNAL TABLE partitioned_$baseTable (
+                         |  display string,
+                         |  ts timestamp
+                         |)
+                         |PARTITIONED BY (id bigint)
+                         |STORED AS parquet
+                         |LOCATION 'file:${partitionedPath.getCanonicalPath}'
+                         |$tblOpts
+                          """.stripMargin)
+          spark.sql(s"msck repair table partitioned_$baseTable")
+
+          for {
+            vectorized <- Seq(false, true)
+            partitioned <- Seq(false, true)
+          } {
+            withClue(s"vectorized = $vectorized; partitioned = $partitioned") {
+              spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized)
+              val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID())
+              val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable"
+              val query = s"select display, cast(ts as string) as ts_as_string, ts " +
+                s"from $table"
+              val collectedFromExternal = spark.sql(query).collect()
+              assert( collectedFromExternal.size === 4)
+              collectedFromExternal.foreach { row =>
+                val displayTime = row.getAs[String](0)
+                // the timestamp should still display the same, despite the changes in timezones
+                assert(displayTime === row.getAs[String](1).toString())
+                // we'll also check that the millis behind the timestamp has the appropriate
+                // adjustments.
+                val millis = row.getAs[Timestamp](2).getTime()
+                val expectedMillis = timestampTimezoneToMillis((displayTime, sessionTz))
+                val delta = millis - expectedMillis
+                val deltaHours = delta / (1000L * 60 * 60)
+                assert(millis === expectedMillis, s"Display time '$displayTime' did not have " +
+                  s"correct millis: was $millis, expected $expectedMillis; delta = $delta " +
+                  s"($deltaHours hours)")
+              }
+
+              // Now test that the behavior is still correct even with a filter which could get
+              // pushed down into parquet.  We don't need extra handling for pushed down
+              // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet
+              // does not read statistics from int96 fields, as they are unsigned.  See
+              // scalastyle:off line.size.limit
+              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419
+              // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348
+              // scalastyle:on line.size.limit
+              //
+              // Just to be defensive in case anything ever changes in parquet, this test checks
+              // the assumption on column stats, and also the end-to-end behavior.
+
+              val hadoopConf = sparkContext.hadoopConfiguration
+              val fs = FileSystem.get(hadoopConf)
+              val parts = if (partitioned) {
+                val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath))
+                  .filter(_.getPath().getName().startsWith("id="))
+                fs.listStatus(subdirs.head.getPath())
+                  .filter(_.getPath().getName().endsWith(".parquet"))
+              } else {
+                fs.listStatus(new Path(unpartitionedPath.getCanonicalPath))
+                  .filter(_.getPath().getName().endsWith(".parquet"))
+              }
+              // grab the meta data from the parquet file.  The next section of asserts just make
+              // sure the test is configured correctly.
+              assert(parts.size == 1)
+              val oneFooter = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath)
+              assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2)
+              assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() ===
+                PrimitiveTypeName.INT96)
+              val oneBlockMeta = oneFooter.getBlocks().get(0)
+              val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1)
+              val columnStats = oneBlockColumnMeta.getStatistics
+              // This is the important assert.  Column stats are written, but they are ignored
+              // when the data is read back as mentioned above, b/c int96 is unsigned.  This
+              // assert makes sure this holds even if we change parquet versions (if eg. there
+              // were ever statistics even on unsigned columns).
+              assert(columnStats.isEmpty)
+
+              // These queries should return the entire dataset, but if the predicates were
+              // applied to the raw values in parquet, they would incorrectly filter data out.
+              Seq(
+                ">" -> "2015-12-31 22:00:00",
+                "<" -> "2016-01-01 02:00:00"
+              ).foreach { case (comparison, value) =>
+                val query =
+                  s"select ts from $table where ts $comparison '$value'"
+                val countWithFilter = spark.sql(query).count()
+                assert(countWithFilter === 4, query)
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-12297: exception on bad timezone") {
+    val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
+    val badTzException = intercept[AnalysisException] {
+      spark.sql(
+        s"""CREATE TABLE bad_tz_table (
+              |  x int
+              | )
+              | STORED AS PARQUET
+              | TBLPROPERTIES ($key="Blart Versenwald III")
+            """.stripMargin)
+    }
+    assert(badTzException.getMessage.contains("Blart Versenwald III"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message