spark-commits mailing list archives

From r...@apache.org
Subject spark git commit: [SPARK-17698][SQL] Join predicates should not contain filter clauses
Date Sat, 22 Oct 2016 23:32:52 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b959dab32 -> 3d5878751


[SPARK-17698][SQL] Join predicates should not contain filter clauses

## What changes were proposed in this pull request?

This is a backport of https://github.com/apache/spark/pull/15272 to the 2.0 branch.

JIRA: https://issues.apache.org/jira/browse/SPARK-17698

`ExtractEquiJoinKeys` incorrectly uses filter predicates as join conditions. `canEvaluate` [0]
checks whether an `Expression` can be evaluated using the output of a given `Plan`. For a filter
predicate such as `a.id='1'`, the `Expression` on the right-hand side (i.e. `'1'`) is a `Literal`
with no attribute references, so `expr.references` is an empty set, which is trivially a subset
of any set. As a result `canEvaluate` returns `true` and `a.id='1'` is treated as a join
predicate. This does not produce incorrect results, but for bucketed and sorted tables it can
prevent us from avoiding an unnecessary shuffle and sort. See the reproduction example further below.

[0] : https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L91
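
To see concretely why the check passes, here is a minimal standalone sketch (plain Scala sets, not the actual Spark code) of the subset test that `canEvaluate` reduces to:

```
// Sketch only: models expr.references.subsetOf(plan.outputSet) with plain sets.
// A Literal references no attributes, so its (empty) reference set is a subset
// of any plan's output, and the check passes no matter which side it is given.
object SubsetCheckSketch {
  def main(args: Array[String]): Unit = {
    val planOutput  = Set("a.id", "a.name")   // attributes produced by relation R(a)
    val literalRefs = Set.empty[String]       // Literal('1') has no references
    val otherSide   = Set("b.id")             // attribute from the other relation

    println(literalRefs.subsetOf(planOutput)) // true  -> the literal looks "evaluable" anywhere
    println(otherSide.subsetOf(planOutput))   // false -> correctly rejected
  }
}
```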

For example, to reproduce in spark-shell (where `spark` and its implicits are already in scope):

```
val df = (1 until 10).toDF("id").coalesce(1)
spark.sql("DROP TABLE IF EXISTS table1").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
spark.sql("DROP TABLE IF EXISTS table2").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")

spark.sql("""
  SELECT a.id, b.id
  FROM table1 a
  FULL OUTER JOIN table2 b
  ON a.id = b.id AND a.id='1' AND b.id='1'
""").explain(true)
```

BEFORE: the plan shuffles and sorts the table scan outputs, which is not needed since both tables
are bucketed and sorted on the same columns and have the same number of buckets. This should be
a single-stage job.

```
SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
:- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
:     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
      +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```

AFTER:

```
SortMergeJoin [id#32], [id#33], FullOuter, ((cast(id#32 as double) = 1.0) && (cast(id#33 as double) = 1.0))
:- *FileScan parquet default.table1[id#32] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *FileScan parquet default.table2[id#33] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```
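
One way to spot-check this from spark-shell (a rough sketch, assuming the `table1`/`table2` created in the example above; it simply looks for an `Exchange` node in the physical plan string rather than inspecting operator classes):

```
// Sketch: re-run the query and assert the physical plan contains no Exchange
// (i.e. no shuffle) now that both sides are bucketed and sorted on the join key.
val plan = spark.sql("""
  SELECT a.id, b.id
  FROM table1 a
  FULL OUTER JOIN table2 b
  ON a.id = b.id AND a.id='1' AND b.id='1'
""").queryExecution.executedPlan

assert(!plan.toString.contains("Exchange"), "expected no shuffle for bucketed + sorted tables")
```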

## How was this patch tested?

- Added a new test case for this scenario: `SPARK-17698 Join predicates should not contain filter clauses`
- Ran all the tests in `BucketedReadSuite`

Author: Tejas Patil <tejasp@fb.com>

Closes #15600 from tejasapatil/SPARK-17698_2.0_backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d587875
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d587875
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d587875

Branch: refs/heads/branch-2.0
Commit: 3d587875102fc2f10f03956ef50457203cb4a840
Parents: b959dab
Author: Tejas Patil <tejasp@fb.com>
Authored: Sat Oct 22 16:32:49 2016 -0700
Committer: Reynold Xin <rxin@databricks.com>
Committed: Sat Oct 22 16:32:49 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   |  5 +-
 .../spark/sql/catalyst/optimizer/joins.scala    |  4 +-
 .../spark/sql/catalyst/planning/patterns.scala  |  2 +
 .../spark/sql/sources/BucketedReadSuite.scala   | 82 +++++++++++++++++---
 4 files changed, 79 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d587875/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 100087e..abe0f08 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -84,8 +84,9 @@ trait PredicateHelper {
    *
    * For example consider a join between two relations R(a, b) and S(c, d).
    *
-   * `canEvaluate(EqualTo(a,b), R)` returns `true` where as `canEvaluate(EqualTo(a,c), R)` returns
-   * `false`.
+   * - `canEvaluate(EqualTo(a,b), R)` returns `true`
+   * - `canEvaluate(EqualTo(a,c), R)` returns `false`
+   * - `canEvaluate(Literal(1), R)` returns `true` as literals CAN be evaluated on any plan
    */
   protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
     expr.references.subsetOf(plan.outputSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/3d587875/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index ae4cd8e..08062bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -59,7 +59,9 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
       // find out the first join that have at least one join condition
       val conditionalJoin = rest.find { plan =>
         val refs = left.outputSet ++ plan.outputSet
-        conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
+        conditions
+          .filterNot(l => l.references.nonEmpty && canEvaluate(l, left))
+          .filterNot(r => r.references.nonEmpty && canEvaluate(r, plan))
           .exists(_.references.subsetOf(refs))
       }
       // pick the next one if no condition left

http://git-wip-us.apache.org/repos/asf/spark/blob/3d587875/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index f42e67c..d952c9e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -112,6 +112,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
       // as join keys.
       val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
       val joinKeys = predicates.flatMap {
+        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
         case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
         case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
         // Replace null with default value for joining key, then those rows with null in it could
@@ -125,6 +126,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
         case other => None
       }
       val otherPredicates = predicates.filterNot {
+        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => false
         case EqualTo(l, r) =>
           canEvaluate(l, left) && canEvaluate(r, right) ||
             canEvaluate(l, right) && canEvaluate(r, left)

http://git-wip-us.apache.org/repos/asf/spark/blob/3d587875/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index fc01ff3..3554b0e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -234,7 +234,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   private def testBucketing(
       bucketSpecLeft: Option[BucketSpec],
       bucketSpecRight: Option[BucketSpec],
-      joinColumns: Seq[String],
+      joinType: String = "inner",
+      joinCondition: (DataFrame, DataFrame) => Column,
       shuffleLeft: Boolean,
       shuffleRight: Boolean): Unit = {
     withTable("bucketed_table1", "bucketed_table2") {
@@ -256,12 +257,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
         val t1 = spark.table("bucketed_table1")
         val t2 = spark.table("bucketed_table2")
-        val joined = t1.join(t2, joinCondition(t1, t2, joinColumns))
+        val joined = t1.join(t2, joinCondition(t1, t2), joinType)
 
         // First check the result is corrected.
         checkAnswer(
           joined.sort("bucketed_table1.k", "bucketed_table2.k"),
-          df1.join(df2, joinCondition(df1, df2, joinColumns)).sort("df1.k", "df2.k"))
+          df1.join(df2, joinCondition(df1, df2), joinType).sort("df1.k", "df2.k"))
 
         assert(joined.queryExecution.executedPlan.isInstanceOf[SortMergeJoinExec])
         val joinOperator = joined.queryExecution.executedPlan.asInstanceOf[SortMergeJoinExec]
@@ -276,47 +277,89 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
-  private def joinCondition(left: DataFrame, right: DataFrame, joinCols: Seq[String]): Column = {
+  private def joinCondition(joinCols: Seq[String]) (left: DataFrame, right: DataFrame): Column = {
     joinCols.map(col => left(col) === right(col)).reduce(_ && _)
   }
 
   test("avoid shuffle when join 2 bucketed tables") {
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
-    testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = false, shuffleRight = false)
+    testBucketing(
+      bucketSpecLeft = bucketSpec,
+      bucketSpecRight = bucketSpec,
+      joinCondition = joinCondition(Seq("i", "j")),
+      shuffleLeft = false,
+      shuffleRight = false
+    )
   }
 
   // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
   ignore("avoid shuffle when join keys are a super-set of bucket keys") {
     val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
-    testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = false, shuffleRight = false)
+    testBucketing(
+      bucketSpecLeft = bucketSpec,
+      bucketSpecRight = bucketSpec,
+      joinCondition = joinCondition(Seq("i", "j")),
+      shuffleLeft = false,
+      shuffleRight = false
+    )
   }
 
   test("only shuffle one side when join bucketed table and non-bucketed table") {
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
-    testBucketing(bucketSpec, None, Seq("i", "j"), shuffleLeft = false, shuffleRight = true)
+    testBucketing(
+      bucketSpecLeft = bucketSpec,
+      bucketSpecRight = None,
+      joinCondition = joinCondition(Seq("i", "j")),
+      shuffleLeft = false,
+      shuffleRight = true
+    )
   }
 
   test("only shuffle one side when 2 bucketed tables have different bucket number") {
     val bucketSpec1 = Some(BucketSpec(8, Seq("i", "j"), Nil))
     val bucketSpec2 = Some(BucketSpec(5, Seq("i", "j"), Nil))
-    testBucketing(bucketSpec1, bucketSpec2, Seq("i", "j"), shuffleLeft = false, shuffleRight = true)
+    testBucketing(
+      bucketSpecLeft = bucketSpec1,
+      bucketSpecRight = bucketSpec2,
+      joinCondition = joinCondition(Seq("i", "j")),
+      shuffleLeft = false,
+      shuffleRight = true
+    )
   }
 
   test("only shuffle one side when 2 bucketed tables have different bucket keys") {
     val bucketSpec1 = Some(BucketSpec(8, Seq("i"), Nil))
     val bucketSpec2 = Some(BucketSpec(8, Seq("j"), Nil))
-    testBucketing(bucketSpec1, bucketSpec2, Seq("i"), shuffleLeft = false, shuffleRight = true)
+    testBucketing(
+      bucketSpecLeft = bucketSpec1,
+      bucketSpecRight = bucketSpec2,
+      joinCondition = joinCondition(Seq("i")),
+      shuffleLeft = false,
+      shuffleRight = true
+    )
   }
 
   test("shuffle when join keys are not equal to bucket keys") {
     val bucketSpec = Some(BucketSpec(8, Seq("i"), Nil))
-    testBucketing(bucketSpec, bucketSpec, Seq("j"), shuffleLeft = true, shuffleRight = true)
+    testBucketing(
+      bucketSpecLeft = bucketSpec,
+      bucketSpecRight = bucketSpec,
+      joinCondition = joinCondition(Seq("j")),
+      shuffleLeft = true,
+      shuffleRight = true
+    )
   }
 
   test("shuffle when join 2 bucketed tables with bucketing disabled") {
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
     withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
-      testBucketing(bucketSpec, bucketSpec, Seq("i", "j"), shuffleLeft = true, shuffleRight = true)
+      testBucketing(
+        bucketSpecLeft = bucketSpec,
+        bucketSpecRight = bucketSpec,
+        joinCondition = joinCondition(Seq("i", "j")),
+        shuffleLeft = true,
+        shuffleRight = true
+      )
     }
   }
 
@@ -348,6 +391,23 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
+  test("SPARK-17698 Join predicates should not contain filter clauses") {
+    val bucketSpec = Some(BucketSpec(8, Seq("i"), Seq("i")))
+    testBucketing(
+      bucketSpecLeft = bucketSpec,
+      bucketSpecRight = bucketSpec,
+      joinType = "fullouter",
+      joinCondition = (left: DataFrame, right: DataFrame) => {
+        val joinPredicates = left("i") === right("i")
+        val filterLeft = left("i") === Literal("1")
+        val filterRight = right("i") === Literal("1")
+        joinPredicates && filterLeft && filterRight
+      },
+      shuffleLeft = false,
+      shuffleRight = false
+    )
+  }
+
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")


