spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-21647][SQL] Fix SortMergeJoin when using CROSS
Date Mon, 07 Aug 2017 16:00:10 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8b69b17f3 -> bbfd6b5d2


[SPARK-21647][SQL] Fix SortMergeJoin when using CROSS

### What changes were proposed in this pull request?
author: BoleynSu
closes https://github.com/apache/spark/pull/18836

```Scala
val df = Seq((1, 1)).toDF("i", "j")
df.createOrReplaceTempView("T")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
  sql("select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " +
    "cross join T t2 where t2.i = t1.i").explain(true)
}
```
The above code could cause the following exception:
```
SortMergeJoinExec should not take Cross as the JoinType
java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross as the JoinType
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100)
```

Our SortMergeJoinExec supports CROSS. We should not hit such an exception. This PR is to fix
the issue.

### How was this patch tested?
Modified the two existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>
Author: Boleyn Su <boleyn.su@gmail.com>

Closes #18863 from gatorsmile/pr-18836.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbfd6b5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbfd6b5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbfd6b5d

Branch: refs/heads/master
Commit: bbfd6b5d24be5919a3ab1ac3eaec46e33201df39
Parents: 8b69b17
Author: Xiao Li <gatorsmile@gmail.com>
Authored: Tue Aug 8 00:00:01 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Aug 8 00:00:01 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/joins/SortMergeJoinExec.scala |  2 +-
 .../spark/sql/execution/PlannerSuite.scala      | 36 +++++++++++---------
 2 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bbfd6b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 639b8e0..f41fa14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -82,7 +82,7 @@ case class SortMergeJoinExec(
 
   override def outputOrdering: Seq[SortOrder] = joinType match {
     // For inner join, orders of both sides keys should be kept.
-    case Inner =>
+    case _: InnerLike =>
       val leftKeyOrdering = getKeyOrdering(leftKeys, left.outputOrdering)
       val rightKeyOrdering = getKeyOrdering(rightKeys, right.outputOrdering)
       leftKeyOrdering.zip(rightKeyOrdering).map { case (lKey, rKey) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bbfd6b5d/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 4d155d5..63e17c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
@@ -513,26 +513,30 @@ class PlannerSuite extends SharedSQLContext {
   }
 
   test("EnsureRequirements skips sort when either side of join keys is required after inner
SMJ") {
-    val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB)
-    // Both left and right keys should be sorted after the SMJ.
-    Seq(orderingA, orderingB).foreach { ordering =>
-      assertSortRequirementsAreSatisfied(
-        childPlan = innerSmj,
-        requiredOrdering = Seq(ordering),
-        shouldHaveSort = false)
+    Seq(Inner, Cross).foreach { joinType =>
+      val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, joinType, None, planA,
planB)
+      // Both left and right keys should be sorted after the SMJ.
+      Seq(orderingA, orderingB).foreach { ordering =>
+        assertSortRequirementsAreSatisfied(
+          childPlan = innerSmj,
+          requiredOrdering = Seq(ordering),
+          shouldHaveSort = false)
+      }
     }
   }
 
   test("EnsureRequirements skips sort when key order of a parent SMJ is propagated from its
" +
     "child SMJ") {
-    val childSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB)
-    val parentSmj = SortMergeJoinExec(exprB :: Nil, exprC :: Nil, Inner, None, childSmj,
planC)
-    // After the second SMJ, exprA, exprB and exprC should all be sorted.
-    Seq(orderingA, orderingB, orderingC).foreach { ordering =>
-      assertSortRequirementsAreSatisfied(
-        childPlan = parentSmj,
-        requiredOrdering = Seq(ordering),
-        shouldHaveSort = false)
+    Seq(Inner, Cross).foreach { joinType =>
+      val childSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, joinType, None, planA,
planB)
+      val parentSmj = SortMergeJoinExec(exprB :: Nil, exprC :: Nil, joinType, None, childSmj,
planC)
+      // After the second SMJ, exprA, exprB and exprC should all be sorted.
+      Seq(orderingA, orderingB, orderingC).foreach { ordering =>
+        assertSortRequirementsAreSatisfied(
+          childPlan = parentSmj,
+          requiredOrdering = Seq(ordering),
+          shouldHaveSort = false)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message