spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dav...@apache.org
Subject spark git commit: [SPARK-11973] [SQL] push filter through aggregation with alias and literals
Date Thu, 26 Nov 2015 08:19:48 GMT
Repository: spark
Updated Branches:
  refs/heads/master d3ef69332 -> 27d69a057


[SPARK-11973] [SQL] push filter through aggregation with alias and literals

Currently, filter can't be pushed through aggregation with alias or literals, this patch fix
that.

After this patch, the time of TPC-DS query 4 go down to 13 seconds from 141 seconds (10x improvements).

cc nongli  yhuai

Author: Davies Liu <davies@databricks.com>

Closes #9959 from davies/push_filter2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27d69a05
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27d69a05
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27d69a05

Branch: refs/heads/master
Commit: 27d69a0573ed55e916a464e268dcfd5ecc6ed849
Parents: d3ef693
Author: Davies Liu <davies@databricks.com>
Authored: Thu Nov 26 00:19:42 2015 -0800
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Thu Nov 26 00:19:42 2015 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/predicates.scala   |  9 ++++
 .../sql/catalyst/optimizer/Optimizer.scala      | 28 +++++++----
 .../optimizer/FilterPushdownSuite.scala         | 53 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27d69a05/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 6855747..304b438 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -65,6 +65,15 @@ trait PredicateHelper {
     }
   }
 
+  // Substitute any known alias from a map.
+  protected def replaceAlias(
+      condition: Expression,
+      aliases: AttributeMap[Expression]): Expression = {
+    condition.transform {
+      case a: Attribute => aliases.getOrElse(a, a)
+    }
+  }
+
   /**
    * Returns true if `expr` can be evaluated using only the output of `plan`.  This method
    * can be used to determine when it is acceptable to move expression evaluation within
a query

http://git-wip-us.apache.org/repos/asf/spark/blob/27d69a05/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f4dba67..52f609b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -640,20 +640,14 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
           filter
         } else {
           // Push down the small conditions without nondeterministic expressions.
-          val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
+          val pushedCondition =
+            deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
           Filter(nondeterministic.reduce(And),
             project.copy(child = Filter(pushedCondition, grandChild)))
         }
       }
   }
 
-  // Substitute any attributes that are produced by the child projection, so that we safely
-  // eliminate it.
-  private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression])
= {
-    condition.transform {
-      case a: Attribute => sourceAliases.getOrElse(a, a)
-    }
-  }
 }
 
 /**
@@ -690,12 +684,24 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with
PredicateHel
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case filter @ Filter(condition,
         aggregate @ Aggregate(groupingExpressions, aggregateExpressions, grandChild)) =>
-      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition {
-        conjunct => conjunct.references subsetOf AttributeSet(groupingExpressions)
+
+      def hasAggregate(expression: Expression): Boolean = expression match {
+        case agg: AggregateExpression => true
+        case other => expression.children.exists(hasAggregate)
+      }
+      // Create a map of Alias for expressions that does not have AggregateExpression
+      val aliasMap = AttributeMap(aggregateExpressions.collect {
+        case a: Alias if !hasAggregate(a.child) => (a.toAttribute, a.child)
+      })
+
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { conjunct
=>
+        val replaced = replaceAlias(conjunct, aliasMap)
+        replaced.references.subsetOf(grandChild.outputSet) && replaced.deterministic
       }
       if (pushDown.nonEmpty) {
         val pushDownPredicate = pushDown.reduce(And)
-        val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, grandChild))
+        val replaced = replaceAlias(pushDownPredicate, aliasMap)
+        val withPushdown = aggregate.copy(child = Filter(replaced, grandChild))
         stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
       } else {
         filter

http://git-wip-us.apache.org/repos/asf/spark/blob/27d69a05/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0290faf..0128c22 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -697,4 +697,57 @@ class FilterPushdownSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("aggregate: push down filters with alias") {
+    val originalQuery = testRelation
+      .select('a, 'b)
+      .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
+      .where(('c === 2L || 'aa > 4) && 'aa < 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .where('a + 1 < 3)
+      .groupBy('a)(('a + 1) as 'aa, count('b) as 'c)
+      .where('c === 2L || 'aa > 4)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("aggregate: push down filters with literal") {
+    val originalQuery = testRelation
+      .select('a, 'b)
+      .groupBy('a)('a, count('b) as 'c, "s" as 'd)
+      .where('c === 2L && 'd === "s")
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .where("s" === "s")
+      .groupBy('a)('a, count('b) as 'c, "s" as 'd)
+      .where('c === 2L)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("aggregate: don't push down filters which is nondeterministic") {
+    val originalQuery = testRelation
+      .select('a, 'b)
+      .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
+      .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val correctAnswer = testRelation
+      .select('a, 'b)
+      .groupBy('a)('a + Rand(10) as 'aa, count('b) as 'c, Rand(11).as("rnd"))
+      .where('c === 2L && 'aa + Rand(10).as("rnd") === 3 && 'rnd === 5)
+      .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message