spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hvanhov...@apache.org
Subject spark git commit: [SPARK-18814][SQL] CheckAnalysis rejects TPCDS query 32
Date Wed, 14 Dec 2016 10:09:51 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8ef005931 -> f999312e7


[SPARK-18814][SQL] CheckAnalysis rejects TPCDS query 32

## What changes were proposed in this pull request?
Move the checking of GROUP BY column in correlated scalar subquery from CheckAnalysis
to Analysis to fix a regression caused by SPARK-18504.

This problem can be reproduced with a simple script now.

Seq((1,1)).toDF("pk","pv").createOrReplaceTempView("p")
Seq((1,1)).toDF("ck","cv").createOrReplaceTempView("c")
sql("select * from p,c where p.pk=c.ck and c.cv = (select avg(c1.cv) from c c1 where c1.ck = p.pk)").show

The requirements are:
1. We need to reference the same table twice: once in the parent and once in the subquery.
Here, that table is c.
2. We need to have a correlated predicate that refers to a different table. Here, it goes
from c (as c1) in the subquery to p in the parent.
3. We will then "deduplicate" c1.ck in the subquery to `ck#<n1>#<n2>` at the `Project`
above the `Aggregate` of `avg`. Then, when we compare `ck#<n1>#<n2>` and the original
group by column `ck#<n1>` by their canonicalized forms, they do not match because
#<n2> != #<n1>. That's how we trigger the exception added in SPARK-18504.

## How was this patch tested?

SubquerySuite and a simplified version of TPCDS-Q32

Author: Nattavut Sutyanyong <nsy.can@gmail.com>

Closes #16246 from nsyca/18814.

(cherry picked from commit cccd64393ea633e29d4a505fb0a7c01b51a79af8)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f999312e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f999312e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f999312e

Branch: refs/heads/branch-2.1
Commit: f999312e72940b559738048646013eec9e68d657
Parents: 8ef0059
Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Authored: Wed Dec 14 11:09:31 2016 +0100
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Wed Dec 14 11:09:45 2016 +0100

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 31 +++++++++----
 .../sql-tests/inputs/scalar-subquery.sql        | 20 +++++++++
 .../sql-tests/results/scalar-subquery.sql.out   | 46 ++++++++++++++++++++
 .../org/apache/spark/sql/SubquerySuite.scala    |  2 +-
 4 files changed, 90 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 235a799..aa77a6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -124,6 +124,10 @@ trait CheckAnalysis extends PredicateHelper {
                 s"Scalar subquery must return only one column, but got ${query.output.size}")
 
           case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty =>
+
+            // Collect the columns from the subquery for further checking.
+            var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains)
+
             def checkAggregate(agg: Aggregate): Unit = {
               // Make sure correlated scalar subqueries contain one row for every outer row
by
               // enforcing that they are aggregates which contain exactly one aggregate expressions.
@@ -136,24 +140,35 @@ trait CheckAnalysis extends PredicateHelper {
                 failAnalysis("The output of a correlated scalar subquery must be aggregated")
               }
 
-              // SPARK-18504: block cases where GROUP BY columns
-              // are not part of the correlated columns
-              val groupByCols = ExpressionSet.apply(agg.groupingExpressions.flatMap(_.references))
-              val predicateCols = ExpressionSet.apply(conditions.flatMap(_.references))
-              val invalidCols = groupByCols.diff(predicateCols)
+              // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
+              // are not part of the correlated columns.
+              val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references))
+              val correlatedCols = AttributeSet(subqueryColumns)
+              val invalidCols = groupByCols -- correlatedCols
               // GROUP BY columns must be a subset of columns in the predicates
               if (invalidCols.nonEmpty) {
                 failAnalysis(
-                  "a GROUP BY clause in a scalar correlated subquery " +
+                  "A GROUP BY clause in a scalar correlated subquery " +
                     "cannot contain non-correlated columns: " +
                     invalidCols.mkString(","))
               }
             }
 
-            // Skip projects and subquery aliases added by the Analyzer and the SQLBuilder.
+            // Skip subquery aliases added by the Analyzer and the SQLBuilder.
+            // For projects, do the necessary mapping and skip to its child.
             def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
               case s: SubqueryAlias => cleanQuery(s.child)
-              case p: Project => cleanQuery(p.child)
+              case p: Project =>
+                // SPARK-18814: Map any aliases to their AttributeReference children
+                // for the checking in the Aggregate operators below this Project.
+                subqueryColumns = subqueryColumns.map {
+                  xs => p.projectList.collectFirst {
+                    case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId
=>
+                      child
+                  }.getOrElse(xs)
+                }
+
+                cleanQuery(p.child)
               case child => child
             }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql b/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql
new file mode 100644
index 0000000..3acc9db
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql
@@ -0,0 +1,20 @@
+CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv);
+CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv);
+
+-- SPARK-18814.1: Simplified version of TPCDS-Q32
+SELECT pk, cv
+FROM   p, c
+WHERE  p.pk = c.ck
+AND    c.cv = (SELECT avg(c1.cv)
+               FROM   c c1
+               WHERE  c1.ck = p.pk);
+
+-- SPARK-18814.2: Adding stack of aggregates
+SELECT pk, cv
+FROM   p, c
+WHERE  p.pk = c.ck
+AND    c.cv = (SELECT max(avg)
+	       FROM   (SELECT   c1.cv, avg(c1.cv) avg
+		       FROM     c c1
+		       WHERE    c1.ck = p.pk
+                       GROUP BY c1.cv));

http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out b/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out
new file mode 100644
index 0000000..c249329
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out
@@ -0,0 +1,46 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 4
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT pk, cv
+FROM   p, c
+WHERE  p.pk = c.ck
+AND    c.cv = (SELECT avg(c1.cv)
+               FROM   c c1
+               WHERE  c1.ck = p.pk)
+-- !query 2 schema
+struct<pk:int,cv:int>
+-- !query 2 output
+1	1
+
+
+-- !query 3
+SELECT pk, cv
+FROM   p, c
+WHERE  p.pk = c.ck
+AND    c.cv = (SELECT max(avg)
+	       FROM   (SELECT   c1.cv, avg(c1.cv) avg
+		       FROM     c c1
+		       WHERE    c1.ck = p.pk
+                       GROUP BY c1.cv))
+-- !query 3 schema
+struct<pk:int,cv:int>
+-- !query 3 output
+1	1

http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 0f2f520..5a4b1cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -491,7 +491,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
         sql("select (select sum(-1) from t t2 where t1.c2 = t2.c1 group by t2.c2) sum from
t t1")
       }
       assert(errMsg.getMessage.contains(
-        "a GROUP BY clause in a scalar correlated subquery cannot contain non-correlated
columns:"))
+        "A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated
columns:"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message