spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-20588][SQL] Cache TimeZone instances.
Date Mon, 15 May 2017 23:52:26 GMT
Repository: spark
Updated Branches:
  refs/heads/master bbd163d58 -> c8c878a41


[SPARK-20588][SQL] Cache TimeZone instances.

## What changes were proposed in this pull request?

Because the method `TimeZone.getTimeZone(String ID)` is synchronized on the TimeZone class,
concurrent calls to this method can become a bottleneck.
This happens especially when casting string values that contain timezone info to timestamp
values, which uses `DateTimeUtils.stringToTimestamp()` and obtains a TimeZone instance on each call.

This PR caches the generated TimeZone instances to avoid the synchronization.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #17933 from ueshin/issues/SPARK-20588.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8c878a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8c878a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8c878a4

Branch: refs/heads/master
Commit: c8c878a4166415728f6e940504766a099a2f6744
Parents: bbd163d
Author: Takuya UESHIN <ueshin@databricks.com>
Authored: Mon May 15 16:52:22 2017 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Mon May 15 16:52:22 2017 -0700

----------------------------------------------------------------------
 .../catalyst/expressions/datetimeExpressions.scala | 17 ++++++++++-------
 .../spark/sql/catalyst/json/JSONOptions.scala      |  2 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala    |  4 +---
 .../spark/sql/catalyst/util/DateTimeUtils.scala    | 17 ++++++++++++++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  4 ++--
 .../spark/sql/execution/QueryExecution.scala       |  3 +--
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../sql/execution/datasources/csv/CSVOptions.scala |  2 +-
 .../sql/execution/streaming/ProgressReporter.scala |  5 +++--
 9 files changed, 34 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index de4c94d..43ca2cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -43,7 +43,7 @@ trait TimeZoneAwareExpression extends Expression {
   /** Returns a copy of this expression with the specified timeZoneId. */
   def withTimeZone(timeZoneId: String): TimeZoneAwareExpression
 
-  @transient lazy val timeZone: TimeZone = TimeZone.getTimeZone(timeZoneId.get)
+  @transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get)
 }
 
 /**
@@ -416,7 +416,7 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
   override def dataType: DataType = IntegerType
 
   @transient private lazy val c = {
-    val c = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    val c = Calendar.getInstance(DateTimeUtils.getTimeZone("UTC"))
     c.setFirstDayOfWeek(Calendar.MONDAY)
     c.setMinimalDaysInFirstWeek(4)
     c
@@ -431,9 +431,10 @@ case class WeekOfYear(child: Expression) extends UnaryExpression with ImplicitCa
     nullSafeCodeGen(ctx, ev, time => {
       val cal = classOf[Calendar].getName
       val c = ctx.freshName("cal")
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
       ctx.addMutableState(cal, c,
         s"""
-          $c = $cal.getInstance(java.util.TimeZone.getTimeZone("UTC"));
+          $c = $cal.getInstance($dtu.getTimeZone("UTC"));
           $c.setFirstDayOfWeek($cal.MONDAY);
           $c.setMinimalDaysInFirstWeek(4);
          """)
@@ -954,8 +955,9 @@ case class FromUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
         val eval = left.genCode(ctx)
         ev.copy(code = s"""
            |${eval.code}
@@ -1125,8 +1127,9 @@ case class ToUTCTimestamp(left: Expression, right: Expression)
         val tzTerm = ctx.freshName("tz")
         val utcTerm = ctx.freshName("utc")
         val tzClass = classOf[TimeZone].getName
-        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $tzClass.getTimeZone("$tz");""")
-        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $tzClass.getTimeZone("UTC");""")
+        val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+        ctx.addMutableState(tzClass, tzTerm, s"""$tzTerm = $dtu.getTimeZone("$tz");""")
+        ctx.addMutableState(tzClass, utcTerm, s"""$utcTerm = $dtu.getTimeZone("UTC");""")
         val eval = left.genCode(ctx)
         ev.copy(code = s"""
            |${eval.code}

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 23ba5ed..7930515 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -70,7 +70,7 @@ private[sql] class JSONOptions(
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
 
-  val timeZone: TimeZone = TimeZone.getTimeZone(
+  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 89e1dc9..af0837e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import java.util.TimeZone
-
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -55,7 +53,7 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
       case CurrentDate(Some(timeZoneId)) =>
         currentDates.getOrElseUpdate(timeZoneId, {
           Literal.create(
-            DateTimeUtils.millisToDays(timestamp / 1000L, TimeZone.getTimeZone(timeZoneId)),
+            DateTimeUtils.millisToDays(timestamp / 1000L, DateTimeUtils.getTimeZone(timeZoneId)),
             DateType)
         })
       case CurrentTimestamp() => currentTime

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 0a3bb51..b9a3d98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
 import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 import java.util.{Calendar, Locale, TimeZone}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
@@ -98,6 +100,15 @@ object DateTimeUtils {
     sdf
   }
 
+  private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
+  private val computeTimeZone = new JFunction[String, TimeZone] {
+    override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
+  }
+
+  def getTimeZone(timeZoneId: String): TimeZone = {
+    computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
+  }
+
   def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
     val sdf = new SimpleDateFormat(formatString, Locale.US)
     sdf.setTimeZone(timeZone)
@@ -407,7 +418,7 @@ object DateTimeUtils {
       Calendar.getInstance(timeZone)
     } else {
       Calendar.getInstance(
-        TimeZone.getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
+        getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
     }
     c.set(Calendar.MILLISECOND, 0)
 
@@ -1027,7 +1038,7 @@ object DateTimeUtils {
    * representation in their timezone.
    */
   def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZoneGMT, TimeZone.getTimeZone(timeZone))
