spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-24369][SQL] Correct handling for multiple distinct aggregations having the same argument set
Date Mon, 04 Jun 2018 04:58:04 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 18194544f -> 36f1d5e17


[SPARK-24369][SQL] Correct handling for multiple distinct aggregations having the same argument set

## What changes were proposed in this pull request?

bring back https://github.com/apache/spark/pull/21443

This is a different approach: just change the check to count distinct columns with `toSet`

## How was this patch tested?

a new test to verify the planner behavior.

Author: Wenchen Fan <wenchen@databricks.com>
Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21487 from cloud-fan/back.

(cherry picked from commit 416cd1fd96c0db9194e32ba877b1396b6dc13c8e)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36f1d5e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36f1d5e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36f1d5e1

Branch: refs/heads/branch-2.3
Commit: 36f1d5e17a79bc343d6ea88af2d1fed1f02c132f
Parents: 1819454
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Sun Jun 3 21:57:42 2018 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Sun Jun 3 21:57:58 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   |  4 ++--
 .../resources/sql-tests/inputs/group-by.sql     |  6 +++++-
 .../sql-tests/results/group-by.sql.out          | 11 +++++++++-
 .../spark/sql/execution/PlannerSuite.scala      | 21 ++++++++++++++++++++
 4 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/36f1d5e1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 25436e1..c6565fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -372,9 +372,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
         val (functionsWithDistinct, functionsWithoutDistinct) =
           aggregateExpressions.partition(_.isDistinct)
-        if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
+        if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) {
           // This is a sanity check. We should not reach here when we have multiple distinct
-          // column sets. Our MultipleDistinctRewriter should take care this case.
+          // column sets. Our `RewriteDistinctAggregates` should take care this case.
           sys.error("You hit a query analyzer bug. Please report your query to " +
               "Spark user mailing list.")
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/36f1d5e1/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index c5070b7..2c18d6a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -68,4 +68,8 @@ SELECT 1 from (
   FROM (select 1 as x) a
   WHERE false
 ) b
-where b.z != b.z
+where b.z != b.z;
+
+-- SPARK-24369 multiple distinct aggregations having the same argument set
+SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
+  FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y);

http://git-wip-us.apache.org/repos/asf/spark/blob/36f1d5e1/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index c1abc6d..581aa17 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 26
+-- Number of queries: 27
 
 
 -- !query 0
@@ -241,3 +241,12 @@ where b.z != b.z
 struct<1:int>
 -- !query 25 output
 
+
+
+-- !query 26
+SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
+  FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
+-- !query 26 schema
+struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,corr(DISTINCT CAST(y AS DOUBLE), CAST(x AS DOUBLE)):double,count(1):bigint>
+-- !query 26 output
+1.0	1.0	3

http://git-wip-us.apache.org/repos/asf/spark/blob/36f1d5e1/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index f8b26f5..dfbc034 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -69,6 +69,27 @@ class PlannerSuite extends SharedSQLContext {
     testPartialAggregationPlan(query)
   }
 
+  test("mixed aggregates with same distinct columns") {
+    def assertNoExpand(plan: SparkPlan): Unit = {
+      assert(plan.collect { case e: ExpandExec => e }.isEmpty)
+    }
+
+    withTempView("v") {
+      Seq((1, 1.0, 1.0), (1, 2.0, 2.0)).toDF("i", "j", "k").createTempView("v")
+      // one distinct column
+      val query1 = sql("SELECT sum(DISTINCT j), max(DISTINCT j) FROM v GROUP BY i")
+      assertNoExpand(query1.queryExecution.executedPlan)
+
+      // 2 distinct columns
+      val query2 = sql("SELECT corr(DISTINCT j, k), count(DISTINCT j, k) FROM v GROUP BY i")
+      assertNoExpand(query2.queryExecution.executedPlan)
+
+      // 2 distinct columns with different order
+      val query3 = sql("SELECT corr(DISTINCT j, k), count(DISTINCT k, j) FROM v GROUP BY i")
+      assertNoExpand(query3.queryExecution.executedPlan)
+    }
+  }
+
   test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
     def checkPlan(fieldTypes: Seq[DataType]): Unit = {
       withTempView("testLimit") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message