spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API should allow literal boundary
Date Wed, 09 Aug 2017 05:23:56 GMT
Repository: spark
Updated Branches:
  refs/heads/master 6edfff055 -> 031910b0e


[SPARK-21608][SPARK-9221][SQL] Window rangeBetween() API should allow literal boundary

## What changes were proposed in this pull request?

The Window rangeBetween() API should allow literal boundaries, so that a window range frame
can be computed over an ORDER BY expression of double, date, or timestamp type.

An example use case:
```
SELECT
	val_timestamp,
	cate,
	avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp RANGE BETWEEN CURRENT ROW
AND interval 23 days 4 hours FOLLOWING)
FROM testData
```

This PR refactors the Window `rangeBetween` and `rowsBetween` APIs; existing user code
remains valid.

## How was this patch tested?

Added new test cases in both `DataFrameWindowFunctionsSuite` and `window.sql`.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #18814 from jiangxb1987/literal-boundary.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/031910b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/031910b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/031910b0

Branch: refs/heads/master
Commit: 031910b0ec24526d044fd31c05430dcda42b5be3
Parents: 6edfff0
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Authored: Wed Aug 9 13:23:49 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Aug 9 13:23:49 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/TypeCoercion.scala    |   9 +-
 .../expressions/windowExpressions.scala         |   8 +-
 .../spark/sql/execution/window/WindowExec.scala |   9 +-
 .../apache/spark/sql/expressions/Window.scala   |  63 ++++++++-
 .../spark/sql/expressions/WindowSpec.scala      |  69 +++++++--
 .../scala/org/apache/spark/sql/functions.scala  |  27 ++++
 .../test/resources/sql-tests/inputs/window.sql  |  22 ++-
 .../resources/sql-tests/results/window.sql.out  | 141 +++++++++++++------
 .../sql/DataFrameWindowFunctionsSuite.scala     |  53 ++++++-
 9 files changed, 335 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 25af014..06d8350 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -821,9 +821,12 @@ object TypeCoercion {
     }
 
     private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = {
-      boundary match {
-        case e: SpecialFrameBoundary => e
-        case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) =>
Cast(e, dt)
+      (boundary, dt) match {
+        case (e: SpecialFrameBoundary, _) => e
+        case (e, _: DateType) => e
+        case (e, _: TimestampType) => e
+        case (e: Expression, t) if e.dataType != t && Cast.canCast(e.dataType, t)
=>
+          Cast(e, t)
         case _ => boundary
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index a829dcc..e11e3a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -89,7 +89,11 @@ case class WindowSpecDefinition(
     elements.mkString("(", " ", ")")
   }
 
-  private def isValidFrameType(ft: DataType): Boolean = orderSpec.head.dataType == ft
+  private def isValidFrameType(ft: DataType): Boolean = (orderSpec.head.dataType, ft) match
{
+    case (DateType, IntegerType) => true
+    case (TimestampType, CalendarIntervalType) => true
+    case (a, b) => a == b
+  }
 }
 
 /**
@@ -129,7 +133,7 @@ case object RowFrame extends FrameType {
  * of the current row.
  */
 case object RangeFrame extends FrameType {
-  override def inputType: AbstractDataType = NumericType
+  override def inputType: AbstractDataType = TypeCollection.NumericAndInterval
   override def sql: String = "RANGE"
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 0766e37..f8bb667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
 
 /**
  * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -139,7 +141,12 @@ case class WindowExec(
         }
 
         // Create the projection which returns the current 'value' modified by adding the
offset.
-        val boundExpr = Add(expr, Cast(boundOffset, expr.dataType))
+        val boundExpr = (expr.dataType, boundOffset.dataType) match {
+          case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+          case (TimestampType, CalendarIntervalType) =>
+            TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
+          case (a, b) if a== b => Add(expr, boundOffset)
+        }
         val bound = newMutableProjection(boundExpr :: Nil, child.output)
 
         // Construct the ordering. This is used to compare the result of current value projection

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
index cd79128..1caa243 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -75,7 +75,7 @@ object Window {
   }
 
   /**
-   * Value representing the last row in the partition, equivalent to "UNBOUNDED PRECEDING"
in SQL.
+   * Value representing the first row in the partition, equivalent to "UNBOUNDED PRECEDING"
in SQL.
    * This can be used to specify the frame boundaries:
    *
    * {{{
@@ -167,17 +167,17 @@ object Window {
    * current row.
    *
    * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`,
-   * and `Window.currentRow` to specify special boundary values, rather than using integral
-   * values directly.
+   * and `Window.currentRow` to specify special boundary values, rather than using long values
+   * directly.
    *
    * A range-based boundary is based on the actual value of the ORDER BY
    * expression(s). An offset is used to alter the value of the ORDER BY expression, for
    * instance if the current order by expression has a value of 10 and the lower bound offset
    * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however
puts a
    * number of constraints on the ORDER BY expressions: there can be only one expression
and this
-   * expression must have a numerical data type. An exception can be made when the offset
is 0,
-   * because no value modification is needed, in this case multiple and non-numeric ORDER
BY
-   * expression are allowed.
+   * expression must have a numerical data type. An exception can be made when the offset
is
+   * unbounded, because no value modification is needed, in this case multiple and non-numeric
+   * ORDER BY expression are allowed.
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
@@ -210,6 +210,57 @@ object Window {
     spec.rangeBetween(start, end)
   }
 
+  /**
+   * Creates a [[WindowSpec]] with the frame boundaries defined,
+   * from `start` (inclusive) to `end` (inclusive).
+   *
+   * Both `start` and `end` are relative to the current row. For example, "lit(0)" means
+   * "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means
the
+   * five off after the current row.
+   *
+   * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()`
from
+   * [[org.apache.spark.sql.functions]] to specify special boundary values, literals are
not
+   * transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
+   *
+   * A range-based boundary is based on the actual value of the ORDER BY
+   * expression(s). An offset is used to alter the value of the ORDER BY expression, for
+   * instance if the current order by expression has a value of 10 and the lower bound offset
+   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however
puts a
+   * number of constraints on the ORDER BY expressions: there can be only one expression
and this
+   * expression must have a numerical/date/timestamp data type. An exception can be made
when the
+   * offset is unbounded, because no value modification is needed, in this case multiple
and
+   * non-numerical/date/timestamp data type ORDER BY expression are allowed.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
+   *   df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if the expression is
+   *              [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
+   * @param end boundary end, inclusive. The frame is unbounded if the expression is
+   *            [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
+   * @since 2.3.0
+   */
+  def rangeBetween(start: Column, end: Column): WindowSpec = {
+    spec.rangeBetween(start, end)
+  }
+
   private[sql] def spec: WindowSpec = {
     new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index f8b404d..4c41aa3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -146,22 +146,22 @@ class WindowSpec private[sql](
   /**
    * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
    *
-   * Both `start` and `end` are relative from the current row. For example, "0" means "current
row",
-   * while "-1" means one off before the current row, and "5" means the five off after the
-   * current row.
+   * Both `start` and `end` are relative from the current row. For example, "0" means
+   * "current row", while "-1" means one off before the current row, and "5" means the five
off
+   * after the current row.
    *
    * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`,
-   * and `Window.currentRow` to specify special boundary values, rather than using integral
-   * values directly.
+   * and `Window.currentRow` to specify special boundary values, rather than using long values
+   * directly.
    *
    * A range-based boundary is based on the actual value of the ORDER BY
    * expression(s). An offset is used to alter the value of the ORDER BY expression, for
    * instance if the current order by expression has a value of 10 and the lower bound offset
    * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however
puts a
    * number of constraints on the ORDER BY expressions: there can be only one expression
and this
-   * expression must have a numerical data type. An exception can be made when the offset
is 0,
-   * because no value modification is needed, in this case multiple and non-numeric ORDER
BY
-   * expression are allowed.
+   * expression must have a numerical data type. An exception can be made when the offset
is
+   * unbounded, because no value modification is needed, in this case multiple and non-numeric
+   * ORDER BY expression are allowed.
    *
    * {{{
    *   import org.apache.spark.sql.expressions.Window
@@ -210,6 +210,59 @@ class WindowSpec private[sql](
   }
 
   /**
+   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+   *
+   * Both `start` and `end` are relative to the current row. For example, "lit(0)" means
+   * "current row", while "lit(-1)" means one off before the current row, and "lit(5)" means
the
+   * five off after the current row.
+   *
+   * Users should use `unboundedPreceding()`, `unboundedFollowing()`, and `currentRow()`
from
+   * [[org.apache.spark.sql.functions]] to specify special boundary values, literals are
not
+   * transformed to [[org.apache.spark.sql.catalyst.expressions.SpecialFrameBoundary]]s.
+   *
+   * A range-based boundary is based on the actual value of the ORDER BY
+   * expression(s). An offset is used to alter the value of the ORDER BY expression, for
+   * instance if the current order by expression has a value of 10 and the lower bound offset
+   * is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however
puts a
+   * number of constraints on the ORDER BY expressions: there can be only one expression
and this
+   * expression must have a numerical/date/timestamp data type. An exception can be made
when the
+   * offset is unbounded, because no value modification is needed, in this case multiple
and
+   * non-numerical/date/timestamp data type ORDER BY expression are allowed.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy('category).orderBy('id).rangeBetween(currentRow(), lit(1))
+   *   df.withColumn("sum", sum('id) over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start boundary start, inclusive. The frame is unbounded if the expression is
+   *              [[org.apache.spark.sql.catalyst.expressions.UnboundedPreceding]].
+   * @param end boundary end, inclusive. The frame is unbounded if the expression is
+   *            [[org.apache.spark.sql.catalyst.expressions.UnboundedFollowing]].
+   * @since 2.3.0
+   */
+  def rangeBetween(start: Column, end: Column): WindowSpec = {
+    new WindowSpec(
+      partitionSpec,
+      orderSpec,
+      SpecifiedWindowFrame(RangeFrame, start.expr, end.expr))
+  }
+
+  /**
    * Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
    */
   private[sql] def withAggregate(aggregate: Column): Column = {

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 496619a..14ab8a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, ResolvedHint}
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -777,6 +778,32 @@ object functions {
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Window functions
   //////////////////////////////////////////////////////////////////////////////////////////////
+  /**
+   * Window function: returns the special frame boundary that represents the first row in
the
+   * window partition.
+   *
+   * @group window_funcs
+   * @since 2.3.0
+   */
+  def unboundedPreceding(): Column = Column(UnboundedPreceding)
+
+  /**
+   * Window function: returns the special frame boundary that represents the last row in
the
+   * window partition.
+   *
+   * @group window_funcs
+   * @since 2.3.0
+   */
+  def unboundedFollowing(): Column = Column(UnboundedFollowing)
+
+  /**
+   * Window function: returns the special frame boundary that represents the current row
in the
+   * window partition.
+   *
+   * @group window_funcs
+   * @since 2.3.0
+   */
+  def currentRow(): Column = Column(CurrentRow)
 
   /**
    * Window function: returns the cumulative distribution of values within a window partition,

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/test/resources/sql-tests/inputs/window.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index 342e571..c4bea34 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -1,8 +1,15 @@
 -- Test data.
 CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L,
"b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate);
+(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
+(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
+(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
+(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
+(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
+(null, null, null, null, null, null),
+(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
+AS testData(val, val_long, val_double, val_date, val_timestamp, cate);
 
 -- RowsBetween
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData
@@ -19,6 +26,13 @@ SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
 SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
 RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long;
+SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double
+RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double;
+SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date;
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp;
 
 -- RangeBetween with reverse OrderBy
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
@@ -31,7 +45,7 @@ SELECT val, cate, count(val) OVER(PARTITION BY cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
+SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val;

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/test/resources/sql-tests/results/window.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out
index 9751106..73ad27e 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1,12 +1,19 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 19
+-- Number of queries: 22
 
 
 -- !query 0
 CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L,
"b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate)
+(null, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), "a"),
+(1, 2L, 2.5D, date("2017-08-02"), timestamp(1502000000), "a"),
+(2, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "a"),
+(1, null, 1.0D, date("2017-08-01"), timestamp(1501545600), "b"),
+(2, 3L, 3.3D, date("2017-08-03"), timestamp(1503000000), "b"),
+(3, 2147483650L, 100.001D, date("2020-12-31"), timestamp(1609372800), "b"),
+(null, null, null, null, null, null),
+(3, 1L, 1.0D, date("2017-08-01"), timestamp(1501545600), null)
+AS testData(val, val_long, val_double, val_date, val_timestamp, cate)
 -- !query 0 schema
 struct<>
 -- !query 0 output
@@ -109,11 +116,63 @@ NULL	b	NULL
 
 
 -- !query 7
+SELECT val_double, cate, sum(val_double) OVER(PARTITION BY cate ORDER BY val_double
+RANGE BETWEEN CURRENT ROW AND 2.5 FOLLOWING) FROM testData ORDER BY cate, val_double
+-- !query 7 schema
+struct<val_double:double,cate:string,sum(val_double) OVER (PARTITION BY cate ORDER BY
val_double ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND CAST(2.5 AS DOUBLE) FOLLOWING):double>
+-- !query 7 output
+NULL	NULL	NULL
+1.0	NULL	1.0
+1.0	a	4.5
+1.0	a	4.5
+2.5	a	2.5
+100.001	a	100.001
+1.0	b	4.3
+3.3	b	3.3
+100.001	b	100.001
+
+
+-- !query 8
+SELECT val_date, cate, max(val_date) OVER(PARTITION BY cate ORDER BY val_date
+RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING) FROM testData ORDER BY cate, val_date
+-- !query 8 schema
+struct<val_date:date,cate:string,max(val_date) OVER (PARTITION BY cate ORDER BY val_date
ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2 FOLLOWING):date>
+-- !query 8 output
+NULL	NULL	NULL
+2017-08-01	NULL	2017-08-01
+2017-08-01	a	2017-08-02
+2017-08-01	a	2017-08-02
+2017-08-02	a	2017-08-02
+2020-12-31	a	2020-12-31
+2017-08-01	b	2017-08-03
+2017-08-03	b	2017-08-03
+2020-12-31	b	2020-12-31
+
+
+-- !query 9
+SELECT val_timestamp, cate, avg(val_timestamp) OVER(PARTITION BY cate ORDER BY val_timestamp
+RANGE BETWEEN CURRENT ROW AND interval 23 days 4 hours FOLLOWING) FROM testData
+ORDER BY cate, val_timestamp
+-- !query 9 schema
+struct<val_timestamp:timestamp,cate:string,avg(CAST(val_timestamp AS DOUBLE)) OVER (PARTITION
BY cate ORDER BY val_timestamp ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND interval 3 weeks
2 days 4 hours FOLLOWING):double>
+-- !query 9 output
+NULL	NULL	NULL
+2017-07-31 17:00:00	NULL	1.5015456E9
+2017-07-31 17:00:00	a	1.5016970666666667E9
+2017-07-31 17:00:00	a	1.5016970666666667E9
+2017-08-05 23:13:20	a	1.502E9
+2020-12-30 16:00:00	a	1.6093728E9
+2017-07-31 17:00:00	b	1.5022728E9
+2017-08-17 13:00:00	b	1.503E9
+2020-12-30 16:00:00	b	1.6093728E9
+
+
+-- !query 10
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 7 schema
+-- !query 10 schema
 struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
--- !query 7 output
+-- !query 10 output
 NULL	NULL	NULL
 3	NULL	3
 NULL	a	NULL
@@ -125,62 +184,62 @@ NULL	a	NULL
 3	b	5
 
 
--- !query 8
+-- !query 11
 SELECT val, cate, count(val) OVER(PARTITION BY cate
 ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 8 schema
+-- !query 11 schema
 struct<>
--- !query 8 output
+-- !query 11 output
 org.apache.spark.sql.AnalysisException
 cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch:
Window frame upper bound '1' does not followes the lower bound 'unboundedfollowing$()'.; line
1 pos 33
 
 
--- !query 9
+-- !query 12
 SELECT val, cate, count(val) OVER(PARTITION BY cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 9 schema
+-- !query 12 schema
 struct<>
--- !query 9 output
+-- !query 12 output
 org.apache.spark.sql.AnalysisException
 cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)'
due to data type mismatch: A range window frame cannot be used in an unordered window specification.;
line 1 pos 33
 
 
--- !query 10
+-- !query 13
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 10 schema
+-- !query 13 schema
 struct<>
--- !query 10 output
+-- !query 13 output
 org.apache.spark.sql.AnalysisException
 cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate`
ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range
window frame with value boundaries cannot be used in a window specification with multiple
order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33
 
 
--- !query 11
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
+-- !query 14
+SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_timestamp
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 11 schema
+-- !query 14 schema
 struct<>
--- !query 11 output
+-- !query 14 output
 org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_date() ASC NULLS FIRST RANGE
BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'DateType'
used in the order specification does not match the data type 'IntegerType' which is used in
the range frame.; line 1 pos 33
+cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_timestamp() ASC NULLS FIRST
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'TimestampType'
used in the order specification does not match the data type 'IntegerType' which is used in
the range frame.; line 1 pos 33
 
 
--- !query 12
+-- !query 15
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val
--- !query 12 schema
+-- !query 15 schema
 struct<>
--- !query 12 output
+-- !query 15 output
 org.apache.spark.sql.AnalysisException
 cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The
lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33
 
 
--- !query 13
+-- !query 16
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
--- !query 13 schema
+-- !query 16 schema
 struct<>
--- !query 13 output
+-- !query 16 output
 org.apache.spark.sql.catalyst.parser.ParseException
 
 Frame bound value must be a literal.(line 2, pos 30)
@@ -191,7 +250,7 @@ RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER
BY cat
 ------------------------------^^^
 
 
--- !query 14
+-- !query 17
 SELECT val, cate,
 max(val) OVER w AS max,
 min(val) OVER w AS min,
@@ -218,9 +277,9 @@ approx_count_distinct(val) OVER w AS approx_count_distinct
 FROM testData
 WINDOW w AS (PARTITION BY cate ORDER BY val)
 ORDER BY cate, val
--- !query 14 schema
+-- !query 17 schema
 struct<val:int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint>
--- !query 14 output
+-- !query 17 output
 NULL	NULL	NULL	NULL	NULL	0	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	1	1	0.5	0.0	1	1	NULL
NULL	0
 3	NULL	3	3	3	1	3	3.0	NaN	NULL	3	NULL	3	3	3	2	2	1.0	1.0	2	2	0.0	NaN	1
 NULL	a	NULL	NULL	NULL	0	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	1	1	0.25	0.0	1	1	NULL
NULL	0
@@ -232,11 +291,11 @@ NULL	a	NULL	NULL	NULL	0	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
1	1	0.25	0.
 3	b	3	1	1	3	6	2.0	1.0	1	1	1	3	3	3	3	3	1.0	1.0	2	3	0.6666666666666666	1.0	3
 
 
--- !query 15
+-- !query 18
 SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate,
val
--- !query 15 schema
+-- !query 18 schema
 struct<val:int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY
val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double>
--- !query 15 output
+-- !query 18 output
 NULL	NULL	NULL
 3	NULL	NULL
 NULL	a	NULL
@@ -248,20 +307,20 @@ NULL	a	NULL
 3	b	NULL
 
 
--- !query 16
+-- !query 19
 SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val
--- !query 16 schema
+-- !query 19 schema
 struct<>
--- !query 16 output
+-- !query 19 output
 org.apache.spark.sql.AnalysisException
 Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
 
 
--- !query 17
+-- !query 20
 SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val
--- !query 17 schema
+-- !query 20 schema
 struct<val:int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
--- !query 17 output
+-- !query 20 output
 NULL	NULL	13	1.8571428571428572
 3	NULL	13	1.8571428571428572
 NULL	a	13	1.8571428571428572
@@ -273,7 +332,7 @@ NULL	a	13	1.8571428571428572
 3	b	13	1.8571428571428572
 
 
--- !query 18
+-- !query 21
 SELECT val, cate,
 first_value(false) OVER w AS first_value,
 first_value(true, true) OVER w AS first_value_ignore_null,
@@ -284,9 +343,9 @@ last_value(false, false) OVER w AS last_value_contain_null
 FROM testData
 WINDOW w AS ()
 ORDER BY cate, val
--- !query 18 schema
+-- !query 21 schema
 struct<val:int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean>
--- !query 18 output
+-- !query 21 output
 NULL	NULL	false	true	false	false	true	false
 3	NULL	false	true	false	false	true	false
 NULL	a	false	true	false	false	true	false

http://git-wip-us.apache.org/repos/asf/spark/blob/031910b0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9806e57..ea725af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql
 
+import java.sql.{Date, Timestamp}
+
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
 
 /**
  * Window function testing for DataFrame API.
@@ -172,7 +175,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(e.message.contains("Boundary end is not a valid integer: 2147483648"))
   }
 
-  test("range between should accept integer/long values as boundary") {
+  test("range between should accept int/long values as boundary") {
     val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
       (3L, "2"), (2L, "1"), (2147483650L, "2"))
       .toDF("key", "value")
@@ -191,6 +194,54 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
           Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))),
       Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))
     )
+
+    def dt(date: String): Date = Date.valueOf(date)
+
+    val df2 = Seq((dt("2017-08-01"), "1"), (dt("2017-08-01"), "1"), (dt("2020-12-31"), "1"),
+      (dt("2017-08-03"), "2"), (dt("2017-08-02"), "1"), (dt("2020-12-31"), "2"))
+      .toDF("key", "value")
+    checkAnswer(
+      df2.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(lit(0), lit(2)))),
+      Seq(Row(dt("2017-08-01"), 3), Row(dt("2017-08-01"), 3), Row(dt("2020-12-31"), 1),
+        Row(dt("2017-08-03"), 1), Row(dt("2017-08-02"), 1), Row(dt("2020-12-31"), 1))
+    )
+  }
+
+  test("range between should accept double values as boundary") {
+    val df = Seq((1.0D, "1"), (1.0D, "1"), (100.001D, "1"),
+      (3.3D, "2"), (2.02D, "1"), (100.001D, "2"))
+      .toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key")
+            .rangeBetween(currentRow, lit(2.5D)))),
+      Seq(Row(1.0, 3), Row(1.0, 3), Row(100.001, 1), Row(3.3, 1), Row(2.02, 1), Row(100.001, 1))
+    )
+  }
+
+  test("range between should accept interval values as boundary") {
+    def ts(timestamp: Long): Timestamp = new Timestamp(timestamp * 1000)
+
+    val df = Seq((ts(1501545600), "1"), (ts(1501545600), "1"), (ts(1609372800), "1"),
+      (ts(1503000000), "2"), (ts(1502000000), "1"), (ts(1609372800), "2"))
+      .toDF("key", "value")
+    df.createOrReplaceTempView("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        count("key").over(
+          Window.partitionBy($"value").orderBy($"key")
+            .rangeBetween(currentRow,
+              lit(CalendarInterval.fromString("interval 23 days 4 hours"))))),
+      Seq(Row(ts(1501545600), 3), Row(ts(1501545600), 3), Row(ts(1609372800), 1),
+        Row(ts(1503000000), 1), Row(ts(1502000000), 1), Row(ts(1609372800), 1))
+    )
   }
 
   test("aggregation and rows between with unbounded") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message