spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: [SPARK-11135] [SQL] Exchange incorrectly skips sorts when existing ordering is non-empty subset of required ordering
Date Fri, 16 Oct 2015 00:37:11 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 13920d5fe -> 7aaf485f3


[SPARK-11135] [SQL] Exchange incorrectly skips sorts when existing ordering is non-empty subset of required ordering

In Spark SQL, the Exchange planner tries to avoid unnecessary sorts in cases where the data
has already been sorted by an ordering that has the requested ordering as a prefix. For
instance, say a query calls for an operator's input to be sorted by `a.asc` and the input
happens to already be sorted by `[a.asc, b.asc]`. In this case, we do not need to re-sort the
input. The converse does not hold: if the query calls for `[a.asc, b.asc]`, then `a.asc` alone
will not satisfy the ordering requirement, and Exchange must plan an additional sort. Note
that order matters here, not mere column membership: `[b.asc, a.asc]` covers the same columns
but does not satisfy a required ordering of `a.asc`.
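
To make the rule concrete, here is a minimal sketch of the intended prefix test. The helper
name and signature are illustrative only, not the actual code in Exchange.scala:

    // Hypothetical helper: an existing output ordering satisfies a required
    // ordering iff the required ordering is a prefix of it. Order matters,
    // so this is stricter than set containment on the sort columns.
    def orderingSatisfied(
        requiredOrdering: Seq[SortOrder],
        outputOrdering: Seq[SortOrder]): Boolean =
      requiredOrdering == outputOrdering.take(requiredOrdering.length)

    // orderingSatisfied(Seq(aAsc), Seq(aAsc, bAsc)) == true  -> skip the sort
    // orderingSatisfied(Seq(aAsc, bAsc), Seq(aAsc)) == false -> plan a sort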

However, the current Exchange code gets this wrong: it incorrectly skips the sort when the
existing output ordering is merely a non-empty prefix of the required ordering. Fortunately,
the fix is simple.
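
To see why the old check fails, trace it (quoting the removed lines from the diff below) with
requiredOrdering = [a.asc, b.asc] and child.outputOrdering = [a.asc]:

    // Pre-patch logic: both orderings are truncated to the shorter length
    // before being compared, so a shorter existing ordering can never fail.
    val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min
    // Here minSize == 1, and both take(1) calls yield [a.asc], so the
    // condition is false and the required sort is (incorrectly) skipped.
    if (minSize == 0 || requiredOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
      // plan a sort; never reached in this example
    }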

This bug was introduced in https://github.com/apache/spark/pull/7458, so it affects 1.5.0+.

This patch fixes the bug and significantly improves the unit test coverage of Exchange's sort-planning
logic.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9140 from JoshRosen/SPARK-11135.

(cherry picked from commit eb0b4d6e2ddfb765f082d0d88472626336ad2609)
Signed-off-by: Michael Armbrust <michael@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7aaf485f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7aaf485f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7aaf485f

Branch: refs/heads/branch-1.5
Commit: 7aaf485f3e278e975556d156317422d4ce073b2b
Parents: 13920d5
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Thu Oct 15 17:36:55 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Thu Oct 15 17:37:07 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Exchange.scala   |  5 +-
 .../spark/sql/execution/PlannerSuite.scala      | 49 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7aaf485f/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 029f226..894c254 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -212,6 +212,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
     val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
     val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
     var children: Seq[SparkPlan] = operator.children
+    assert(requiredChildDistributions.length == children.length)
+    assert(requiredChildOrderings.length == children.length)
 
     // Ensure that the operator's children satisfy their output distribution requirements:
     children = children.zip(requiredChildDistributions).map { case (child, distribution) =>
@@ -241,8 +243,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
     children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
-        val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min
-        if (minSize == 0 || requiredOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
+        if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
           sqlContext.planner.BasicOperators.getSortOperator(requiredOrdering, global = false, child)
         } else {
           child

http://git-wip-us.apache.org/repos/asf/spark/blob/7aaf485f/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index fad93b0..b78baa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -355,6 +355,55 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  test("EnsureRequirements adds sort when there is no existing ordering") {
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq.empty) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingB)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+      fail(s"Sort should have been added:\n$outputPlan")
+    }
+  }
+
+  test("EnsureRequirements skips sort when required ordering is prefix of existing ordering")
{
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA, orderingB)) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingA)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.nonEmpty) {
+      fail(s"No sorts should have been added:\n$outputPlan")
+    }
+  }
+
+  // This is a regression test for SPARK-11135
+  test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering")
{
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA)) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingA, orderingB)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+      fail(s"Sort should have been added:\n$outputPlan")
+    }
+  }
+
   // ---------------------------------------------------------------------------------------------
 }
 

