spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-24168][SQL] WindowExec should not access SQLConf at executor side
Date Fri, 04 May 2018 00:27:26 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8509284e1 -> d35eb2f9b


[SPARK-24168][SQL] WindowExec should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190 to make it easier to backport.

`WindowExec#createBoundOrdering` is called on the executor side, so we can't use `conf.sessionLocalTimeZone`
there.

## How was this patch tested?

Tested in #21190.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21225 from cloud-fan/minor3.

(cherry picked from commit e646ae67f2e793204bc819ab2b90815214c2bbf3)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d35eb2f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d35eb2f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d35eb2f9

Branch: refs/heads/branch-2.3
Commit: d35eb2f9b0af1a625749ca8b7f12d8eceed28766
Parents: 8509284
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Thu May 3 17:27:13 2018 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Thu May 3 17:27:23 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/window/WindowExec.scala      | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d35eb2f9/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 800a2ea..626f39d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -112,9 +112,11 @@ case class WindowExec(
    *
    * @param frame to evaluate. This can either be a Row or Range frame.
    * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
    * @return a bound ordering object.
    */
-  private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = {
+  private[this] def createBoundOrdering(
+      frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
     (frame, bound) match {
       case (RowFrame, CurrentRow) =>
         RowBoundOrdering(0)
@@ -144,7 +146,7 @@ case class WindowExec(
         val boundExpr = (expr.dataType, boundOffset.dataType) match {
           case (DateType, IntegerType) => DateAdd(expr, boundOffset)
           case (TimestampType, CalendarIntervalType) =>
-            TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
+            TimeAdd(expr, boundOffset, Some(timeZone))
           case (a, b) if a== b => Add(expr, boundOffset)
         }
         val bound = newMutableProjection(boundExpr :: Nil, child.output)
@@ -197,6 +199,7 @@ case class WindowExec(
 
     // Map the groups to a (unbound) expression and frame factory pair.
     var numExpressions = 0
+    val timeZone = conf.sessionLocalTimeZone
     framedFunctions.toSeq.map {
       case (key, (expressions, functionSeq)) =>
         val ordinal = numExpressions
@@ -237,7 +240,7 @@ case class WindowExec(
               new UnboundedPrecedingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, upper))
+                createBoundOrdering(frameType, upper, timeZone))
             }
 
           // Shrinking Frame.
@@ -246,7 +249,7 @@ case class WindowExec(
               new UnboundedFollowingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, lower))
+                createBoundOrdering(frameType, lower, timeZone))
             }
 
           // Moving Frame.
@@ -255,8 +258,8 @@ case class WindowExec(
               new SlidingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, lower),
-                createBoundOrdering(frameType, upper))
+                createBoundOrdering(frameType, lower, timeZone),
+                createBoundOrdering(frameType, upper, timeZone))
             }
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message