spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lix...@apache.org
Subject spark git commit: [SPARK-22211][SQL] Remove incorrect FOJ limit pushdown
Date Sun, 05 Nov 2017 05:55:08 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4074ed2e1 -> 5e3837380


[SPARK-22211][SQL] Remove incorrect FOJ limit pushdown

It's not safe in all cases to push down a LIMIT below a FULL OUTER
JOIN. If the limit is pushed to one side of the FOJ, the physical
join operator cannot tell whether a row on the non-limited side would have a
match on the other side.

*If* the join operator guarantees that unmatched tuples from the limited
side are emitted before any unmatched tuples from the other side,
pushing down the limit is safe. But this is impractical for some join
implementations, e.g. SortMergeJoin.

For now, disable limit pushdown through a FULL OUTER JOIN, and we can
evaluate whether a more complicated solution is necessary in the future.

Ran org.apache.spark.sql.* tests. Altered full outer join tests in
LimitPushdownSuite.

Author: Henry Robinson <henry@cloudera.com>

Closes #19647 from henryr/spark-22211.

(cherry picked from commit 6c6626614e59b2e8d66ca853a74638d3d6267d73)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e383738
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e383738
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e383738

Branch: refs/heads/branch-2.2
Commit: 5e3837380ddcac5c4c0b489ee81e3415ce8ca633
Parents: 4074ed2
Author: Henry Robinson <henry@cloudera.com>
Authored: Sat Nov 4 22:47:25 2017 -0700
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Sat Nov 4 22:54:51 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 24 ++++----------------
 .../catalyst/optimizer/LimitPushdownSuite.scala | 21 ++++++++---------
 2 files changed, 15 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5e383738/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 71e03ee..82bd759 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -299,12 +299,11 @@ case class LimitPushDown(conf: SQLConf) extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, Union(children)) =>
       LocalLimit(exp, Union(children.map(maybePushLimit(exp, _))))
-    // Add extra limits below OUTER JOIN. For LEFT OUTER and FULL OUTER JOIN we push limits to the
-    // left and right sides, respectively. For FULL OUTER JOIN, we can only push limits to one side
-    // because we need to ensure that rows from the limited side still have an opportunity to match
-    // against all candidates from the non-limited side. We also need to ensure that this limit
-    // pushdown rule will not eventually introduce limits on both sides if it is applied multiple
-    // times. Therefore:
+    // Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
+    // the left and right sides, respectively. It's not safe to push limits below FULL OUTER
+    // JOIN in the general case without a more invasive rewrite.
+    // We also need to ensure that this limit pushdown rule will not eventually introduce limits
+    // on both sides if it is applied multiple times. Therefore:
     //   - If one side is already limited, stack another limit on top if the new limit is smaller.
     //     The redundant limit will be collapsed by the CombineLimits rule.
     //   - If neither side is limited, limit the side that is estimated to be bigger.
@@ -312,19 +311,6 @@ case class LimitPushDown(conf: SQLConf) extends Rule[LogicalPlan] {
       val newJoin = joinType match {
         case RightOuter => join.copy(right = maybePushLimit(exp, right))
         case LeftOuter => join.copy(left = maybePushLimit(exp, left))
-        case FullOuter =>
-          (left.maxRows, right.maxRows) match {
-            case (None, None) =>
-              if (left.stats(conf).sizeInBytes >= right.stats(conf).sizeInBytes) {
-                join.copy(left = maybePushLimit(exp, left))
-              } else {
-                join.copy(right = maybePushLimit(exp, right))
-              }
-            case (Some(_), Some(_)) => join
-            case (Some(_), None) => join.copy(left = maybePushLimit(exp, left))
-            case (None, Some(_)) => join.copy(right = maybePushLimit(exp, right))
-
-          }
         case _ => join
       }
       LocalLimit(exp, newJoin)

http://git-wip-us.apache.org/repos/asf/spark/blob/5e383738/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 2885fd6..6e15cf2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -113,8 +113,8 @@ class LimitPushdownSuite extends PlanTest {
     assert(x.stats(conf).sizeInBytes === y.stats(conf).sizeInBytes)
     val originalQuery = x.join(y, FullOuter).limit(1)
     val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, LocalLimit(1, x).join(y, FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
   }
 
   test("full outer join where neither side is limited and left side has larger statistics") {
@@ -122,8 +122,8 @@ class LimitPushdownSuite extends PlanTest {
     assert(xBig.stats(conf).sizeInBytes > y.stats(conf).sizeInBytes)
     val originalQuery = xBig.join(y, FullOuter).limit(1)
     val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, LocalLimit(1, xBig).join(y, FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
   }
 
   test("full outer join where neither side is limited and right side has larger statistics") {
@@ -131,15 +131,14 @@ class LimitPushdownSuite extends PlanTest {
     assert(x.stats(conf).sizeInBytes < yBig.stats(conf).sizeInBytes)
     val originalQuery = x.join(yBig, FullOuter).limit(1)
     val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, x.join(LocalLimit(1, yBig), FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
   }
 
   test("full outer join where both sides are limited") {
-    val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1)
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer = Limit(1, Limit(2, x).join(Limit(2, y), FullOuter)).analyze
-    comparePlans(optimized, correctAnswer)
+    val originalQuery = x.limit(2).join(y.limit(2), FullOuter).limit(1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    // No pushdown for FULL OUTER JOINS.
+    comparePlans(optimized, originalQuery)
   }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message