spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-23973][SQL] Remove consecutive Sorts
Date Tue, 24 Apr 2018 02:11:15 GMT
Repository: spark
Updated Branches:
  refs/heads/master 428b90385 -> 281c1ca0d


[SPARK-23973][SQL] Remove consecutive Sorts

## What changes were proposed in this pull request?

In SPARK-23375 we introduced the ability of removing `Sort` operation during query optimization
if the data is already sorted. In this follow-up we remove also a `Sort` which is followed
by another `Sort`: in this case the first sort is not needed and can be safely removed.

The PR starts from henryr's comment: https://github.com/apache/spark/pull/20560#discussion_r180601594.
So credit should be given to him.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21072 from mgaido91/SPARK-23973.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/281c1ca0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/281c1ca0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/281c1ca0

Branch: refs/heads/master
Commit: 281c1ca0dc96b0441a60c32df3d16fbb1c61e99f
Parents: 428b903
Author: Marco Gaido <marcogaido91@gmail.com>
Authored: Tue Apr 24 10:11:09 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Apr 24 10:11:09 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 21 +++++++-
 .../optimizer/RemoveRedundantSortsSuite.scala   | 51 +++++++++++++++++---
 2 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/281c1ca0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f00d40d..45f1395 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -767,12 +767,29 @@ object EliminateSorts extends Rule[LogicalPlan] {
 }
 
 /**
- * Removes Sort operation if the child is already sorted
+ * Removes redundant Sort operation. This can happen:
+ * 1) if the child is already sorted
+ * 2) if there is another Sort operator separated by 0...n Project/Filter operators
  */
 object RemoveRedundantSorts extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders)
=>
       child
+    case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
+  }
+
+  def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = plan match {
+    case Sort(_, _, child) => recursiveRemoveSort(child)
+    case other if canEliminateSort(other) =>
+      other.withNewChildren(other.children.map(recursiveRemoveSort))
+    case _ => plan
+  }
+
+  def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
+    case p: Project => p.projectList.forall(_.deterministic)
+    case f: Filter => f.condition.deterministic
+    case _: ResolvedHint => true
+    case _ => false
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/281c1ca0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
index 2319ab8..dae5e6f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}
 
 class RemoveRedundantSortsSuite extends PlanTest {
 
@@ -42,15 +38,15 @@ class RemoveRedundantSortsSuite extends PlanTest {
 
   test("remove redundant order by") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
-    val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
+    val unnecessaryReordered = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc_nullsFirst)
     val optimized = Optimize.execute(unnecessaryReordered.analyze)
-    val correctAnswer = orderedPlan.select('a).analyze
+    val correctAnswer = orderedPlan.limit(2).select('a).analyze
     comparePlans(Optimize.execute(optimized), correctAnswer)
   }
 
   test("do not remove sort if the order is different") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
-    val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
+    val reorderedDifferently = orderedPlan.limit(2).select('a).orderBy('a.asc, 'b.desc)
     val optimized = Optimize.execute(reorderedDifferently.analyze)
     val correctAnswer = reorderedDifferently.analyze
     comparePlans(optimized, correctAnswer)
@@ -72,6 +68,14 @@ class RemoveRedundantSortsSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("different sorts are not simplified if limit is in between") {
+    val orderedPlan = testRelation.select('a, 'b).orderBy('b.desc).limit(Literal(10))
+      .orderBy('a.asc)
+    val optimized = Optimize.execute(orderedPlan.analyze)
+    val correctAnswer = orderedPlan.analyze
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("range is already sorted") {
     val inputPlan = Range(1L, 1000L, 1, 10)
     val orderedPlan = inputPlan.orderBy('id.asc)
@@ -98,4 +102,37 @@ class RemoveRedundantSortsSuite extends PlanTest {
     val correctAnswer = groupedAndResorted.analyze
     comparePlans(optimized, correctAnswer)
   }
+
+  test("remove two consecutive sorts") {
+    val orderedTwice = testRelation.orderBy('a.asc).orderBy('b.desc)
+    val optimized = Optimize.execute(orderedTwice.analyze)
+    val correctAnswer = testRelation.orderBy('b.desc).analyze
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("remove sorts separated by Filter/Project operators") {
+    val orderedTwiceWithProject = testRelation.orderBy('a.asc).select('b).orderBy('b.desc)
+    val optimizedWithProject = Optimize.execute(orderedTwiceWithProject.analyze)
+    val correctAnswerWithProject = testRelation.select('b).orderBy('b.desc).analyze
+    comparePlans(optimizedWithProject, correctAnswerWithProject)
+
+    val orderedTwiceWithFilter =
+      testRelation.orderBy('a.asc).where('b > Literal(0)).orderBy('b.desc)
+    val optimizedWithFilter = Optimize.execute(orderedTwiceWithFilter.analyze)
+    val correctAnswerWithFilter = testRelation.where('b > Literal(0)).orderBy('b.desc).analyze
+    comparePlans(optimizedWithFilter, correctAnswerWithFilter)
+
+    val orderedTwiceWithBoth =
+      testRelation.orderBy('a.asc).select('b).where('b > Literal(0)).orderBy('b.desc)
+    val optimizedWithBoth = Optimize.execute(orderedTwiceWithBoth.analyze)
+    val correctAnswerWithBoth =
+      testRelation.select('b).where('b > Literal(0)).orderBy('b.desc).analyze
+    comparePlans(optimizedWithBoth, correctAnswerWithBoth)
+
+    val orderedThrice = orderedTwiceWithBoth.select(('b + 1).as('c)).orderBy('c.asc)
+    val optimizedThrice = Optimize.execute(orderedThrice.analyze)
+    val correctAnswerThrice = testRelation.select('b).where('b > Literal(0))
+      .select(('b + 1).as('c)).orderBy('c.asc).analyze
+    comparePlans(optimizedThrice, correctAnswerThrice)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message