spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hvanhov...@apache.org
Subject [1/2] spark git commit: [SPARK-18936][SQL] Infrastructure for session local timezone support.
Date Thu, 26 Jan 2017 10:51:07 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7045b8b35 -> 2969fb437


http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 35cea25..9978f35 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
-import java.util.{Calendar, Locale}
+import java.util.{Calendar, Locale, TimeZone}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
@@ -30,16 +32,29 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   import IntegralLiteralTestUtils._
 
+  val TimeZonePST = TimeZone.getTimeZone("PST")
+  val TimeZoneJST = TimeZone.getTimeZone("JST")
+
+  val gmtId = Option(TimeZoneGMT.getID)
+  val pstId = Option(TimeZonePST.getID)
+  val jstId = Option(TimeZoneJST.getID)
+
   val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+  sdf.setTimeZone(TimeZoneGMT)
   val sdfDate = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
+  sdfDate.setTimeZone(TimeZoneGMT)
   val d = new Date(sdf.parse("2015-04-08 13:10:15").getTime)
   val ts = new Timestamp(sdf.parse("2013-11-08 13:10:15").getTime)
 
   test("datetime function current_date") {
-    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())
-    val cd = CurrentDate().eval(EmptyRow).asInstanceOf[Int]
-    val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
+    val cd = CurrentDate(gmtId).eval(EmptyRow).asInstanceOf[Int]
+    val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis(), TimeZoneGMT)
     assert(d0 <= cd && cd <= d1 && d1 - d0 <= 1)
