spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-7303] [SQL] push down project if possible when the child is sort
Date Wed, 13 May 2015 23:14:07 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 51230f2a9 -> d5c52d9ac


[SPARK-7303] [SQL] push down project if possible when the child is sort

Optimize the case of `project(_, sort)` , a example is:

`select key from (select * from testData order by key) t`

before this PR:
```
== Parsed Logical Plan ==
'Project ['key]
 'Subquery t
  'Sort ['key ASC], true
   'Project [*]
    'UnresolvedRelation [testData], None

== Analyzed Logical Plan ==
Project [key#0]
 Subquery t
  Sort [key#0 ASC], true
   Project [key#0,value#1]
    Subquery testData
     LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Optimized Logical Plan ==
Project [key#0]
 Sort [key#0 ASC], true
  LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Physical Plan ==
Project [key#0]
 Sort [key#0 ASC], true
  Exchange (RangePartitioning [key#0 ASC], 5), []
   PhysicalRDD [key#0,value#1], MapPartitionsRDD[1]
```

after this PR
```
== Parsed Logical Plan ==
'Project ['key]
 'Subquery t
  'Sort ['key ASC], true
   'Project [*]
    'UnresolvedRelation [testData], None

== Analyzed Logical Plan ==
Project [key#0]
 Subquery t
  Sort [key#0 ASC], true
   Project [key#0,value#1]
    Subquery testData
     LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Optimized Logical Plan ==
Sort [key#0 ASC], true
 Project [key#0]
  LogicalRDD [key#0,value#1], MapPartitionsRDD[1]

== Physical Plan ==
Sort [key#0 ASC], true
 Exchange (RangePartitioning [key#0 ASC], 5), []
  Project [key#0]
   PhysicalRDD [key#0,value#1], MapPartitionsRDD[1]
```

with this rule we will first do column pruning on the table and then do sorting.

Author: scwf <wangfei1@huawei.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <michael@databricks.com>

Closes #5838 from scwf/pruning and squashes the following commits:

b00d833 [scwf] address michael's comment
e230155 [scwf] fix tests failure
b09b895 [scwf] improve column pruning

(cherry picked from commit 59250fe51486908f9e3f3d9ef10aadbcb9b4d62d)
Signed-off-by: Michael Armbrust <michael@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5c52d9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5c52d9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5c52d9a

Branch: refs/heads/branch-1.4
Commit: d5c52d9ac1307522a5ed6ccb5d30a01c5d737c67
Parents: 51230f2
Author: scwf <wangfei1@huawei.com>
Authored: Wed May 13 16:13:48 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed May 13 16:14:01 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  5 +++
 .../optimizer/FilterPushdownSuite.scala         | 36 +++++++++++++++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d5c52d9a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b163707..c2818d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -156,6 +156,11 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case Project(projectList, Limit(exp, child)) =>
       Limit(exp, Project(projectList, child))
 
+    // push down project if possible when the child is sort
+    case p @ Project(projectList, s @ Sort(_, _, grandChild))
+      if s.references.subsetOf(p.outputSet) =>
+      s.copy(child = Project(projectList, grandChild))
+
     // Eliminate no-op Projects
     case Project(projectList, child) if child.output == projectList => child
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d5c52d9a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0c428f7..be33cb9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.catalyst.expressions.{Count, Explode}
+import org.apache.spark.sql.catalyst.expressions.{SortOrder, Ascending, Count, Explode}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
@@ -542,4 +542,38 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized, originalQuery)
   }
+
+  test("push down project past sort") {
+    val x = testRelation.subquery('x)
+
+    // push down valid
+    val originalQuery = {
+      x.select('a, 'b)
+       .sortBy(SortOrder('a, Ascending))
+       .select('a)
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.select('a)
+       .sortBy(SortOrder('a, Ascending)).analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+
+    // push down invalid
+    val originalQuery1 = {
+      x.select('a, 'b)
+        .sortBy(SortOrder('a, Ascending))
+        .select('b)
+    }
+
+    val optimized1 = Optimize.execute(originalQuery1.analyze)
+    val correctAnswer1 =
+      x.select('a, 'b)
+        .sortBy(SortOrder('a, Ascending))
+        .select('b).analyze
+
+    comparePlans(optimized1, analysis.EliminateSubQueries(correctAnswer1))
+
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message