From: hvanhovell@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-18814][SQL] CheckAnalysis rejects TPCDS query 32
Date: Wed, 14 Dec 2016 10:09:51 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8ef005931 -> f999312e7


[SPARK-18814][SQL] CheckAnalysis rejects TPCDS query 32

## What changes were proposed in this pull request?

Move the checking of the GROUP BY column in a correlated scalar subquery from CheckAnalysis to Analysis to fix a regression caused by SPARK-18504. The problem can now be reproduced with a simple script:

```scala
Seq((1,1)).toDF("pk","pv").createOrReplaceTempView("p")
Seq((1,1)).toDF("ck","cv").createOrReplaceTempView("c")
sql("select * from p,c where p.pk=c.ck and c.cv = (select avg(c1.cv) from c c1 where c1.ck = p.pk)").show
```

The requirements for triggering the bug are:

1. The same table must be referenced in both the parent query and the subquery; here it is table `c`.
2. There must be a correlated predicate, but to a different table; here it runs from `c` (as `c1`) in the subquery to `p` in the parent.
3. The Analyzer then "deduplicates" `c1.ck` in the subquery, re-aliasing it under a fresh expression ID at the `Project` above the `Aggregate` of `avg`. When this deduplicated attribute and the original GROUP BY column are compared by their canonicalized forms, the expression IDs differ, which triggers the exception added in SPARK-18504.

## How was this patch tested?

SubquerySuite and a simplified version of TPCDS-Q32.

Author: Nattavut Sutyanyong

Closes #16246 from nsyca/18814.
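To make step 3 concrete, here is a minimal sketch of the expression-ID mismatch. It is not part of the patch; it only assumes Catalyst's `AttributeReference` (whose constructor allocates a fresh `ExprId` per call) and `Expression.canonicalized`:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// The original subquery column and its "deduplicated" copy: same name,
// but each constructor call allocates a fresh ExprId.
val original     = AttributeReference("ck", IntegerType)()
val deduplicated = AttributeReference("ck", IntegerType)()

// Canonicalization keeps the ExprId, so the two attributes never compare
// equal; this is the mismatch that trips the SPARK-18504 check.
assert(original.canonicalized != deduplicated.canonicalized)
```

The patch below sidesteps this mismatch by mapping deduplicated aliases back to their underlying attributes before the comparison.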
(cherry picked from commit cccd64393ea633e29d4a505fb0a7c01b51a79af8)
Signed-off-by: Herman van Hovell

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f999312e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f999312e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f999312e

Branch: refs/heads/branch-2.1
Commit: f999312e72940b559738048646013eec9e68d657
Parents: 8ef0059
Author: Nattavut Sutyanyong
Authored: Wed Dec 14 11:09:31 2016 +0100
Committer: Herman van Hovell
Committed: Wed Dec 14 11:09:45 2016 +0100

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 31 +++++++++----
 .../sql-tests/inputs/scalar-subquery.sql        | 20 +++++++++
 .../sql-tests/results/scalar-subquery.sql.out   | 46 ++++++++++++++++++++
 .../org/apache/spark/sql/SubquerySuite.scala    |  2 +-
 4 files changed, 90 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 235a799..aa77a6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -124,6 +124,10 @@ trait CheckAnalysis extends PredicateHelper {
             s"Scalar subquery must return only one column, but got ${query.output.size}")

         case s @ ScalarSubquery(query, conditions, _) if conditions.nonEmpty =>
+
+          // Collect the columns from the subquery for further checking.
+          var subqueryColumns = conditions.flatMap(_.references).filter(query.output.contains)
+
           def checkAggregate(agg: Aggregate): Unit = {
             // Make sure correlated scalar subqueries contain one row for every outer row by
             // enforcing that they are aggregates which contain exactly one aggregate expressions.
@@ -136,24 +140,35 @@ trait CheckAnalysis extends PredicateHelper {
               failAnalysis("The output of a correlated scalar subquery must be aggregated")
             }

-            // SPARK-18504: block cases where GROUP BY columns
-            // are not part of the correlated columns
-            val groupByCols = ExpressionSet.apply(agg.groupingExpressions.flatMap(_.references))
-            val predicateCols = ExpressionSet.apply(conditions.flatMap(_.references))
-            val invalidCols = groupByCols.diff(predicateCols)
+            // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
+            // are not part of the correlated columns.
+            val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references))
+            val correlatedCols = AttributeSet(subqueryColumns)
+            val invalidCols = groupByCols -- correlatedCols
             // GROUP BY columns must be a subset of columns in the predicates
             if (invalidCols.nonEmpty) {
               failAnalysis(
-                "a GROUP BY clause in a scalar correlated subquery " +
+                "A GROUP BY clause in a scalar correlated subquery " +
                   "cannot contain non-correlated columns: " +
                   invalidCols.mkString(","))
             }
           }

-          // Skip projects and subquery aliases added by the Analyzer and the SQLBuilder.
+          // Skip subquery aliases added by the Analyzer and the SQLBuilder.
+          // For projects, do the necessary mapping and skip to its child.
           def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
             case s: SubqueryAlias => cleanQuery(s.child)
-            case p: Project => cleanQuery(p.child)
+            case p: Project =>
+              // SPARK-18814: Map any aliases to their AttributeReference children
+              // for the checking in the Aggregate operators below this Project.
+              subqueryColumns = subqueryColumns.map {
+                xs => p.projectList.collectFirst {
+                  case e @ Alias(child : AttributeReference, _) if e.exprId == xs.exprId =>
+                    child
+                }.getOrElse(xs)
+              }
+
+              cleanQuery(p.child)
             case child => child
           }


http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql b/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql
new file mode 100644
index 0000000..3acc9db
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/scalar-subquery.sql
@@ -0,0 +1,20 @@
+CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv);
+CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv);
+
+-- SPARK-18814.1: Simplified version of TPCDS-Q32
+SELECT pk, cv
+FROM p, c
+WHERE p.pk = c.ck
+AND c.cv = (SELECT avg(c1.cv)
+            FROM c c1
+            WHERE c1.ck = p.pk);
+
+-- SPARK-18814.2: Adding stack of aggregates
+SELECT pk, cv
+FROM p, c
+WHERE p.pk = c.ck
+AND c.cv = (SELECT max(avg)
+            FROM (SELECT c1.cv, avg(c1.cv) avg
+                  FROM c c1
+                  WHERE c1.ck = p.pk
+                  GROUP BY c1.cv));


http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out b/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out
new file mode 100644
index 0000000..c249329
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/scalar-subquery.sql.out
@@ -0,0 +1,46 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 4
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW p AS VALUES (1, 1) AS T(pk, pv)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE OR REPLACE TEMPORARY VIEW c AS VALUES (1, 1) AS T(ck, cv)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT pk, cv
+FROM p, c
+WHERE p.pk = c.ck
+AND c.cv = (SELECT avg(c1.cv)
+            FROM c c1
+            WHERE c1.ck = p.pk)
+-- !query 2 schema
+struct<pk:int,cv:int>
+-- !query 2 output
+1	1
+
+
+-- !query 3
+SELECT pk, cv
+FROM p, c
+WHERE p.pk = c.ck
+AND c.cv = (SELECT max(avg)
+            FROM (SELECT c1.cv, avg(c1.cv) avg
+                  FROM c c1
+                  WHERE c1.ck = p.pk
+                  GROUP BY c1.cv))
+-- !query 3 schema
+struct<pk:int,cv:int>
+-- !query 3 output
+1	1
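The two queries above exercise the positive cases. As a complementary, hedged sketch (spark-shell style; it assumes a `SparkSession` named `spark` plus the `p` and `c` views from the input file, and is not part of the patch), a query whose top-level GROUP BY column is genuinely non-correlated should still be rejected by the check:

```scala
import org.apache.spark.sql.AnalysisException

try {
  // GROUP BY is on c1.cv, but the only correlated column is c1.ck: the scalar
  // subquery could yield one row per cv group, so analysis must reject it.
  spark.sql(
    """SELECT * FROM p, c
      |WHERE p.pk = c.ck
      |AND c.cv = (SELECT avg(c1.cv) FROM c c1
      |            WHERE c1.ck = p.pk GROUP BY c1.cv)""".stripMargin)
} catch {
  case e: AnalysisException =>
    // Expected to mention: "A GROUP BY clause in a scalar correlated
    // subquery cannot contain non-correlated columns: ..."
    println(e.getMessage)
}
```

For reference, golden files such as scalar-subquery.sql.out are produced by SQLQueryTestSuite; as far as I recall of the 2.1-era harness, rerunning that suite with the environment variable SPARK_GENERATE_GOLDEN_FILES=1 set regenerates them.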
http://git-wip-us.apache.org/repos/asf/spark/blob/f999312e/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 0f2f520..5a4b1cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -491,7 +491,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
         sql("select (select sum(-1) from t t2 where t1.c2 = t2.c1 group by t2.c2) sum from t t1")
       }
       assert(errMsg.getMessage.contains(
-        "a GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns:"))
+        "A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns:"))
     }
   }