From: lixiao@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-22211][SQL] Remove incorrect FOJ limit pushdown
Date: Sun, 5 Nov 2017 05:47:30 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master f7f4e9c2d -> 6c6626614


[SPARK-22211][SQL] Remove incorrect FOJ limit pushdown

## What changes were proposed in this pull request?

It's not safe in all cases to push down a LIMIT below a FULL OUTER
JOIN. If the limit is pushed to one side of the FOJ, the physical join
operator cannot tell whether a row on the non-limited side would have
had a match on the other side.

*If* the join operator guarantees that unmatched tuples from the
limited side are emitted before any unmatched tuples from the other
side, pushing down the limit is safe. But this is impractical for some
join implementations, e.g. SortMergeJoin. For now, disable limit
pushdown through a FULL OUTER JOIN, and we can evaluate whether a more
complicated solution is necessary in the future.

## How was this patch tested?

Ran org.apache.spark.sql.* tests. Altered full outer join tests in
LimitPushdownSuite.

Author: Henry Robinson

Closes #19647 from henryr/spark-22211.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c662661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c662661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c662661

Branch: refs/heads/master
Commit: 6c6626614e59b2e8d66ca853a74638d3d6267d73
Parents: f7f4e9c
Author: Henry Robinson
Authored: Sat Nov 4 22:47:25 2017 -0700
Committer: gatorsmile
Committed: Sat Nov 4 22:47:25 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 24 +++-----------
 .../catalyst/optimizer/LimitPushdownSuite.scala | 33 ++++++++++----------
 2 files changed, 21 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c662661/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3273a61..3a3ccd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -332,12 +332,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, Union(children)) =>
       LocalLimit(exp, Union(children.map(maybePushLocalLimit(exp, _))))
-    // Add extra limits below OUTER JOIN. For LEFT OUTER and FULL OUTER JOIN we push limits to the
-    // left and right sides, respectively. For FULL OUTER JOIN, we can only push limits to one side
-    // because we need to ensure that rows from the limited side still have an opportunity to match
-    // against all candidates from the non-limited side. We also need to ensure that this limit
-    // pushdown rule will not eventually introduce limits on both sides if it is applied multiple
-    // times. Therefore:
+    // Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
+    // the left and right sides, respectively. It's not safe to push limits below FULL OUTER
+    // JOIN in the general case without a more invasive rewrite.
+    // We also need to ensure that this limit pushdown rule will not eventually introduce limits
+    // on both sides if it is applied multiple times. Therefore:
     // - If one side is already limited, stack another limit on top if the new limit is smaller.
     //   The redundant limit will be collapsed by the CombineLimits rule.
     // - If neither side is limited, limit the side that is estimated to be bigger.
@@ -345,19 +344,6 @@ object LimitPushDown extends Rule[LogicalPlan] {
       val newJoin = joinType match {
         case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
         case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
-        case FullOuter =>
-          (left.maxRows, right.maxRows) match {
-            case (None, None) =>
-              if (left.stats.sizeInBytes >= right.stats.sizeInBytes) {
-                join.copy(left = maybePushLocalLimit(exp, left))
-              } else {
-                join.copy(right = maybePushLocalLimit(exp, right))
-              }
-            case (Some(_), Some(_)) => join
-            case (Some(_), None) => join.copy(left = maybePushLocalLimit(exp, left))
-            case (None, Some(_)) => join.copy(right = maybePushLocalLimit(exp, right))
-
-          }
         case _ => join
       }
       LocalLimit(exp, newJoin)


http://git-wip-us.apache.org/repos/asf/spark/blob/6c662661/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index f50e2e8..cc98d23 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -113,35 +113,34 @@ class LimitPushdownSuite extends PlanTest {
 
   test("full outer join where neither side is limited and both sides have same statistics") {
     assert(x.stats.sizeInBytes === y.stats.sizeInBytes)
-    val originalQuery = x.join(y, FullOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, LocalLimit(1, x).join(y, FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    val originalQuery = x.join(y, FullOuter).limit(1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
   }
 
   test("full outer join where neither side is limited and left side has larger statistics") {
     val xBig = testRelation.copy(data = Seq.fill(2)(null)).subquery('x)
     assert(xBig.stats.sizeInBytes > y.stats.sizeInBytes)
-    val originalQuery = xBig.join(y, FullOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, LocalLimit(1, xBig).join(y, FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    val originalQuery = xBig.join(y, FullOuter).limit(1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
  }
 
   test("full outer join where neither side is limited and right side has larger statistics") {
     val yBig = testRelation.copy(data = Seq.fill(2)(null)).subquery('y)
     assert(x.stats.sizeInBytes < yBig.stats.sizeInBytes)
-    val originalQuery = x.join(yBig, FullOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, x.join(LocalLimit(1, yBig), FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    val originalQuery = x.join(yBig, FullOuter).limit(1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
   }
 
   test("full outer join where both sides are limited") {
-    val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, Limit(2, x).join(Limit(2, y), FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
   }
 }
-