spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-26243) Use java.time API for parsing timestamps and dates from JSON
Date Sun, 16 Dec 2018 01:37:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-26243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722345#comment-16722345 ] 

ASF GitHub Bot commented on SPARK-26243:
----------------------------------------

asfgit closed pull request #23196: [SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON
URL: https://github.com/apache/spark/pull/23196
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index fee0e6df7177c..3d585864eefe4 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -33,7 +33,7 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.
 
-  - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
+  - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpuse with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match to the pattern but it can be parsed by earlier Spark versions due to a fallback to `Timestamp.valueOf`. To parse the same timestamp since Spark 3.0, the pattern should be `yyyy-MM-dd HH:mm:ss.SSS`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
 ## Upgrading From Spark SQL 2.3 to 2.4
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 345dc4d41993e..35ade136cc607 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
-import org.apache.spark.sql.catalyst.util.DateTimeFormatter
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
   @transient
-  private lazy val timeParser = DateTimeFormatter(
+  private lazy val timestampParser = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
@@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
     // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt timeParser.parse(field)).isDefined) {
+    if ((allCatch opt timestampParser.parse(field)).isDefined) {
       TimestampType
     } else {
       tryParseBoolean(field)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index af09cd6c8449b..f012d96138f37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.types._
 
 class UnivocityGenerator(
@@ -41,18 +41,18 @@ class UnivocityGenerator(
   private val valueConverters: Array[ValueConverter] =
     schema.map(_.dataType).map(makeConverter).toArray
 
-  private val timeFormatter = DateTimeFormatter(
+  private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
   private def makeConverter(dataType: DataType): ValueConverter = dataType match {
     case DateType =>
       (row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))
 
     case TimestampType =>
-      (row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
+      (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))
 
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 0f375e036029c..ed089120055e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -74,11 +74,11 @@ class UnivocityParser(
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  private val timeFormatter = DateTimeFormatter(
+  private val timestampFormatter = TimestampFormatter(
     options.timestampFormat,
     options.timeZone,
     options.locale)
-  private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
@@ -158,7 +158,7 @@ class UnivocityParser(
       }
 
     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
+      nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
 
     case _: DateType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index e10b8a327c01a..eaff3fa7bec25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
@@ -82,13 +81,10 @@ private[sql] class JSONOptions(
   val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
+  val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
 
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
+  val timestampFormat: String =
+    parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index d02a2be8ddad6..951f5190cd504 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
 
 /**
@@ -77,6 +77,12 @@ private[sql] class JacksonGenerator(
 
   private val lineSeparator: String = options.lineSeparatorInWrite
 
+  private val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
       (row: SpecializedGetters, ordinal: Int) =>
@@ -116,14 +122,12 @@ private[sql] class JacksonGenerator(
 
     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val timestampString =
-          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        val timestampString = timestampFormatter.format(row.getLong(ordinal))
         gen.writeString(timestampString)
 
     case DateType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        val dateString =
-          options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+        val dateString = dateFormatter.format(row.getInt(ordinal))
         gen.writeString(dateString)
 
     case BinaryType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 2357595906b11..4862fa2897027 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -55,6 +55,12 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  private val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+  private val dateFormatter = DateFormatter(options.dateFormat, options.locale)
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -218,17 +224,7 @@ class JacksonParser(
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
-          Long.box {
-            Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
-              .getOrElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                DateTimeUtils.stringToTime(stringValue).getTime * 1000L
-              }
-          }
+          timestampFormatter.parse(parser.getText)
 
         case VALUE_NUMBER_INT =>
           parser.getLongValue * 1000000L
@@ -237,22 +233,7 @@ class JacksonParser(
     case DateType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.x
-          Int.box {
-            Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
-              .orElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
-              }
-              .getOrElse {
-                // In Spark 1.5.0, we store the data as number of days since epoch in string.
-                // So, we just convert it to Int.
-                stringValue.toInt
-              }
-          }
+          dateFormatter.parse(parser.getText)
       }
 
     case BinaryType =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
similarity index 63%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index ad1f4131de2f6..2b8d22dde9267 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
 
 import java.time._
 import java.time.format.DateTimeFormatterBuilder
-import java.time.temporal.{ChronoField, TemporalQueries}
+import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
 import java.util.{Locale, TimeZone}
 
 import scala.util.Try
@@ -28,31 +28,44 @@ import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.sql.internal.SQLConf
 
-sealed trait DateTimeFormatter {
+sealed trait TimestampFormatter {
   def parse(s: String): Long // returns microseconds since epoch
   def format(us: Long): String
 }
 
-class Iso8601DateTimeFormatter(
+trait FormatterUtils {
+  protected def zoneId: ZoneId
+  protected def buildFormatter(
+      pattern: String,
+      locale: Locale): java.time.format.DateTimeFormatter = {
+    new DateTimeFormatterBuilder()
+      .appendPattern(pattern)
+      .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
+      .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+      .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .toFormatter(locale)
+  }
+  protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = {
+    val localDateTime = LocalDateTime.from(temporalAccessor)
+    val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
+    Instant.from(zonedDateTime)
+  }
+}
+
+class Iso8601TimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends DateTimeFormatter {
-  val formatter = new DateTimeFormatterBuilder()
-    .appendPattern(pattern)
-    .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
-    .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
-    .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
-    .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
-    .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
-    .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
-    .toFormatter(locale)
+    locale: Locale) extends TimestampFormatter with FormatterUtils {
+  val zoneId = timeZone.toZoneId
+  val formatter = buildFormatter(pattern, locale)
 
   def toInstant(s: String): Instant = {
     val temporalAccessor = formatter.parse(s)
     if (temporalAccessor.query(TemporalQueries.offset()) == null) {
-      val localDateTime = LocalDateTime.from(temporalAccessor)
-      val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
-      Instant.from(zonedDateTime)
+      toInstantWithZoneId(temporalAccessor)
     } else {
       Instant.from(temporalAccessor)
     }
@@ -75,10 +88,10 @@ class Iso8601DateTimeFormatter(
   }
 }
 
-class LegacyDateTimeFormatter(
+class LegacyTimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends DateTimeFormatter {
+    locale: Locale) extends TimestampFormatter {
   val format = FastDateFormat.getInstance(pattern, timeZone, locale)
 
   protected def toMillis(s: String): Long = format.parse(s).getTime
@@ -90,21 +103,21 @@ class LegacyDateTimeFormatter(
   }
 }
 
-class LegacyFallbackDateTimeFormatter(
+class LegacyFallbackTimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
-    locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
+    locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
   override def toMillis(s: String): Long = {
     Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
   }
 }
 
-object DateTimeFormatter {
-  def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
+object TimestampFormatter {
+  def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
+      new LegacyFallbackTimestampFormatter(format, timeZone, locale)
     } else {
-      new Iso8601DateTimeFormatter(format, timeZone, locale)
+      new Iso8601TimestampFormatter(format, timeZone, locale)
     }
   }
 }
@@ -116,13 +129,19 @@ sealed trait DateFormatter {
 
 class Iso8601DateFormatter(
     pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends DateFormatter {
+    locale: Locale) extends DateFormatter with FormatterUtils {
+
+  val zoneId = ZoneId.of("UTC")
+
+  val formatter = buildFormatter(pattern, locale)
 
-  val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
+  def toInstant(s: String): Instant = {
+    val temporalAccessor = formatter.parse(s)
+    toInstantWithZoneId(temporalAccessor)
+  }
 
   override def parse(s: String): Int = {
-    val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
+    val seconds = toInstant(s).getEpochSecond
     val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
 
     days.toInt
@@ -130,15 +149,12 @@ class Iso8601DateFormatter(
 
   override def format(days: Int): String = {
     val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
-    dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
+    formatter.withZone(zoneId).format(instant)
   }
 }
 
-class LegacyDateFormatter(
-    pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends DateFormatter {
-  val format = FastDateFormat.getInstance(pattern, timeZone, locale)
+class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
+  val format = FastDateFormat.getInstance(pattern, locale)
 
   def parse(s: String): Int = {
     val milliseconds = format.parse(s).getTime
@@ -153,8 +169,7 @@ class LegacyDateFormatter(
 
 class LegacyFallbackDateFormatter(
     pattern: String,
-    timeZone: TimeZone,
-    locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
+    locale: Locale) extends LegacyDateFormatter(pattern, locale) {
   override def parse(s: String): Int = {
     Try(super.parse(s)).orElse {
       // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
@@ -169,11 +184,11 @@ class LegacyFallbackDateFormatter(
 }
 
 object DateFormatter {
-  def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
+  def apply(format: String, locale: Locale): DateFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
-      new LegacyFallbackDateFormatter(format, timeZone, locale)
+      new LegacyFallbackDateFormatter(format, locale)
     } else {
-      new Iso8601DateFormatter(format, timeZone, locale)
+      new Iso8601DateFormatter(format, locale)
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
deleted file mode 100644
index 02d4ee0490604..0000000000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimeFormatterSuite.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import java.util.{Locale, TimeZone}
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter, DateTimeTestUtils}
-
-class DateTimeFormatterSuite  extends SparkFunSuite {
-  test("parsing dates using time zones") {
-    val localDate = "2018-12-02"
-    val expectedDays = Map(
-      "UTC" -> 17867,
-      "PST" -> 17867,
-      "CET" -> 17866,
-      "Africa/Dakar" -> 17867,
-      "America/Los_Angeles" -> 17867,
-      "Antarctica/Vostok" -> 17866,
-      "Asia/Hong_Kong" -> 17866,
-      "Europe/Amsterdam" -> 17866)
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
-      val daysSinceEpoch = formatter.parse(localDate)
-      assert(daysSinceEpoch === expectedDays(timeZone))
-    }
-  }
-
-  test("parsing timestamps using time zones") {
-    val localDate = "2018-12-02T10:11:12.001234"
-    val expectedMicros = Map(
-      "UTC" -> 1543745472001234L,
-      "PST" -> 1543774272001234L,
-      "CET" -> 1543741872001234L,
-      "Africa/Dakar" -> 1543745472001234L,
-      "America/Los_Angeles" -> 1543774272001234L,
-      "Antarctica/Vostok" -> 1543723872001234L,
-      "Asia/Hong_Kong" -> 1543716672001234L,
-      "Europe/Amsterdam" -> 1543741872001234L)
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateTimeFormatter(
-        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-        TimeZone.getTimeZone(timeZone),
-        Locale.US)
-      val microsSinceEpoch = formatter.parse(localDate)
-      assert(microsSinceEpoch === expectedMicros(timeZone))
-    }
-  }
-
-  test("format dates using time zones") {
-    val daysSinceEpoch = 17867
-    val expectedDate = Map(
-      "UTC" -> "2018-12-02",
-      "PST" -> "2018-12-01",
-      "CET" -> "2018-12-02",
-      "Africa/Dakar" -> "2018-12-02",
-      "America/Los_Angeles" -> "2018-12-01",
-      "Antarctica/Vostok" -> "2018-12-02",
-      "Asia/Hong_Kong" -> "2018-12-02",
-      "Europe/Amsterdam" -> "2018-12-02")
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateFormatter("yyyy-MM-dd", TimeZone.getTimeZone(timeZone), Locale.US)
-      val date = formatter.format(daysSinceEpoch)
-      assert(date === expectedDate(timeZone))
-    }
-  }
-
-  test("format timestamps using time zones") {
-    val microsSinceEpoch = 1543745472001234L
-    val expectedTimestamp = Map(
-      "UTC" -> "2018-12-02T10:11:12.001234",
-      "PST" -> "2018-12-02T02:11:12.001234",
-      "CET" -> "2018-12-02T11:11:12.001234",
-      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
-      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
-      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
-      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
-      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
-    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
-      val formatter = DateTimeFormatter(
-        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-        TimeZone.getTimeZone(timeZone),
-        Locale.US)
-      val timestamp = formatter.format(microsSinceEpoch)
-      assert(timestamp === expectedTimestamp(timeZone))
-    }
-  }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala
new file mode 100644
index 0000000000000..43e348c7eebf4
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateTimestampFormatterSuite.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import java.util.{Locale, TimeZone}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
+
+class DateTimestampFormatterSuite extends SparkFunSuite with SQLHelper {
+  test("parsing dates") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+        val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+        val daysSinceEpoch = formatter.parse("2018-12-02")
+        assert(daysSinceEpoch === 17867)
+      }
+    }
+  }
+
+  test("parsing timestamps using time zones") {
+    val localDate = "2018-12-02T10:11:12.001234"
+    val expectedMicros = Map(
+      "UTC" -> 1543745472001234L,
+      "PST" -> 1543774272001234L,
+      "CET" -> 1543741872001234L,
+      "Africa/Dakar" -> 1543745472001234L,
+      "America/Los_Angeles" -> 1543774272001234L,
+      "Antarctica/Vostok" -> 1543723872001234L,
+      "Asia/Hong_Kong" -> 1543716672001234L,
+      "Europe/Amsterdam" -> 1543741872001234L)
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val microsSinceEpoch = formatter.parse(localDate)
+      assert(microsSinceEpoch === expectedMicros(timeZone))
+    }
+  }
+
+  test("format dates") {
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+        val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+        val date = formatter.format(17867)
+        assert(date === "2018-12-02")
+      }
+    }
+  }
+
+  test("format timestamps using time zones") {
+    val microsSinceEpoch = 1543745472001234L
+    val expectedTimestamp = Map(
+      "UTC" -> "2018-12-02T10:11:12.001234",
+      "PST" -> "2018-12-02T02:11:12.001234",
+      "CET" -> "2018-12-02T11:11:12.001234",
+      "Africa/Dakar" -> "2018-12-02T10:11:12.001234",
+      "America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
+      "Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
+      "Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
+      "Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
+    DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+      val formatter = TimestampFormatter(
+        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
+        TimeZone.getTimeZone(timeZone),
+        Locale.US)
+      val timestamp = formatter.format(microsSinceEpoch)
+      assert(timestamp === expectedTimestamp(timeZone))
+    }
+  }
+
+  test("roundtrip timestamp -> micros -> timestamp using timezones") {
+    Seq(
+      -58710115316212000L,
+      -18926315945345679L,
+      -9463427405253013L,
+      -244000001L,
+      0L,
+      99628200102030L,
+      1543749753123456L,
+      2177456523456789L,
+      11858049903010203L).foreach { micros =>
+      DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+        val timestamp = formatter.format(micros)
+        val parsed = formatter.parse(timestamp)
+        assert(micros === parsed)
+      }
+    }
+  }
+
+  test("roundtrip micros -> timestamp -> micros using timezones") {
+    Seq(
+      "0109-07-20T18:38:03.788000",
+      "1370-04-01T10:00:54.654321",
+      "1670-02-11T14:09:54.746987",
+      "1969-12-31T23:55:55.999999",
+      "1970-01-01T00:00:00.000000",
+      "1973-02-27T02:30:00.102030",
+      "2018-12-02T11:22:33.123456",
+      "2039-01-01T01:02:03.456789",
+      "2345-10-07T22:45:03.010203").foreach { timestamp =>
+      DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
+        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+        val micros = formatter.parse(timestamp)
+        val formatted = formatter.format(micros)
+        assert(timestamp === formatted)
+      }
+    }
+  }
+
+  test("roundtrip date -> days -> date") {
+    Seq(
+      "0050-01-01",
+      "0953-02-02",
+      "1423-03-08",
+      "1969-12-31",
+      "1972-08-25",
+      "1975-09-26",
+      "2018-12-12",
+      "2038-01-01",
+      "5010-11-17").foreach { date =>
+      DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+          val days = formatter.parse(date)
+          val formatted = formatter.format(days)
+          assert(date === formatted)
+        }
+      }
+    }
+  }
+
+  test("roundtrip days -> date -> days") {
+    Seq(
+      -701265,
+      -371419,
+      -199722,
+      -1,
+      0,
+      967,
+      2094,
+      17877,
+      24837,
+      1110657).foreach { days =>
+      DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
+        withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
+          val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+          val date = formatter.format(days)
+          val parsed = formatter.parse(date)
+          assert(days === parsed)
+        }
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index dff37ca2d40f0..96f7d24b36356 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -57,14 +57,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
 
     val factory = new JsonFactory()
-    def enforceCorrectType(value: Any, dataType: DataType): Any = {
+    def enforceCorrectType(
+        value: Any,
+        dataType: DataType,
+        options: Map[String, String] = Map.empty): Any = {
       val writer = new StringWriter()
       Utils.tryWithResource(factory.createGenerator(writer)) { generator =>
         generator.writeObject(value)
         generator.flush()
       }
 
-      val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
+      val dummyOption = new JSONOptions(options, SQLConf.get.sessionLocalTimeZone)
       val dummySchema = StructType(Seq.empty)
       val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
 
@@ -96,19 +99,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)),
         enforceCorrectType(intNumber.toLong, TimestampType))
     val strTime = "2014-09-30 12:34:56"
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
-        enforceCorrectType(strTime, TimestampType))
+    checkTypePromotion(
+      expected = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+      enforceCorrectType(strTime, TimestampType,
+        Map("timestampFormat" -> "yyyy-MM-dd HH:mm:ss")))
 
     val strDate = "2014-10-15"
     checkTypePromotion(
       DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
 
     val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
-    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
-        enforceCorrectType(ISO8601Time1, TimestampType))
+        enforceCorrectType(
+          ISO8601Time1,
+          TimestampType,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SX")))
+    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
-        enforceCorrectType(ISO8601Time2, TimestampType))
+        enforceCorrectType(
+          ISO8601Time2,
+          TimestampType,
+          Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX")))
 
     val ISO8601Date = "1970-01-01"
     checkTypePromotion(DateTimeUtils.millisToDays(32400000),
@@ -1440,103 +1451,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("backward compatibility") {
-    // This test we make sure our JSON support can read JSON data generated by previous version
-    // of Spark generated through toJSON method and JSON data source.
-    // The data is generated by the following program.
-    // Here are a few notes:
-    //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
-    //      in the JSON object.
-    //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
-    //      JSON objects generated by those Spark versions (col17).
-    //  - If the type is NullType, we do not write data out.
-
-    // Create the schema.
-    val struct =
-      StructType(
-        StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
+      // This test we make sure our JSON support can read JSON data generated by previous version
+      // of Spark generated through toJSON method and JSON data source.
+      // The data is generated by the following program.
+      // Here are a few notes:
+      //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
+      //      in the JSON object.
+      //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
+      //      JSON objects generated by those Spark versions (col17).
+      //  - If the type is NullType, we do not write data out.
+
+      // Create the schema.
+      val struct =
+        StructType(
+          StructField("f1", FloatType, true) ::
+            StructField("f2", ArrayType(BooleanType), true) :: Nil)
 
-    val dataTypes =
-      Seq(
-        StringType, BinaryType, NullType, BooleanType,
-        ByteType, ShortType, IntegerType, LongType,
-        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-        DateType, TimestampType,
-        ArrayType(IntegerType), MapType(StringType, LongType), struct,
-        new TestUDT.MyDenseVectorUDT())
-    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-      StructField(s"col$index", dataType, nullable = true)
-    }
-    val schema = StructType(fields)
+      val dataTypes =
+        Seq(
+          StringType, BinaryType, NullType, BooleanType,
+          ByteType, ShortType, IntegerType, LongType,
+          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+          DateType, TimestampType,
+          ArrayType(IntegerType), MapType(StringType, LongType), struct,
+          new TestUDT.MyDenseVectorUDT())
+      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+        StructField(s"col$index", dataType, nullable = true)
+      }
+      val schema = StructType(fields)
 
-    val constantValues =
-      Seq(
-        "a string in binary".getBytes(StandardCharsets.UTF_8),
-        null,
-        true,
-        1.toByte,
-        2.toShort,
-        3,
-        Long.MaxValue,
-        0.25.toFloat,
-        0.75,
-        new java.math.BigDecimal(s"1234.23456"),
-        new java.math.BigDecimal(s"1.23456"),
-        java.sql.Date.valueOf("2015-01-01"),
-        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
-        Seq(2, 3, 4),
-        Map("a string" -> 2000L),
-        Row(4.75.toFloat, Seq(false, true)),
-        new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
-    val data =
-      Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
+      val constantValues =
+        Seq(
+          "a string in binary".getBytes(StandardCharsets.UTF_8),
+          null,
+          true,
+          1.toByte,
+          2.toShort,
+          3,
+          Long.MaxValue,
+          0.25.toFloat,
+          0.75,
+          new java.math.BigDecimal(s"1234.23456"),
+          new java.math.BigDecimal(s"1.23456"),
+          java.sql.Date.valueOf("2015-01-01"),
+          java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+          Seq(2, 3, 4),
+          Map("a string" -> 2000L),
+          Row(4.75.toFloat, Seq(false, true)),
+          new TestUDT.MyDenseVector(Array(0.25, 2.25, 4.25)))
+      val data =
+        Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
 
-    // Data generated by previous versions.
-    // scalastyle:off
-    val existingJSONData =
+      // Data generated by previous versions.
+      // scalastyle:off
+      val existingJSONData =
       """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
-      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
-    // scalastyle:on
-
-    // Generate data for the current version.
-    val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
-    withTempPath { path =>
-      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+        """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+        """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+      // scalastyle:on
+
+      // Generate data for the current version.
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
+      withTempPath { path =>
+        df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
 
-      // df.toJSON will convert internal rows to external rows first and then generate
-      // JSON objects. While, df.write.format("json") will write internal rows directly.
-      val allJSON =
+        // df.toJSON will convert internal rows to external rows first and then generate
+        // JSON objects. While, df.write.format("json") will write internal rows directly.
+        val allJSON =
         existingJSONData ++
           df.toJSON.collect() ++
           sparkContext.textFile(path.getCanonicalPath).collect()
 
-      Utils.deleteRecursively(path)
-      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
-
-      // Read data back with the schema specified.
-      val col0Values =
-        Seq(
-          "Spark 1.2.2",
-          "Spark 1.3.1",
-          "Spark 1.3.1",
-          "Spark 1.4.1",
-          "Spark 1.4.1",
-          "Spark 1.5.0",
-          "Spark 1.5.0",
-          "Spark " + spark.sparkContext.version,
-          "Spark " + spark.sparkContext.version)
-      val expectedResult = col0Values.map { v =>
-        Row.fromSeq(Seq(v) ++ constantValues)
+        Utils.deleteRecursively(path)
+        sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+        // Read data back with the schema specified.
+        val col0Values =
+          Seq(
+            "Spark 1.2.2",
+            "Spark 1.3.1",
+            "Spark 1.3.1",
+            "Spark 1.4.1",
+            "Spark 1.4.1",
+            "Spark 1.5.0",
+            "Spark 1.5.0",
+            "Spark " + spark.sparkContext.version,
+            "Spark " + spark.sparkContext.version)
+        val expectedResult = col0Values.map { v =>
+          Row.fromSeq(Seq(v) ++ constantValues)
+        }
+        checkAnswer(
+          spark.read.format("json").schema(schema).load(path.getCanonicalPath),
+          expectedResult
+        )
       }
-      checkAnswer(
-        spark.read.format("json").schema(schema).load(path.getCanonicalPath),
-        expectedResult
-      )
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 6075f2c8877d6..f0f62b608785d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import java.io.File
+import java.util.TimeZone
 
 import scala.util.Random
 
@@ -125,56 +126,62 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     } else {
       Seq(false)
     }
-    for (dataType <- supportedDataTypes) {
-      for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
-        val extraMessage = if (isParquetDataSource) {
-          s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
-        } else {
-          ""
-        }
-        logInfo(s"Testing $dataType data type$extraMessage")
-
-        val extraOptions = Map[String, String](
-          "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
-        )
-
-        withTempPath { file =>
-          val path = file.getCanonicalPath
-
-          val dataGenerator = RandomDataGenerator.forType(
-            dataType = dataType,
-            nullable = true,
-            new Random(System.nanoTime())
-          ).getOrElse {
-            fail(s"Failed to create data generator for schema $dataType")
+    // TODO: Support new parser too, see SPARK-26374.
+    withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") {
+      for (dataType <- supportedDataTypes) {
+        for (parquetDictionaryEncodingEnabled <- parquetDictionaryEncodingEnabledConfs) {
+          val extraMessage = if (isParquetDataSource) {
+            s" with parquet.enable.dictionary = $parquetDictionaryEncodingEnabled"
+          } else {
+            ""
+          }
+          logInfo(s"Testing $dataType data type$extraMessage")
+
+          val extraOptions = Map[String, String](
+            "parquet.enable.dictionary" -> parquetDictionaryEncodingEnabled.toString
+          )
+
+          withTempPath { file =>
+            val path = file.getCanonicalPath
+
+            val seed = System.nanoTime()
+            withClue(s"Random data generated with the seed: ${seed}") {
+              val dataGenerator = RandomDataGenerator.forType(
+                dataType = dataType,
+                nullable = true,
+                new Random(seed)
+              ).getOrElse {
+                fail(s"Failed to create data generator for schema $dataType")
+              }
+
+              // Create a DF for the schema with random data. The index field is used to sort the
+              // DataFrame.  This is a workaround for SPARK-10591.
+              val schema = new StructType()
+                .add("index", IntegerType, nullable = false)
+                .add("col", dataType, nullable = true)
+              val rdd =
+                spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+              val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+              df.write
+                .mode("overwrite")
+                .format(dataSourceName)
+                .option("dataSchema", df.schema.json)
+                .options(extraOptions)
+                .save(path)
+
+              val loadedDF = spark
+                .read
+                .format(dataSourceName)
+                .option("dataSchema", df.schema.json)
+                .schema(df.schema)
+                .options(extraOptions)
+                .load(path)
+                .orderBy("index")
+
+              checkAnswer(loadedDF, df)
+            }
           }
-
-          // Create a DF for the schema with random data. The index field is used to sort the
-          // DataFrame.  This is a workaround for SPARK-10591.
-          val schema = new StructType()
-            .add("index", IntegerType, nullable = false)
-            .add("col", dataType, nullable = true)
-          val rdd =
-            spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
-          val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
-          df.write
-            .mode("overwrite")
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .save(path)
-
-          val loadedDF = spark
-            .read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .schema(df.schema)
-            .options(extraOptions)
-            .load(path)
-            .orderBy("index")
-
-          checkAnswer(loadedDF, df)
         }
       }
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Use java.time API for parsing timestamps and dates from JSON
> ------------------------------------------------------------
>
>                 Key: SPARK-26243
>                 URL: https://issues.apache.org/jira/browse/SPARK-26243
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Maxim Gekk
>            Assignee: Maxim Gekk
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Currently, CSV datasource uses Apache FastDateFormat with a few fallbacks for parsing values of TimestampType and DateType. The result of parsing is an instance of java.util.Date/Timestamp which represents a specific instant in time, with millisecond precision. The tickets aims to switch on Java 8 API - java.time which allow parsing with nanoseconds precision.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message