+    convertTz(time, TimeZoneGMT, getTimeZone(timeZone))
   }
 
   /**
@@ -1035,7 +1046,7 @@ object DateTimeUtils {
    * string representation in their timezone.
    */
   def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
-    convertTz(time, TimeZone.getTimeZone(timeZone), TimeZoneGMT)
+    convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c75921e..53773f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import java.io.CharArrayWriter
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -249,7 +248,8 @@ class Dataset[T] private[sql](
     val hasMoreData = takeResult.length > numRows
     val data = takeResult.take(numRows)
 
-    lazy val timeZone = TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
+    lazy val timeZone =
+      DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
 
     // For array values, replace Seq and Array with square brackets
     // For cells that are beyond `truncate` characters, replace it with the

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8e8210e..2e05e5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.util.TimeZone
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -187,7 +186,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
         DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
       case (t: Timestamp, TimestampType) =>
         DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
-          TimeZone.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
+          DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
       case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
       case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
       case (other, tpe) if primitiveTypes.contains(tpe) => other.toString

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 2d70172..f61c673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -94,7 +94,7 @@ object PartitioningUtils {
       typeInference: Boolean,
       basePaths: Set[Path],
       timeZoneId: String): PartitionSpec = {
-    parsePartitions(paths, typeInference, basePaths, TimeZone.getTimeZone(timeZoneId))
+    parsePartitions(paths, typeInference, basePaths, DateTimeUtils.getTimeZone(timeZoneId))
   }
 
   private[datasources] def parsePartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 62e4c6e..78c16b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -117,7 +117,7 @@ class CSVOptions(
     name.map(CompressionCodecs.getCodecClassName)
   }
 
-  val timeZone: TimeZone = TimeZone.getTimeZone(
+  val timeZone: TimeZone = DateTimeUtils.getTimeZone(
     parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
 
   // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.

http://git-wip-us.apache.org/repos/asf/spark/blob/c8c878a4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 693933f..a4e4ca8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.text.SimpleDateFormat
-import java.util.{Date, TimeZone, UUID}
+import java.util.{Date, UUID}
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -82,7 +83,7 @@ trait ProgressReporter extends Logging {
   private var lastNoDataProgressEventTime = Long.MinValue
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
-  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
 
   @volatile
   protected var currentStatus: StreamingQueryStatus = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message