+
+    val cdjst = CurrentDate(jstId).eval(EmptyRow).asInstanceOf[Int]
+    val cdpst = CurrentDate(pstId).eval(EmptyRow).asInstanceOf[Int]
+    assert(cdpst <= cd && cd <= cdjst)
   }
 
   test("datetime function current_timestamp") {
@@ -50,9 +65,10 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   test("DayOfYear") {
     val sdfDay = new SimpleDateFormat("D", Locale.US)
+
+    val c = Calendar.getInstance()
     (0 to 3).foreach { m =>
       (0 to 5).foreach { i =>
-        val c = Calendar.getInstance()
         c.set(2000, m, 28, 0, 0, 0)
         c.add(Calendar.DATE, i)
         checkEvaluation(DayOfYear(Literal(new Date(c.getTimeInMillis))),
@@ -66,8 +82,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("Year") {
     checkEvaluation(Year(Literal.create(null, DateType)), null)
     checkEvaluation(Year(Literal(d)), 2015)
-    checkEvaluation(Year(Cast(Literal(sdfDate.format(d)), DateType)), 2015)
-    checkEvaluation(Year(Cast(Literal(ts), DateType)), 2013)
+    checkEvaluation(Year(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 2015)
+    checkEvaluation(Year(Cast(Literal(ts), DateType, gmtId)), 2013)
 
     val c = Calendar.getInstance()
     (2000 to 2002).foreach { y =>
@@ -86,8 +102,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("Quarter") {
     checkEvaluation(Quarter(Literal.create(null, DateType)), null)
     checkEvaluation(Quarter(Literal(d)), 2)
-    checkEvaluation(Quarter(Cast(Literal(sdfDate.format(d)), DateType)), 2)
-    checkEvaluation(Quarter(Cast(Literal(ts), DateType)), 4)
+    checkEvaluation(Quarter(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 2)
+    checkEvaluation(Quarter(Cast(Literal(ts), DateType, gmtId)), 4)
 
     val c = Calendar.getInstance()
     (2003 to 2004).foreach { y =>
@@ -106,13 +122,13 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("Month") {
     checkEvaluation(Month(Literal.create(null, DateType)), null)
     checkEvaluation(Month(Literal(d)), 4)
-    checkEvaluation(Month(Cast(Literal(sdfDate.format(d)), DateType)), 4)
-    checkEvaluation(Month(Cast(Literal(ts), DateType)), 11)
+    checkEvaluation(Month(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 4)
+    checkEvaluation(Month(Cast(Literal(ts), DateType, gmtId)), 11)
 
+    val c = Calendar.getInstance()
     (2003 to 2004).foreach { y =>
       (0 to 3).foreach { m =>
         (0 to 2 * 24).foreach { i =>
-          val c = Calendar.getInstance()
           c.set(y, m, 28, 0, 0, 0)
           c.add(Calendar.HOUR_OF_DAY, i)
           checkEvaluation(Month(Literal(new Date(c.getTimeInMillis))),
@@ -127,11 +143,11 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(DayOfMonth(Cast(Literal("2000-02-29"), DateType)), 29)
     checkEvaluation(DayOfMonth(Literal.create(null, DateType)), null)
     checkEvaluation(DayOfMonth(Literal(d)), 8)
-    checkEvaluation(DayOfMonth(Cast(Literal(sdfDate.format(d)), DateType)), 8)
-    checkEvaluation(DayOfMonth(Cast(Literal(ts), DateType)), 8)
+    checkEvaluation(DayOfMonth(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 8)
+    checkEvaluation(DayOfMonth(Cast(Literal(ts), DateType, gmtId)), 8)
 
+    val c = Calendar.getInstance()
     (1999 to 2000).foreach { y =>
-      val c = Calendar.getInstance()
       c.set(y, 0, 1, 0, 0, 0)
       (0 to 365).foreach { d =>
         c.add(Calendar.DATE, 1)
@@ -143,72 +159,114 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("Seconds") {
-    assert(Second(Literal.create(null, DateType)).resolved === false)
-    checkEvaluation(Second(Cast(Literal(d), TimestampType)), 0)
-    checkEvaluation(Second(Cast(Literal(sdf.format(d)), TimestampType)), 15)
-    checkEvaluation(Second(Literal(ts)), 15)
+    assert(Second(Literal.create(null, DateType), gmtId).resolved === false)
+    assert(Second(Cast(Literal(d), TimestampType), None).resolved === true)
+    checkEvaluation(Second(Cast(Literal(d), TimestampType, gmtId), gmtId), 0)
+    checkEvaluation(Second(Cast(Literal(sdf.format(d)), TimestampType, gmtId), gmtId), 15)
+    checkEvaluation(Second(Literal(ts), gmtId), 15)
 
     val c = Calendar.getInstance()
-    (0 to 60 by 5).foreach { s =>
-      c.set(2015, 18, 3, 3, 5, s)
-      checkEvaluation(Second(Literal(new Timestamp(c.getTimeInMillis))),
-        c.get(Calendar.SECOND))
+    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+      val timeZoneId = Option(tz.getID)
+      c.setTimeZone(tz)
+      (0 to 60 by 5).foreach { s =>
+        c.set(2015, 18, 3, 3, 5, s)
+        checkEvaluation(
+          Second(Literal(new Timestamp(c.getTimeInMillis)), timeZoneId),
+          c.get(Calendar.SECOND))
+      }
+      checkConsistencyBetweenInterpretedAndCodegen(
+        (child: Expression) => Second(child, timeZoneId), TimestampType)
     }
-    checkConsistencyBetweenInterpretedAndCodegen(Second, TimestampType)
   }
 
   test("WeekOfYear") {
     checkEvaluation(WeekOfYear(Literal.create(null, DateType)), null)
     checkEvaluation(WeekOfYear(Literal(d)), 15)
-    checkEvaluation(WeekOfYear(Cast(Literal(sdfDate.format(d)), DateType)), 15)
-    checkEvaluation(WeekOfYear(Cast(Literal(ts), DateType)), 45)
-    checkEvaluation(WeekOfYear(Cast(Literal("2011-05-06"), DateType)), 18)
+    checkEvaluation(WeekOfYear(Cast(Literal(sdfDate.format(d)), DateType, gmtId)), 15)
+    checkEvaluation(WeekOfYear(Cast(Literal(ts), DateType, gmtId)), 45)
+    checkEvaluation(WeekOfYear(Cast(Literal("2011-05-06"), DateType, gmtId)), 18)
     checkConsistencyBetweenInterpretedAndCodegen(WeekOfYear, DateType)
   }
 
   test("DateFormat") {
-    checkEvaluation(DateFormatClass(Literal.create(null, TimestampType), Literal("y")), null)
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType),
-      Literal.create(null, StringType)), null)
-    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType),
-      Literal("y")), "2015")
-    checkEvaluation(DateFormatClass(Literal(ts), Literal("y")), "2013")
+    checkEvaluation(
+      DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId),
+      null)
+    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
+      Literal.create(null, StringType), gmtId), null)
+
+    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
+      Literal("y"), gmtId), "2015")
+    checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), gmtId), "2013")
+    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId),
+      Literal("H"), gmtId), "0")
+    checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), gmtId), "13")
+
+    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId),
+      Literal("y"), pstId), "2015")
+    checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), pstId), "2013")
+    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId),
+      Literal("H"), pstId), "0")
+    checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), pstId), "5")
+
+    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId),
+      Literal("y"), jstId), "2015")
+    checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), jstId), "2013")
+    checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId),
+      Literal("H"), jstId), "0")
+    checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), jstId), "22")
   }
 
   test("Hour") {
-    assert(Hour(Literal.create(null, DateType)).resolved === false)
-    checkEvaluation(Hour(Cast(Literal(d), TimestampType)), 0)
-    checkEvaluation(Hour(Cast(Literal(sdf.format(d)), TimestampType)), 13)
-    checkEvaluation(Hour(Literal(ts)), 13)
+    assert(Hour(Literal.create(null, DateType), gmtId).resolved === false)
+    assert(Hour(Literal(ts), None).resolved === true)
+    checkEvaluation(Hour(Cast(Literal(d), TimestampType, gmtId), gmtId), 0)
+    checkEvaluation(Hour(Cast(Literal(sdf.format(d)), TimestampType, gmtId), gmtId), 13)
+    checkEvaluation(Hour(Literal(ts), gmtId), 13)
 
     val c = Calendar.getInstance()
-    (0 to 24).foreach { h =>
-      (0 to 60 by 15).foreach { m =>
-        (0 to 60 by 15).foreach { s =>
-          c.set(2015, 18, 3, h, m, s)
-          checkEvaluation(Hour(Literal(new Timestamp(c.getTimeInMillis))),
-            c.get(Calendar.HOUR_OF_DAY))
+    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+      val timeZoneId = Option(tz.getID)
+      c.setTimeZone(tz)
+      (0 to 24).foreach { h =>
+        (0 to 60 by 15).foreach { m =>
+          (0 to 60 by 15).foreach { s =>
+            c.set(2015, 18, 3, h, m, s)
+            checkEvaluation(
+              Hour(Literal(new Timestamp(c.getTimeInMillis)), timeZoneId),
+              c.get(Calendar.HOUR_OF_DAY))
+          }
         }
       }
+      checkConsistencyBetweenInterpretedAndCodegen(
+        (child: Expression) => Hour(child, timeZoneId), TimestampType)
     }
-    checkConsistencyBetweenInterpretedAndCodegen(Hour, TimestampType)
   }
 
   test("Minute") {
-    assert(Minute(Literal.create(null, DateType)).resolved === false)
-    checkEvaluation(Minute(Cast(Literal(d), TimestampType)), 0)
-    checkEvaluation(Minute(Cast(Literal(sdf.format(d)), TimestampType)), 10)
-    checkEvaluation(Minute(Literal(ts)), 10)
+    assert(Minute(Literal.create(null, DateType), gmtId).resolved === false)
+    assert(Minute(Literal(ts), None).resolved === true)
+    checkEvaluation(Minute(Cast(Literal(d), TimestampType, gmtId), gmtId), 0)
+    checkEvaluation(
+      Minute(Cast(Literal(sdf.format(d)), TimestampType, gmtId), gmtId), 10)
+    checkEvaluation(Minute(Literal(ts), gmtId), 10)
 
     val c = Calendar.getInstance()
-    (0 to 60 by 5).foreach { m =>
-      (0 to 60 by 15).foreach { s =>
-        c.set(2015, 18, 3, 3, m, s)
-        checkEvaluation(Minute(Literal(new Timestamp(c.getTimeInMillis))),
-          c.get(Calendar.MINUTE))
+    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+      val timeZoneId = Option(tz.getID)
+      c.setTimeZone(tz)
+      (0 to 60 by 5).foreach { m =>
+        (0 to 60 by 15).foreach { s =>
+          c.set(2015, 18, 3, 3, m, s)
+          checkEvaluation(
+            Minute(Literal(new Timestamp(c.getTimeInMillis)), timeZoneId),
+            c.get(Calendar.MINUTE))
+        }
       }
+      checkConsistencyBetweenInterpretedAndCodegen(
+        (child: Expression) => Minute(child, timeZoneId), TimestampType)
     }
-    checkConsistencyBetweenInterpretedAndCodegen(Minute, TimestampType)
   }
 
   test("date_add") {
@@ -250,46 +308,86 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("time_add") {
-    checkEvaluation(
-      TimeAdd(Literal(Timestamp.valueOf("2016-01-29 10:00:00")),
-        Literal(new CalendarInterval(1, 123000L))),
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-02-29 10:00:00.123")))
+    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
+    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+      val timeZoneId = Option(tz.getID)
+      sdf.setTimeZone(tz)
 
-    checkEvaluation(
-      TimeAdd(Literal.create(null, TimestampType), Literal(new CalendarInterval(1, 123000L))),
-      null)
-    checkEvaluation(
-      TimeAdd(Literal(Timestamp.valueOf("2016-01-29 10:00:00")),
-        Literal.create(null, CalendarIntervalType)),
-      null)
-    checkEvaluation(
-      TimeAdd(Literal.create(null, TimestampType), Literal.create(null, CalendarIntervalType)),
-      null)
-    checkConsistencyBetweenInterpretedAndCodegen(TimeAdd, TimestampType, CalendarIntervalType)
+      checkEvaluation(
+        TimeAdd(
+          Literal(new Timestamp(sdf.parse("2016-01-29 10:00:00.000").getTime)),
+          Literal(new CalendarInterval(1, 123000L)),
+          timeZoneId),
+        DateTimeUtils.fromJavaTimestamp(
+          new Timestamp(sdf.parse("2016-02-29 10:00:00.123").getTime)))
+
+      checkEvaluation(
+        TimeAdd(
+          Literal.create(null, TimestampType),
+          Literal(new CalendarInterval(1, 123000L)),
+          timeZoneId),
+        null)
+      checkEvaluation(
+        TimeAdd(
+          Literal(new Timestamp(sdf.parse("2016-01-29 10:00:00.000").getTime)),
+          Literal.create(null, CalendarIntervalType),
+          timeZoneId),
+        null)
+      checkEvaluation(
+        TimeAdd(
+          Literal.create(null, TimestampType),
+          Literal.create(null, CalendarIntervalType),
+          timeZoneId),
+        null)
+      checkConsistencyBetweenInterpretedAndCodegen(
+        (start: Expression, interval: Expression) => TimeAdd(start, interval, timeZoneId),
+        TimestampType, CalendarIntervalType)
+    }
   }
 
   test("time_sub") {
-    checkEvaluation(
-      TimeSub(Literal(Timestamp.valueOf("2016-03-31 10:00:00")),
-        Literal(new CalendarInterval(1, 0))),
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-02-29 10:00:00")))
-    checkEvaluation(
-      TimeSub(
-        Literal(Timestamp.valueOf("2016-03-30 00:00:01")),
-        Literal(new CalendarInterval(1, 2000000.toLong))),
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-02-28 23:59:59")))
+    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
+    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+      val timeZoneId = Option(tz.getID)
+      sdf.setTimeZone(tz)
 
-    checkEvaluation(
-      TimeSub(Literal.create(null, TimestampType), Literal(new CalendarInterval(1, 123000L))),
-      null)
-    checkEvaluation(
-      TimeSub(Literal(Timestamp.valueOf("2016-01-29 10:00:00")),
-        Literal.create(null, CalendarIntervalType)),
-      null)
-    checkEvaluation(
-      TimeSub(Literal.create(null, TimestampType), Literal.create(null, CalendarIntervalType)),
-      null)
-    checkConsistencyBetweenInterpretedAndCodegen(TimeSub, TimestampType, CalendarIntervalType)
+      checkEvaluation(
+        TimeSub(
+          Literal(new Timestamp(sdf.parse("2016-03-31 10:00:00.000").getTime)),
+          Literal(new CalendarInterval(1, 0)),
+          timeZoneId),
+        DateTimeUtils.fromJavaTimestamp(
+          new Timestamp(sdf.parse("2016-02-29 10:00:00.000").getTime)))
+      checkEvaluation(
+        TimeSub(
+          Literal(new Timestamp(sdf.parse("2016-03-30 00:00:01.000").getTime)),
+          Literal(new CalendarInterval(1, 2000000.toLong)),
+          timeZoneId),
+        DateTimeUtils.fromJavaTimestamp(
+          new Timestamp(sdf.parse("2016-02-28 23:59:59.000").getTime)))
+
+      checkEvaluation(
+        TimeSub(
+          Literal.create(null, TimestampType),
+          Literal(new CalendarInterval(1, 123000L)),
+          timeZoneId),
+        null)
+      checkEvaluation(
+        TimeSub(
+          Literal(new Timestamp(sdf.parse("2016-01-29 10:00:00.000").getTime)),
+          Literal.create(null, CalendarIntervalType),
+          timeZoneId),
+        null)
+      checkEvaluation(
+        TimeSub(
+          Literal.create(null, TimestampType),
+          Literal.create(null, CalendarIntervalType),
+          timeZoneId),
+        null)
+      checkConsistencyBetweenInterpretedAndCodegen(
+        (start: Expression, interval: Expression) => TimeSub(start, interval, timeZoneId),
+        TimestampType, CalendarIntervalType)
+    }
   }
 
   test("add_months") {
@@ -313,28 +411,44 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("months_between") {
-    checkEvaluation(
-      MonthsBetween(Literal(Timestamp.valueOf("1997-02-28 10:30:00")),
-        Literal(Timestamp.valueOf("1996-10-30 00:00:00"))),
-      3.94959677)
-    checkEvaluation(
-      MonthsBetween(Literal(Timestamp.valueOf("2015-01-30 11:52:00")),
-        Literal(Timestamp.valueOf("2015-01-30 11:50:00"))),
-      0.0)
-    checkEvaluation(
-      MonthsBetween(Literal(Timestamp.valueOf("2015-01-31 00:00:00")),
-        Literal(Timestamp.valueOf("2015-03-31 22:00:00"))),
-      -2.0)
-    checkEvaluation(
-      MonthsBetween(Literal(Timestamp.valueOf("2015-03-31 22:00:00")),
-        Literal(Timestamp.valueOf("2015-02-28 00:00:00"))),
-      1.0)
-    val t = Literal(Timestamp.valueOf("2015-03-31 22:00:00"))
-    val tnull = Literal.create(null, TimestampType)
-    checkEvaluation(MonthsBetween(t, tnull), null)
-    checkEvaluation(MonthsBetween(tnull, t), null)
-    checkEvaluation(MonthsBetween(tnull, tnull), null)
-    checkConsistencyBetweenInterpretedAndCodegen(MonthsBetween, TimestampType, TimestampType)
+    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+      val timeZoneId = Option(tz.getID)
+      sdf.setTimeZone(tz)
+
+      checkEvaluation(
+        MonthsBetween(
+          Literal(new Timestamp(sdf.parse("1997-02-28 10:30:00").getTime)),
+          Literal(new Timestamp(sdf.parse("1996-10-30 00:00:00").getTime)),
+          timeZoneId),
+        3.94959677)
+      checkEvaluation(
+        MonthsBetween(
+          Literal(new Timestamp(sdf.parse("2015-01-30 11:52:00").getTime)),
+          Literal(new Timestamp(sdf.parse("2015-01-30 11:50:00").getTime)),
+          timeZoneId),
+        0.0)
+      checkEvaluation(
+        MonthsBetween(
+          Literal(new Timestamp(sdf.parse("2015-01-31 00:00:00").getTime)),
+          Literal(new Timestamp(sdf.parse("2015-03-31 22:00:00").getTime)),
+          timeZoneId),
+        -2.0)
+      checkEvaluation(
+        MonthsBetween(
+          Literal(new Timestamp(sdf.parse("2015-03-31 22:00:00").getTime)),
+          Literal(new Timestamp(sdf.parse("2015-02-28 00:00:00").getTime)),
+          timeZoneId),
+        1.0)
+      val t = Literal(Timestamp.valueOf("2015-03-31 22:00:00"))
+      val tnull = Literal.create(null, TimestampType)
+      checkEvaluation(MonthsBetween(t, tnull, timeZoneId), null)
+      checkEvaluation(MonthsBetween(tnull, t, timeZoneId), null)
+      checkEvaluation(MonthsBetween(tnull, tnull, timeZoneId), null)
+      checkConsistencyBetweenInterpretedAndCodegen(
+        (time1: Expression, time2: Expression) => MonthsBetween(time1, time2, timeZoneId),
+        TimestampType, TimestampType)
+    }
   }
 
   test("last_day") {
@@ -398,7 +512,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         expected)
     }
     val date = Date.valueOf("2015-07-22")
-    Seq("yyyy", "YYYY", "year", "YEAR", "yy", "YY").foreach{ fmt =>
+    Seq("yyyy", "YYYY", "year", "YEAR", "yy", "YY").foreach { fmt =>
       testTrunc(date, fmt, Date.valueOf("2015-01-01"))
     }
     Seq("month", "MONTH", "mon", "MON", "mm", "MM").foreach { fmt =>
@@ -414,19 +528,32 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
     val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
     val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
-    checkEvaluation(
-      FromUnixTime(Literal(0L), Literal("yyyy-MM-dd HH:mm:ss")), sdf1.format(new Timestamp(0)))
-    checkEvaluation(FromUnixTime(
-      Literal(1000L), Literal("yyyy-MM-dd HH:mm:ss")), sdf1.format(new Timestamp(1000000)))
-    checkEvaluation(
-      FromUnixTime(Literal(-1000L), Literal(fmt2)), sdf2.format(new Timestamp(-1000000)))
-    checkEvaluation(
-      FromUnixTime(Literal.create(null, LongType), Literal.create(null, StringType)), null)
-    checkEvaluation(
-      FromUnixTime(Literal.create(null, LongType), Literal("yyyy-MM-dd HH:mm:ss")), null)
-    checkEvaluation(FromUnixTime(Literal(1000L), Literal.create(null, StringType)), null)
-    checkEvaluation(
-      FromUnixTime(Literal(0L), Literal("not a valid format")), null)
+    for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+      val timeZoneId = Option(tz.getID)
+      sdf1.setTimeZone(tz)
+      sdf2.setTimeZone(tz)
+
+      checkEvaluation(
+        FromUnixTime(Literal(0L), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+        sdf1.format(new Timestamp(0)))
+      checkEvaluation(FromUnixTime(
+        Literal(1000L), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+        sdf1.format(new Timestamp(1000000)))
+      checkEvaluation(
+        FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
+        sdf2.format(new Timestamp(-1000000)))
+      checkEvaluation(
+        FromUnixTime(Literal.create(null, LongType), Literal.create(null, StringType), timeZoneId),
+        null)
+      checkEvaluation(
+        FromUnixTime(Literal.create(null, LongType), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+        null)
+      checkEvaluation(
+        FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId),
+        null)
+      checkEvaluation(
+        FromUnixTime(Literal(0L), Literal("not a valid format"), timeZoneId), null)
+    }
   }
 
   test("unix_timestamp") {
@@ -435,34 +562,53 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
     val fmt3 = "yy-MM-dd"
     val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
-    val date1 = Date.valueOf("2015-07-24")
-    checkEvaluation(
-      UnixTimestamp(Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss")), 0L)
-    checkEvaluation(UnixTimestamp(
-      Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
-    checkEvaluation(
-      UnixTimestamp(Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
-    checkEvaluation(
-      UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss")),
-      DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1)) / 1000L)
-    checkEvaluation(
-      UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2)), -1000L)
-    checkEvaluation(UnixTimestamp(
-      Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3)),
-      DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24"))) / 1000L)
-    val t1 = UnixTimestamp(
-      CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
-    val t2 = UnixTimestamp(
-      CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
-    assert(t2 - t1 <= 1)
-    checkEvaluation(
-      UnixTimestamp(Literal.create(null, DateType), Literal.create(null, StringType)), null)
-    checkEvaluation(
-      UnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss")), null)
-    checkEvaluation(UnixTimestamp(
-      Literal(date1), Literal.create(null, StringType)), date1.getTime / 1000L)
-    checkEvaluation(
-      UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null)
+    sdf3.setTimeZone(TimeZoneGMT)
+
+    withDefaultTimeZone(TimeZoneGMT) {
+      for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+        val timeZoneId = Option(tz.getID)
+        sdf1.setTimeZone(tz)
+        sdf2.setTimeZone(tz)
+
+        val date1 = Date.valueOf("2015-07-24")
+        checkEvaluation(UnixTimestamp(
+          Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), 0L)
+        checkEvaluation(UnixTimestamp(
+          Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+          1000L)
+        checkEvaluation(
+          UnixTimestamp(
+            Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+          1000L)
+        checkEvaluation(
+          UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+        checkEvaluation(
+          UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId),
+          -1000L)
+        checkEvaluation(UnixTimestamp(
+          Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
+          DateTimeUtils.daysToMillis(
+            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 1000L)
+        val t1 = UnixTimestamp(
+          CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+        val t2 = UnixTimestamp(
+          CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+        assert(t2 - t1 <= 1)
+        checkEvaluation(
+          UnixTimestamp(
+            Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId),
+          null)
+        checkEvaluation(
+          UnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+          null)
+        checkEvaluation(
+          UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId),
+          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+        checkEvaluation(
+          UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
+      }
+    }
   }
 
   test("to_unix_timestamp") {
@@ -471,34 +617,51 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
     val fmt3 = "yy-MM-dd"
     val sdf3 = new SimpleDateFormat(fmt3, Locale.US)
-    val date1 = Date.valueOf("2015-07-24")
-    checkEvaluation(
-      ToUnixTimestamp(Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss")), 0L)
-    checkEvaluation(ToUnixTimestamp(
-      Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
-    checkEvaluation(
-      ToUnixTimestamp(Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss")), 1000L)
-    checkEvaluation(
-      ToUnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss")),
-      DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1)) / 1000L)
-    checkEvaluation(
-      ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2)), -1000L)
-    checkEvaluation(ToUnixTimestamp(
-      Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3)),
-      DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24"))) / 1000L)
-    val t1 = ToUnixTimestamp(
-      CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
-    val t2 = ToUnixTimestamp(
-      CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
-    assert(t2 - t1 <= 1)
-    checkEvaluation(
-      ToUnixTimestamp(Literal.create(null, DateType), Literal.create(null, StringType)), null)
-    checkEvaluation(
-      ToUnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss")), null)
-    checkEvaluation(ToUnixTimestamp(
-      Literal(date1), Literal.create(null, StringType)), date1.getTime / 1000L)
-    checkEvaluation(
-      ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format")), null)
+    sdf3.setTimeZone(TimeZoneGMT)
+
+    withDefaultTimeZone(TimeZoneGMT) {
+      for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) {
+        val timeZoneId = Option(tz.getID)
+        sdf1.setTimeZone(tz)
+        sdf2.setTimeZone(tz)
+
+        val date1 = Date.valueOf("2015-07-24")
+        checkEvaluation(ToUnixTimestamp(
+          Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), 0L)
+        checkEvaluation(ToUnixTimestamp(
+          Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+          1000L)
+        checkEvaluation(ToUnixTimestamp(
+          Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss")),
+          1000L)
+        checkEvaluation(
+          ToUnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+        checkEvaluation(
+          ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId),
+          -1000L)
+        checkEvaluation(ToUnixTimestamp(
+          Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId),
+          DateTimeUtils.daysToMillis(
+            DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz) / 1000L)
+        val t1 = ToUnixTimestamp(
+          CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+        val t2 = ToUnixTimestamp(
+          CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long]
+        assert(t2 - t1 <= 1)
+        checkEvaluation(ToUnixTimestamp(
+          Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), null)
+        checkEvaluation(
+          ToUnixTimestamp(
+            Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId),
+          null)
+        checkEvaluation(ToUnixTimestamp(
+          Literal(date1), Literal.create(null, StringType), timeZoneId),
+          DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz) / 1000L)
+        checkEvaluation(
+          ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null)
+      }
+    }
   }
 
   test("datediff") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
index a313681..a0d4896 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,7 @@ class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("Constant Folding", FixedPoint(50),
-        NullPropagation,
+        NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
         ConstantFolding,
         BooleanSimplification,
         SimplifyBinaryComparison,

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index 8147d06..1b9db06 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -34,7 +34,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("Constant Folding", FixedPoint(50),
-        NullPropagation,
+        NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
         ConstantFolding,
         BooleanSimplification,
         PruneFilters) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 87ad81d..276b805 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -32,7 +33,7 @@ class CombiningLimitsSuite extends PlanTest {
       Batch("Combine Limit", FixedPoint(10),
         CombineLimits) ::
       Batch("Constant Folding", FixedPoint(10),
-        NullPropagation,
+        NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
         ConstantFolding,
         BooleanSimplification,
         SimplifyConditionals) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala
index 711294e..a491f44 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -29,7 +30,7 @@ class DecimalAggregatesSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("Decimal Optimizations", FixedPoint(100),
-      DecimalAggregates) :: Nil
+      DecimalAggregates(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
   }
 
   val testRelation = LocalRelation('a.decimal(2, 1), 'b.decimal(12, 1))

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 0877207..9daede1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -34,7 +34,7 @@ class OptimizeInSuite extends PlanTest {
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("ConstantFolding", FixedPoint(10),
-        NullPropagation,
+        NullPropagation(SimpleCatalystConf(caseSensitiveAnalysis = true)),
         ConstantFolding,
         BooleanSimplification,
         OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index a191aa8..908b370 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark.sql.catalyst.plans
 
+import java.util.TimeZone
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType}
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StringType}
 
 class ConstraintPropagationSuite extends SparkFunSuite {
 
@@ -49,6 +51,10 @@ class ConstraintPropagationSuite extends SparkFunSuite {
     }
   }
 
+  private def castWithTimeZone(expr: Expression, dataType: DataType) = {
+    Cast(expr, dataType, Option(TimeZone.getDefault().getID))
+  }
+
   test("propagating constraints in filters") {
     val tr = LocalRelation('a.int, 'b.string, 'c.int)
 
@@ -276,14 +282,15 @@ class ConstraintPropagationSuite extends SparkFunSuite {
       tr.where('a.attr === 'b.attr &&
         'c.attr + 100 > 'd.attr &&
         IsNotNull(Cast(Cast(resolveColumn(tr, "e"), LongType), LongType))).analyze.constraints,
-      ExpressionSet(Seq(Cast(resolveColumn(tr, "a"), LongType) === resolveColumn(tr, "b"),
-        Cast(resolveColumn(tr, "c") + 100, LongType) > resolveColumn(tr, "d"),
+      ExpressionSet(Seq(
+        castWithTimeZone(resolveColumn(tr, "a"), LongType) === resolveColumn(tr, "b"),
+        castWithTimeZone(resolveColumn(tr, "c") + 100, LongType) > resolveColumn(tr, "d"),
         IsNotNull(resolveColumn(tr, "a")),
         IsNotNull(resolveColumn(tr, "b")),
         IsNotNull(resolveColumn(tr, "c")),
         IsNotNull(resolveColumn(tr, "d")),
         IsNotNull(resolveColumn(tr, "e")),
-        IsNotNull(Cast(Cast(resolveColumn(tr, "e"), LongType), LongType)))))
+        IsNotNull(castWithTimeZone(castWithTimeZone(resolveColumn(tr, "e"), LongType), LongType)))))
   }
 
   test("infer isnotnull constraints from compound expressions") {
@@ -294,22 +301,25 @@ class ConstraintPropagationSuite extends SparkFunSuite {
           Cast(
             Cast(Cast(resolveColumn(tr, "e"), LongType), LongType), LongType))).analyze.constraints,
       ExpressionSet(Seq(
-        Cast(resolveColumn(tr, "a"), LongType) + resolveColumn(tr, "b") ===
-          Cast(resolveColumn(tr, "c"), LongType),
+        castWithTimeZone(resolveColumn(tr, "a"), LongType) + resolveColumn(tr, "b") ===
+          castWithTimeZone(resolveColumn(tr, "c"), LongType),
         IsNotNull(resolveColumn(tr, "a")),
         IsNotNull(resolveColumn(tr, "b")),
         IsNotNull(resolveColumn(tr, "c")),
         IsNotNull(resolveColumn(tr, "e")),
-        IsNotNull(Cast(Cast(Cast(resolveColumn(tr, "e"), LongType), LongType), LongType)))))
+        IsNotNull(
+          castWithTimeZone(castWithTimeZone(castWithTimeZone(
+            resolveColumn(tr, "e"), LongType), LongType), LongType)))))
 
     verifyConstraints(
       tr.where(('a.attr * 'b.attr + 100) === 'c.attr && 'd / 10 === 'e).analyze.constraints,
       ExpressionSet(Seq(
-        Cast(resolveColumn(tr, "a"), LongType) * resolveColumn(tr, "b") + Cast(100, LongType) ===
-          Cast(resolveColumn(tr, "c"), LongType),
-        Cast(resolveColumn(tr, "d"), DoubleType) /
-          Cast(10, DoubleType) ===
-            Cast(resolveColumn(tr, "e"), DoubleType),
+        castWithTimeZone(resolveColumn(tr, "a"), LongType) * resolveColumn(tr, "b") +
+          castWithTimeZone(100, LongType) ===
+            castWithTimeZone(resolveColumn(tr, "c"), LongType),
+        castWithTimeZone(resolveColumn(tr, "d"), DoubleType) /
+          castWithTimeZone(10, DoubleType) ===
+            castWithTimeZone(resolveColumn(tr, "e"), DoubleType),
         IsNotNull(resolveColumn(tr, "a")),
         IsNotNull(resolveColumn(tr, "b")),
         IsNotNull(resolveColumn(tr, "c")),
@@ -319,11 +329,12 @@ class ConstraintPropagationSuite extends SparkFunSuite {
     verifyConstraints(
       tr.where(('a.attr * 'b.attr - 10) >= 'c.attr && 'd / 10 < 'e).analyze.constraints,
       ExpressionSet(Seq(
-        Cast(resolveColumn(tr, "a"), LongType) * resolveColumn(tr, "b") - Cast(10, LongType) >=
-          Cast(resolveColumn(tr, "c"), LongType),
-        Cast(resolveColumn(tr, "d"), DoubleType) /
-          Cast(10, DoubleType) <
-            Cast(resolveColumn(tr, "e"), DoubleType),
+        castWithTimeZone(resolveColumn(tr, "a"), LongType) * resolveColumn(tr, "b") -
+          castWithTimeZone(10, LongType) >=
+            castWithTimeZone(resolveColumn(tr, "c"), LongType),
+        castWithTimeZone(resolveColumn(tr, "d"), DoubleType) /
+          castWithTimeZone(10, DoubleType) <
+            castWithTimeZone(resolveColumn(tr, "e"), DoubleType),
         IsNotNull(resolveColumn(tr, "a")),
         IsNotNull(resolveColumn(tr, "b")),
         IsNotNull(resolveColumn(tr, "c")),
@@ -333,9 +344,9 @@ class ConstraintPropagationSuite extends SparkFunSuite {
     verifyConstraints(
       tr.where('a.attr + 'b.attr - 'c.attr * 'd.attr > 'e.attr * 1000).analyze.constraints,
       ExpressionSet(Seq(
-        (Cast(resolveColumn(tr, "a"), LongType) + resolveColumn(tr, "b")) -
-          (Cast(resolveColumn(tr, "c"), LongType) * resolveColumn(tr, "d")) >
-            Cast(resolveColumn(tr, "e") * 1000, LongType),
+        (castWithTimeZone(resolveColumn(tr, "a"), LongType) + resolveColumn(tr, "b")) -
+          (castWithTimeZone(resolveColumn(tr, "c"), LongType) * resolveColumn(tr, "d")) >
+            castWithTimeZone(resolveColumn(tr, "e") * 1000, LongType),
         IsNotNull(resolveColumn(tr, "a")),
         IsNotNull(resolveColumn(tr, "b")),
         IsNotNull(resolveColumn(tr, "c")),

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index e0a9a0c..9799817 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class DateTimeUtilsSuite extends SparkFunSuite {
 
+  val TimeZonePST = TimeZone.getTimeZone("PST")
+
   private[this] def getInUTCDays(timestamp: Long): Int = {
     val tz = TimeZone.getDefault
     ((timestamp + tz.getOffset(timestamp)) / MILLIS_PER_DAY).toInt
@@ -177,180 +179,155 @@ class DateTimeUtilsSuite extends SparkFunSuite {
   }
 
   test("string to timestamp") {
-    var c = Calendar.getInstance()
-    c.set(1969, 11, 31, 16, 0, 0)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(UTF8String.fromString("1969-12-31 16:00:00")).get ===
-      c.getTimeInMillis * 1000)
-    c.set(1, 0, 1, 0, 0, 0)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(UTF8String.fromString("0001")).get ===
-      c.getTimeInMillis * 1000)
-    c = Calendar.getInstance()
-    c.set(2015, 2, 1, 0, 0, 0)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03")).get ===
-      c.getTimeInMillis * 1000)
-    c = Calendar.getInstance()
-    c.set(2015, 2, 18, 0, 0, 0)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18")).get ===
-      c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18 ")).get ===
-      c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18T")).get ===
-      c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance()
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18 12:03:17")).get ===
-      c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18T12:03:17")).get ===
-      c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT-13:53"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17-13:53")).get === c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18T12:03:17Z")).get ===
-      c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18 12:03:17Z")).get ===
-      c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT-01:00"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18T12:03:17-1:0")).get ===
-      c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17-01:00")).get === c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17+07:30")).get === c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:03"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17+07:03")).get === c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance()
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18 12:03:17.123")).get === c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.123")).get === c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 456)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.456Z")).get  === c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18 12:03:17.456Z")).get  === c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT-01:00"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.123-1:0")).get  === c.getTimeInMillis * 1000)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.123-01:00")).get ===  c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.123+07:30")).get ===  c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.123+07:30")).get === c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.123121+7:30")).get ===
-        c.getTimeInMillis * 1000 + 121)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.12312+7:30")).get ===
-        c.getTimeInMillis * 1000 + 120)
-
-    c = Calendar.getInstance()
-    c.set(Calendar.HOUR_OF_DAY, 18)
-    c.set(Calendar.MINUTE, 12)
-    c.set(Calendar.SECOND, 15)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(
-      UTF8String.fromString("18:12:15")).get ===
-      c.getTimeInMillis * 1000)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
-    c.set(Calendar.HOUR_OF_DAY, 18)
-    c.set(Calendar.MINUTE, 12)
-    c.set(Calendar.SECOND, 15)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("T18:12:15.12312+7:30")).get ===
-      c.getTimeInMillis * 1000 + 120)
-
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
-    c.set(Calendar.HOUR_OF_DAY, 18)
-    c.set(Calendar.MINUTE, 12)
-    c.set(Calendar.SECOND, 15)
-    c.set(Calendar.MILLISECOND, 123)
-    assert(stringToTimestamp(
-      UTF8String.fromString("18:12:15.12312+7:30")).get ===
-      c.getTimeInMillis * 1000 + 120)
-
-    c = Calendar.getInstance()
-    c.set(2011, 4, 6, 7, 8, 9)
-    c.set(Calendar.MILLISECOND, 100)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2011-05-06 07:08:09.1000")).get === c.getTimeInMillis * 1000)
-
-    assert(stringToTimestamp(UTF8String.fromString("238")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("00238")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18 123142")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18T123123")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("2015-03-18X")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("2015/03/18")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("2015.03.18")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("20150318")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("2015-031-8")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("02015-01-18")).isEmpty)
-    assert(stringToTimestamp(UTF8String.fromString("015-01-18")).isEmpty)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03.17-20:0")).isEmpty)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03.17-0:70")).isEmpty)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03.17-1:0:0")).isEmpty)
+    for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
+      def checkStringToTimestamp(str: String, expected: Option[Long]): Unit = {
+        assert(stringToTimestamp(UTF8String.fromString(str), tz) === expected)
+      }
 
-    // Truncating the fractional seconds
-    c = Calendar.getInstance(TimeZone.getTimeZone("GMT+00:00"))
-    c.set(2015, 2, 18, 12, 3, 17)
-    c.set(Calendar.MILLISECOND, 0)
-    assert(stringToTimestamp(
-      UTF8String.fromString("2015-03-18T12:03:17.123456789+0:00")).get ===
-        c.getTimeInMillis * 1000 + 123456)
+      var c = Calendar.getInstance(tz)
+      c.set(1969, 11, 31, 16, 0, 0)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("1969-12-31 16:00:00", Option(c.getTimeInMillis * 1000))
+      c.set(1, 0, 1, 0, 0, 0)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("0001", Option(c.getTimeInMillis * 1000))
+      c = Calendar.getInstance(tz)
+      c.set(2015, 2, 1, 0, 0, 0)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03", Option(c.getTimeInMillis * 1000))
+      c = Calendar.getInstance(tz)
+      c.set(2015, 2, 18, 0, 0, 0)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03-18", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18 ", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18T", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(tz)
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03-18 12:03:17", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18T12:03:17", Option(c.getTimeInMillis * 1000))
+
+      // If the string value includes timezone string, it represents the timestamp string
+      // in the timezone regardless of the tz parameter.
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT-13:53"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03-18T12:03:17-13:53", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03-18T12:03:17Z", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18 12:03:17Z", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT-01:00"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03-18T12:03:17-1:0", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18T12:03:17-01:00", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03-18T12:03:17+07:30", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:03"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("2015-03-18T12:03:17+07:03", Option(c.getTimeInMillis * 1000))
+
+      // tests for the string including milliseconds.
+      c = Calendar.getInstance(tz)
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp("2015-03-18 12:03:17.123", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18T12:03:17.123", Option(c.getTimeInMillis * 1000))
+
+      // If the string value includes timezone string, it represents the timestamp string
+      // in the timezone regardless of the tz parameter.
+      c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 456)
+      checkStringToTimestamp("2015-03-18T12:03:17.456Z", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18 12:03:17.456Z", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT-01:00"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp("2015-03-18T12:03:17.123-1:0", Option(c.getTimeInMillis * 1000))
+      checkStringToTimestamp("2015-03-18T12:03:17.123-01:00", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp("2015-03-18T12:03:17.123+07:30", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp("2015-03-18T12:03:17.123+07:30", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp(
+        "2015-03-18T12:03:17.123121+7:30", Option(c.getTimeInMillis * 1000 + 121))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp(
+        "2015-03-18T12:03:17.12312+7:30", Option(c.getTimeInMillis * 1000 + 120))
+
+      c = Calendar.getInstance(tz)
+      c.set(Calendar.HOUR_OF_DAY, 18)
+      c.set(Calendar.MINUTE, 12)
+      c.set(Calendar.SECOND, 15)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp("18:12:15", Option(c.getTimeInMillis * 1000))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
+      c.set(Calendar.HOUR_OF_DAY, 18)
+      c.set(Calendar.MINUTE, 12)
+      c.set(Calendar.SECOND, 15)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp("T18:12:15.12312+7:30", Option(c.getTimeInMillis * 1000 + 120))
+
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+07:30"))
+      c.set(Calendar.HOUR_OF_DAY, 18)
+      c.set(Calendar.MINUTE, 12)
+      c.set(Calendar.SECOND, 15)
+      c.set(Calendar.MILLISECOND, 123)
+      checkStringToTimestamp("18:12:15.12312+7:30", Option(c.getTimeInMillis * 1000 + 120))
+
+      c = Calendar.getInstance(tz)
+      c.set(2011, 4, 6, 7, 8, 9)
+      c.set(Calendar.MILLISECOND, 100)
+      checkStringToTimestamp("2011-05-06 07:08:09.1000", Option(c.getTimeInMillis * 1000))
+
+      checkStringToTimestamp("238", None)
+      checkStringToTimestamp("00238", None)
+      checkStringToTimestamp("2015-03-18 123142", None)
+      checkStringToTimestamp("2015-03-18T123123", None)
+      checkStringToTimestamp("2015-03-18X", None)
+      checkStringToTimestamp("2015/03/18", None)
+      checkStringToTimestamp("2015.03.18", None)
+      checkStringToTimestamp("20150318", None)
+      checkStringToTimestamp("2015-031-8", None)
+      checkStringToTimestamp("02015-01-18", None)
+      checkStringToTimestamp("015-01-18", None)
+      checkStringToTimestamp("2015-03-18T12:03.17-20:0", None)
+      checkStringToTimestamp("2015-03-18T12:03.17-0:70", None)
+      checkStringToTimestamp("2015-03-18T12:03.17-1:0:0", None)
+
+      // Truncating the fractional seconds
+      c = Calendar.getInstance(TimeZone.getTimeZone("GMT+00:00"))
+      c.set(2015, 2, 18, 12, 3, 17)
+      c.set(Calendar.MILLISECOND, 0)
+      checkStringToTimestamp(
+        "2015-03-18T12:03:17.123456789+0:00", Option(c.getTimeInMillis * 1000 + 123456))
+    }
   }
 
   test("SPARK-15379: special invalid date string") {
@@ -373,27 +350,35 @@ class DateTimeUtilsSuite extends SparkFunSuite {
   }
 
   test("hours") {
-    val c = Calendar.getInstance()
+    val c = Calendar.getInstance(TimeZonePST)
     c.set(2015, 2, 18, 13, 2, 11)
-    assert(getHours(c.getTimeInMillis * 1000) === 13)
+    assert(getHours(c.getTimeInMillis * 1000, TimeZonePST) === 13)
+    assert(getHours(c.getTimeInMillis * 1000, TimeZoneGMT) === 20)
     c.set(2015, 12, 8, 2, 7, 9)
-    assert(getHours(c.getTimeInMillis * 1000) === 2)
+    assert(getHours(c.getTimeInMillis * 1000, TimeZonePST) === 2)
+    assert(getHours(c.getTimeInMillis * 1000, TimeZoneGMT) === 10)
   }
 
   test("minutes") {
-    val c = Calendar.getInstance()
+    val c = Calendar.getInstance(TimeZonePST)
     c.set(2015, 2, 18, 13, 2, 11)
-    assert(getMinutes(c.getTimeInMillis * 1000) === 2)
+    assert(getMinutes(c.getTimeInMillis * 1000, TimeZonePST) === 2)
+    assert(getMinutes(c.getTimeInMillis * 1000, TimeZoneGMT) === 2)
+    assert(getMinutes(c.getTimeInMillis * 1000, TimeZone.getTimeZone("Australia/North")) === 32)
     c.set(2015, 2, 8, 2, 7, 9)
-    assert(getMinutes(c.getTimeInMillis * 1000) === 7)
+    assert(getMinutes(c.getTimeInMillis * 1000, TimeZonePST) === 7)
+    assert(getMinutes(c.getTimeInMillis * 1000, TimeZoneGMT) === 7)
+    assert(getMinutes(c.getTimeInMillis * 1000, TimeZone.getTimeZone("Australia/North")) === 37)
   }
 
   test("seconds") {
-    val c = Calendar.getInstance()
+    val c = Calendar.getInstance(TimeZonePST)
     c.set(2015, 2, 18, 13, 2, 11)
-    assert(getSeconds(c.getTimeInMillis * 1000) === 11)
+    assert(getSeconds(c.getTimeInMillis * 1000, TimeZonePST) === 11)
+    assert(getSeconds(c.getTimeInMillis * 1000, TimeZoneGMT) === 11)
     c.set(2015, 2, 8, 2, 7, 9)
-    assert(getSeconds(c.getTimeInMillis * 1000) === 9)
+    assert(getSeconds(c.getTimeInMillis * 1000, TimeZonePST) === 9)
+    assert(getSeconds(c.getTimeInMillis * 1000, TimeZoneGMT) === 9)
   }
 
   test("hours / minutes / seconds") {
@@ -467,6 +452,21 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     c2.set(Calendar.MILLISECOND, 123)
     val ts2 = c2.getTimeInMillis * 1000L
     assert(timestampAddInterval(ts1, 36, 123000) === ts2)
+
+    val c3 = Calendar.getInstance(TimeZonePST)
+    c3.set(1997, 1, 27, 16, 0, 0)
+    c3.set(Calendar.MILLISECOND, 0)
+    val ts3 = c3.getTimeInMillis * 1000L
+    val c4 = Calendar.getInstance(TimeZonePST)
+    c4.set(2000, 1, 27, 16, 0, 0)
+    c4.set(Calendar.MILLISECOND, 123)
+    val ts4 = c4.getTimeInMillis * 1000L
+    val c5 = Calendar.getInstance(TimeZoneGMT)
+    c5.set(2000, 1, 29, 0, 0, 0)
+    c5.set(Calendar.MILLISECOND, 123)
+    val ts5 = c5.getTimeInMillis * 1000L
+    assert(timestampAddInterval(ts3, 36, 123000, TimeZonePST) === ts4)
+    assert(timestampAddInterval(ts3, 36, 123000, TimeZoneGMT) === ts5)
   }
 
   test("monthsBetween") {
@@ -481,6 +481,17 @@ class DateTimeUtilsSuite extends SparkFunSuite {
     assert(monthsBetween(c1.getTimeInMillis * 1000L, c2.getTimeInMillis * 1000L) === -36)
     c2.set(1996, 2, 31, 0, 0, 0)
     assert(monthsBetween(c1.getTimeInMillis * 1000L, c2.getTimeInMillis * 1000L) === 11)
+
+    val c3 = Calendar.getInstance(TimeZonePST)
+    c3.set(2000, 1, 28, 16, 0, 0)
+    val c4 = Calendar.getInstance(TimeZonePST)
+    c4.set(1997, 1, 28, 16, 0, 0)
+    assert(
+      monthsBetween(c3.getTimeInMillis * 1000L, c4.getTimeInMillis * 1000L, TimeZonePST)
+      === 36.0)
+    assert(
+      monthsBetween(c3.getTimeInMillis * 1000L, c4.getTimeInMillis * 1000L, TimeZoneGMT)
+      === 35.90322581)
   }
 
   test("from UTC timestamp") {
@@ -537,6 +548,21 @@ class DateTimeUtilsSuite extends SparkFunSuite {
   }
 
   test("daysToMillis and millisToDays") {
+    val c = Calendar.getInstance(TimeZonePST)
+
+    c.set(2015, 11, 31, 16, 0, 0)
+    assert(millisToDays(c.getTimeInMillis, TimeZonePST) === 16800)
+    assert(millisToDays(c.getTimeInMillis, TimeZoneGMT) === 16801)
+
+    c.set(2015, 11, 31, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 0)
+    assert(daysToMillis(16800, TimeZonePST) === c.getTimeInMillis)
+
+    c.setTimeZone(TimeZoneGMT)
+    c.set(2015, 11, 31, 0, 0, 0)
+    c.set(Calendar.MILLISECOND, 0)
+    assert(daysToMillis(16800, TimeZoneGMT) === c.getTimeInMillis)
+
     // There are some days are skipped entirely in some timezone, skip them here.
     val skipped_days = Map[String, Int](
       "Kwajalein" -> 8632,
@@ -547,13 +573,11 @@ class DateTimeUtilsSuite extends SparkFunSuite {
       "Pacific/Kwajalein" -> 8632,
       "MIT" -> 15338)
     for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
-      DateTimeTestUtils.withDefaultTimeZone(tz) {
-        val skipped = skipped_days.getOrElse(tz.getID, Int.MinValue)
-        (-20000 to 20000).foreach { d =>
-          if (d != skipped) {
-            assert(millisToDays(daysToMillis(d)) === d,
-              s"Round trip of ${d} did not work in tz ${tz}")
-          }
+      val skipped = skipped_days.getOrElse(tz.getID, Int.MinValue)
+      (-20000 to 20000).foreach { d =>
+        if (d != skipped) {
+          assert(millisToDays(daysToMillis(d, tz), tz) === d,
+            s"Round trip of ${d} did not work in tz ${tz}")
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 60182be..3802955 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -174,7 +174,7 @@ class Column(val expr: Expression) extends Logging {
     // NamedExpression under this Cast.
     case c: Cast =>
       c.transformUp {
-        case Cast(ne: NamedExpression, to) => UnresolvedAlias(Cast(ne, to))
+        case c @ Cast(_: NamedExpression, _, _) => UnresolvedAlias(c)
       } match {
         case ne: NamedExpression => ne
         case other => Alias(expr, usePrettyExpression(expr).sql)()

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5ee173f..391c34f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql
 
 import java.io.CharArrayWriter
+import java.sql.{Date, Timestamp}
+import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -43,7 +45,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
-import org.apache.spark.sql.catalyst.util.usePrettyExpression
+import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -250,6 +252,8 @@ class Dataset[T] private[sql](
     val hasMoreData = takeResult.length > numRows
     val data = takeResult.take(numRows)
 
+    lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
+
     // For array values, replace Seq and Array with square brackets
     // For cells that are beyond `truncate` characters, replace it with the
     // first `truncate-3` and "..."
@@ -260,6 +264,10 @@ class Dataset[T] private[sql](
           case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
           case array: Array[_] => array.mkString("[", ", ", "]")
           case seq: Seq[_] => seq.mkString("[", ", ", "]")
+          case d: Date =>
+            DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
+          case ts: Timestamp =>
+            DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(ts), timeZone)
           case _ => cell.toString
         }
         if (truncate > 0 && str.length > truncate) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index 0384c0f..d5a8566 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -369,7 +369,7 @@ class SQLBuilder private (
         case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
         case a @ Cast(BitwiseAnd(
             ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)),
-            Literal(1, IntegerType)), ByteType) if ar == gid =>
+            Literal(1, IntegerType)), ByteType, _) if ar == gid =>
           // for converting an expression to its original SQL format grouping(col)
           val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
           groupByExprs.lift(idx).map(Grouping).getOrElse(a)

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 1b7fedc..b8ac070 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 
@@ -104,7 +105,9 @@ case class OptimizeMetadataOnlyQuery(
             val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
             val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
               InternalRow.fromSeq(partAttrs.map { attr =>
-                Cast(Literal(p.spec(attr.name)), attr.dataType).eval()
+                // TODO: use correct timezone for partition values.
+                Cast(Literal(p.spec(attr.name)), attr.dataType,
+                  Option(DateTimeUtils.defaultTimeZone().getID)).eval()
               })
             }
             LocalRelation(partAttrs, partitionData)

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index dcd9003..9d046c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.execution
 
 import java.nio.charset.StandardCharsets
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
+import java.util.TimeZone
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -139,22 +140,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
     val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType,
       BooleanType, ByteType, ShortType, DateType, TimestampType, BinaryType)
 
-    /** Implementation following Hive's TimestampWritable.toString */
-    def formatTimestamp(timestamp: Timestamp): String = {
-      val timestampString = timestamp.toString
-      if (timestampString.length() > 19) {
-        if (timestampString.length() == 21) {
-          if (timestampString.substring(19).compareTo(".0") == 0) {
-            return DateTimeUtils.threadLocalTimestampFormat.get().format(timestamp)
-          }
-        }
-        return DateTimeUtils.threadLocalTimestampFormat.get().format(timestamp) +
-          timestampString.substring(19)
-      }
-
-      return DateTimeUtils.threadLocalTimestampFormat.get().format(timestamp)
-    }
-
     def formatDecimal(d: java.math.BigDecimal): String = {
       if (d.compareTo(java.math.BigDecimal.ZERO) == 0) {
         java.math.BigDecimal.ZERO.toPlainString
@@ -195,8 +180,11 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
             toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
         }.toSeq.sorted.mkString("{", ",", "}")
       case (null, _) => "NULL"
-      case (d: Int, DateType) => new java.util.Date(DateTimeUtils.daysToMillis(d)).toString
-      case (t: Timestamp, TimestampType) => formatTimestamp(t)
+      case (d: Date, DateType) =>
+        DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
+      case (t: Timestamp, TimestampType) =>
+        DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
+          TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
       case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
       case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
       case (other, tpe) if primitiveTypes.contains(tpe) => other.toString

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 16c5193..be13cbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution, UnsafeKVExternalSorter}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -311,10 +312,11 @@ object FileFormatWriter extends Logging {
     /** Expressions that given a partition key build a string like: col1=val/col2=val/... */
     private def partitionStringExpression: Seq[Expression] = {
       description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+        // TODO: use correct timezone for partition values.
         val escaped = ScalaUDF(
           ExternalCatalogUtils.escapePathName _,
           StringType,
-          Seq(Cast(c, StringType)),
+          Seq(Cast(c, StringType, Option(DateTimeUtils.defaultTimeZone().getID))),
           Seq(StringType))
         val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
         val partitionName = Literal(c.name + "=") :: str :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index fe9c657..75f87a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -30,6 +30,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -135,9 +136,11 @@ abstract class PartitioningAwareFileIndex(
         // we need to cast into the data type that user specified.
         def castPartitionValuesToUserSchema(row: InternalRow) = {
           InternalRow((0 until row.numFields).map { i =>
+            // TODO: use correct timezone for partition values.
             Cast(
               Literal.create(row.getUTF8String(i), StringType),
-              userProvidedSchema.fields(i).dataType).eval()
+              userProvidedSchema.fields(i).dataType,
+              Option(DateTimeUtils.defaultTimeZone().getID)).eval()
           }: _*)
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 6ab6fa6..bd7cec3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -58,7 +58,7 @@ class IncrementalExecution(
    */
   override lazy val optimizedPlan: LogicalPlan = {
     sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
-      case ts @ CurrentBatchTimestamp(timestamp, _) =>
+      case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
         logInfo(s"Current batch timestamp = $timestamp")
         ts.toLiteral
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index a35950e..ea37194 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -502,7 +502,7 @@ class StreamExecution(
           ct.dataType)
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          cd.dataType)
+          cd.dataType, cd.timeZoneId)
     }
 
     reportTimeTaken("queryPlanning") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d0c86ff..5ba4192 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.internal
 
-import java.util.{NoSuchElementException, Properties}
+import java.util.{NoSuchElementException, Properties, TimeZone}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
@@ -660,6 +660,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val SESSION_LOCAL_TIMEZONE =
+    SQLConfigBuilder("spark.sql.session.timeZone")
+      .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""")
+      .stringConf
+      .createWithDefault(TimeZone.getDefault().getID())
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -858,6 +864,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
+  override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
+
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
 
   override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index cb7b979..6a190b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.Matchers._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Project, Union}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{FilterExec, QueryExecution}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
@@ -869,6 +870,30 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(testData.select($"*").filter($"key" < 0).showString(1) === expectedAnswer)
   }
 
+  test("SPARK-18350 show with session local timezone") {
+    val d = Date.valueOf("2016-12-01")
+    val ts = Timestamp.valueOf("2016-12-01 00:00:00")
+    val df = Seq((d, ts)).toDF("d", "ts")
+    val expectedAnswer = """+----------+-------------------+
+                           ||d         |ts                 |
+                           |+----------+-------------------+
+                           ||2016-12-01|2016-12-01 00:00:00|
+                           |+----------+-------------------+
+                           |""".stripMargin
+    assert(df.showString(1, truncate = 0) === expectedAnswer)
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
+
+      val expectedAnswer = """+----------+-------------------+
+                             ||d         |ts                 |
+                             |+----------+-------------------+
+                             ||2016-12-01|2016-12-01 08:00:00|
+                             |+----------+-------------------+
+                             |""".stripMargin
+      assert(df.showString(1, truncate = 0) === expectedAnswer)
+    }
+  }
+
   test("createDataFrame(RDD[Row], StructType) should convert UDTs (SPARK-6672)") {
     val rowRDD = sparkContext.parallelize(Seq(Row(new ExamplePoint(1.0, 2.0))))
     val schema = StructType(Array(StructField("point", new ExamplePointUDT(), false)))

http://git-wip-us.apache.org/repos/asf/spark/blob/2969fb43/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index d217e9b..f78660f 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -41,6 +41,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
   private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
   private val originalConvertMetastoreOrc = TestHive.sessionState.convertMetastoreOrc
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
+  private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
 
   def testCases: Seq[(String, File)] = {
     hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
@@ -62,6 +63,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, false)
     // Ensures that cross joins are enabled so that we can test them
     TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, true)
+    // Fix session local timezone to America/Los_Angeles for those timezone sensitive tests
+    // (timestamp_*)
+    TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "America/Los_Angeles")
     RuleExecutor.resetTime()
   }
 
@@ -74,6 +78,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
       TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
       TestHive.setConf(HiveUtils.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc)
       TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
+      TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, originalSessionLocalTimeZone)
 
       // For debugging dump some statistics about how much time was spent in various optimizer rules
       logWarning(RuleExecutor.dumpTimeSpent())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message