spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-9082] [SQL] [FOLLOW-UP] use `partition` in `PushPredicateThroughProject`
Date Thu, 23 Jul 2015 16:38:50 GMT
Repository: spark
Updated Branches:
  refs/heads/master 26ed22aec -> 52ef76de2


[SPARK-9082] [SQL] [FOLLOW-UP] use `partition` in `PushPredicateThroughProject`

A follow-up to https://github.com/apache/spark/pull/7446

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7607 from cloud-fan/tmp and squashes the following commits:

7106989 [Wenchen Fan] use `partition` in `PushPredicateThroughProject`


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52ef76de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52ef76de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52ef76de

Branch: refs/heads/master
Commit: 52ef76de219c4bf19c54c99414b89a67d0bf457b
Parents: 26ed22a
Author: Wenchen Fan <cloud0fan@outlook.com>
Authored: Thu Jul 23 09:37:53 2015 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Thu Jul 23 09:38:02 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 22 +++++++-------------
 1 file changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/52ef76de/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d2db3dd..b59f800 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -553,33 +553,27 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
       // Split the condition into small conditions by `And`, so that we can push down part of this
       // condition without nondeterministic expressions.
       val andConditions = splitConjunctivePredicates(condition)
-      val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))
+
+      val (deterministic, nondeterministic) = andConditions.partition(_.collect {
+        case a: Attribute if aliasMap.contains(a) => aliasMap(a)
+      }.forall(_.deterministic))
 
       // If there is no nondeterministic conditions, push down the whole condition.
-      if (nondeterministicConditions.isEmpty) {
+      if (nondeterministic.isEmpty) {
         project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
       } else {
         // If they are all nondeterministic conditions, leave it un-changed.
-        if (nondeterministicConditions.length == andConditions.length) {
+        if (deterministic.isEmpty) {
           filter
         } else {
-          val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
           // Push down the small conditions without nondeterministic expressions.
-          val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
-          Filter(nondeterministicConditions.reduce(And),
+          val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
+          Filter(nondeterministic.reduce(And),
             project.copy(child = Filter(pushedCondition, grandChild)))
         }
       }
   }
 
-  private def hasNondeterministic(
-      condition: Expression,
-      sourceAliases: AttributeMap[Expression]) = {
-    condition.collect {
-      case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
-    }.exists(!_.deterministic)
-  }
-
   // Substitute any attributes that are produced by the child projection, so that we safely
   // eliminate it.
   private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message