spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-24495][SQL] EnsureRequirement returns wrong plan when reordering equal keys
Date Thu, 14 Jun 2018 16:20:47 GMT
Repository: spark
Updated Branches:
  refs/heads/master 534065efe -> fdadc4be0


[SPARK-24495][SQL] EnsureRequirement returns wrong plan when reordering equal keys

## What changes were proposed in this pull request?

`EnsureRequirements`, in its `reorder` method, currently assumes that each key appears only
once in the join condition. This is not always the case; when the assumption is violated, the
rule returns a wrong plan, which in turn produces an incorrect result for the query.
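
For illustration, a minimal self-contained sketch (plain Scala, with strings standing in for Catalyst expressions; all names here are hypothetical, not Spark's API) of how a first-match lookup loses a duplicated key, and how tracking already-picked indexes, as this patch does, avoids it:

```scala
import scala.collection.mutable

object ReorderSketch {
  // Buggy lookup: indexWhere always returns the FIRST match, so a key that
  // appears twice resolves to the same index twice and one pair is dropped.
  def reorderBuggy(expected: Seq[String], current: Seq[String]): Seq[Int] =
    expected.map(k => current.indexWhere(_ == k))

  // Fixed lookup (mirrors the patch): remember already-picked indexes and
  // skip them, so each occurrence of a duplicated key maps to a distinct slot.
  def reorderFixed(expected: Seq[String], current: Seq[String]): Seq[Int] = {
    val picked = mutable.Set[Int]()
    val keysAndIndexes = current.zipWithIndex
    expected.map { k =>
      val idx = keysAndIndexes
        .find { case (e, i) => e == k && !picked.contains(i) }
        .map(_._2).get
      picked += idx
      idx
    }
  }

  def main(args: Array[String]): Unit = {
    // A condition like `id = b1 AND id = b2` duplicates the left key `id`.
    val keys = Seq("id", "id")
    println(reorderBuggy(keys, keys)) // List(0, 0): the second pair is lost
    println(reorderFixed(keys, keys)) // List(0, 1): both occurrences kept
  }
}
```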

## How was this patch tested?

Added unit tests: a query-level regression test in `JoinSuite` and a planner test in `PlannerSuite`.
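
To see the failure mode end to end, here is a rough sketch of the scenario the new `JoinSuite` test exercises (assumes a `SparkSession` named `spark`, e.g. in spark-shell; the config keys are the string forms of the `SQLConf` entries the test sets):

```scala
import spark.implicits._

// Force a shuffled sort-merge join so EnsureRequirements has to reorder keys.
spark.conf.set("spark.sql.shuffle.partitions", "1")
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val df1 = spark.range(0, 100, 1, 2)
val df2 = spark.range(100).select($"id".as("b1"), (-$"id").as("b2"))

// `id = b1 AND id = b2` uses the left key `id` twice; before the fix the
// reordered plan could pair the wrong keys and return spurious rows.
val res = df1.join(df2, $"id" === $"b1" && $"id" === $"b2").select($"b1", $"b2", $"id")
res.show() // expected: the single row (0, 0, 0)
```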

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21529 from mgaido91/SPARK-24495.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdadc4be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdadc4be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdadc4be

Branch: refs/heads/master
Commit: fdadc4be08dcf1a06383bbb05e53540da2092c63
Parents: 534065e
Author: Marco Gaido <marcogaido91@gmail.com>
Authored: Thu Jun 14 09:20:41 2018 -0700
Committer: Xiao Li <gatorsmile@gmail.com>
Committed: Thu Jun 14 09:20:41 2018 -0700

----------------------------------------------------------------------
 .../execution/exchange/EnsureRequirements.scala    | 14 ++++++++++++--
 .../scala/org/apache/spark/sql/JoinSuite.scala     | 11 +++++++++++
 .../apache/spark/sql/execution/PlannerSuite.scala  | 17 +++++++++++++++++
 3 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fdadc4be/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index e3d2838..ad95879 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions._
@@ -227,9 +228,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
     val leftKeysBuffer = ArrayBuffer[Expression]()
     val rightKeysBuffer = ArrayBuffer[Expression]()
+    val pickedIndexes = mutable.Set[Int]()
+    val keysAndIndexes = currentOrderOfKeys.zipWithIndex
 
     expectedOrderOfKeys.foreach(expression => {
-      val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
+      val index = keysAndIndexes.find { case (e, idx) =>
+        // As we may have the same key used many times, we need to filter out its occurrence we
+        // have already used.
+        e.semanticEquals(expression) && !pickedIndexes.contains(idx)
+      }.map(_._2).get
+      pickedIndexes += index
       leftKeysBuffer.append(leftKeys(index))
       rightKeysBuffer.append(rightKeys(index))
     })
@@ -270,7 +278,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
    * partitioning of the join nodes' children.
    */
   private def reorderJoinPredicates(plan: SparkPlan): SparkPlan = {
-    plan.transformUp {
+    plan match {
       case BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left,
         right) =>
         val (reorderedLeftKeys, reorderedRightKeys) =
@@ -288,6 +296,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         val (reorderedLeftKeys, reorderedRightKeys) =
           reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning)
        SortMergeJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, condition, left, right)
+
+      case other => other
     }
   }
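
A note on the `transformUp` → `match` change above (my reading of the diff, not stated in the commit message): `reorderJoinPredicates` is invoked on one operator at a time, so `transformUp`, which applies its partial function to every node of the subtree, would redundantly re-apply the reordering to descendants; matching on the node itself, with a `case other => other` fallthrough, touches only that operator. A toy contrast, using hypothetical types rather than Spark's `TreeNode`:

```scala
sealed trait Node
final case class Branch(label: String, children: Seq[Node]) extends Node
final case class Leaf(label: String) extends Node

// transformUp rewrites the children first, then applies `f` at every node.
def transformUp(n: Node)(f: PartialFunction[Node, Node]): Node = {
  val rewritten = n match {
    case Branch(l, cs) => Branch(l, cs.map(transformUp(_)(f)))
    case leaf: Leaf    => leaf
  }
  f.applyOrElse(rewritten, identity[Node])
}

// The patched style: apply `f` to this node only and leave children alone.
def applyHereOnly(n: Node)(f: PartialFunction[Node, Node]): Node =
  f.applyOrElse(n, identity[Node])
```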
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fdadc4be/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 8fa7474..44767df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -882,4 +882,15 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       checkAnswer(df, Row(3, 8, 7, 2) :: Row(3, 8, 4, 2) :: Nil)
     }
   }
+
+  test("SPARK-24495: Join may return wrong result when having duplicated equal-join keys")
{
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1",
+      SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df1 = spark.range(0, 100, 1, 2)
+      val df2 = spark.range(100).select($"id".as("b1"), (- $"id").as("b2"))
+      val res = df1.join(df2, $"id" === $"b1" && $"id" === $"b2").select($"b1", $"b2", $"id")
+      checkAnswer(res, Row(0, 0, 0))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdadc4be/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index ed0ff1b..37d4687 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -680,6 +680,23 @@ class PlannerSuite extends SharedSQLContext {
     assert(rangeExecInZeroPartition.head.outputPartitioning == UnknownPartitioning(0))
   }
 
+  test("SPARK-24495: EnsureRequirements can return wrong plan when reusing the same key in
join") {
+    val plan1 = DummySparkPlan(outputOrdering = Seq(orderingA),
+      outputPartitioning = HashPartitioning(exprA :: exprA :: Nil, 5))
+    val plan2 = DummySparkPlan(outputOrdering = Seq(orderingB),
+      outputPartitioning = HashPartitioning(exprB :: Nil, 5))
+    val smjExec = SortMergeJoinExec(
+      exprA :: exprA :: Nil, exprB :: exprC :: Nil, Inner, None, plan1, plan2)
+
+    val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(smjExec)
+    outputPlan match {
+      case SortMergeJoinExec(leftKeys, rightKeys, _, _, _, _) =>
+        assert(leftKeys == Seq(exprA, exprA))
+        assert(rightKeys == Seq(exprB, exprC))
+      case _ => fail()
+    }
+  }
+
   test("SPARK-24500: create union with stream of children") {
     val df = Union(Stream(
       Range(1, 1, 1, 1),


