spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hvanhov...@apache.org
Subject spark git commit: [SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join [BRANCH-2.0]
Date Mon, 28 Nov 2016 19:21:05 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 759bd4a6a -> f158045fd


[SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join [BRANCH-2.0]

## What changes were proposed in this pull request?
We currently push down join conditions of a Left Anti join to both sides of the join. This
is similar to Inner, Left Semi and Existence (a specialized left semi) join. The problem is
that this changes the semantics of the join; a left anti join filters out rows that matches
the join condition.

This PR fixes this by only pushing down conditions to the right hand side of the join. This
is similar to the behavior of left outer join.

This PR is a backport of https://github.com/apache/spark/pull/16026

## How was this patch tested?
Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file for left anti
joins with a regression test.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #16039 from hvanhovell/SPARK-18597-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f158045f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f158045f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f158045f

Branch: refs/heads/branch-2.0
Commit: f158045fde14b3018493e9059e9dcb2095f11c54
Parents: 759bd4a
Author: Herman van Hovell <hvanhovell@databricks.com>
Authored: Mon Nov 28 11:20:59 2016 -0800
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Mon Nov 28 11:20:59 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  6 ++--
 .../optimizer/FilterPushdownSuite.scala         | 33 ++++++++++++++++++++
 .../resources/sql-tests/inputs/anti-join.sql    |  7 +++++
 .../sql-tests/results/anti-join.sql.out         | 29 +++++++++++++++++
 4 files changed, 72 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0a28ef4..3a71463 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1289,7 +1289,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper
{
         split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
       joinType match {
-        case Inner | LeftExistence(_) =>
+        case Inner |  LeftSemi | ExistenceJoin(_) =>
           // push down the single side only join filter for both sides sub queries
           val newLeft = leftJoinConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -1306,14 +1306,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper
{
           val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
           Join(newLeft, newRight, RightOuter, newJoinCond)
-        case LeftOuter =>
+        case LeftOuter | LeftAnti =>
           // push down the right side only join filter for right sub query
           val newLeft = left
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
           val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
-          Join(newLeft, newRight, LeftOuter, newJoinCond)
+          Join(newLeft, newRight, joinType, newJoinCond)
         case FullOuter => j
         case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
         case UsingJoin(_, _) => sys.error("Untransformed Using join node")

http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 019f132..3e67282 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
   }
 
+  test("joins: push down where clause into left anti join") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery =
+      x.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
+        .where("x.a".attr > 10)
+        .analyze
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer =
+      x.where("x.a".attr > 10)
+        .join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
+        .analyze
+    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+  }
+
+  test("joins: only push down join conditions to the right of a left anti join") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery =
+      x.join(y,
+        LeftAnti,
+        Some("x.b".attr === "y.b".attr && "y.a".attr > 10 && "x.a".attr
> 10)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer =
+      x.join(
+        y.where("y.a".attr > 10),
+        LeftAnti,
+        Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
+        .analyze
+    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+  }
+
+
   val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
 
   test("generate: predicate referenced no generated column") {

http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql b/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
new file mode 100644
index 0000000..0346f57
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
@@ -0,0 +1,7 @@
+-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
+CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
+CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);
+
+SELECT *
+FROM   tbl_a
+       LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);

http://git-wip-us.apache.org/repos/asf/spark/blob/f158045f/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out b/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
new file mode 100644
index 0000000..6f38c4d
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
@@ -0,0 +1,29 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 3
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT *
+FROM   tbl_a
+       LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2)
+-- !query 2 schema
+struct<c1:int,c2:int>
+-- !query 2 output
+2	1
+3	6


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message