spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenc...@apache.org
Subject spark git commit: [SPARK-23564][SQL] infer additional filters from constraints for join's children
Date Mon, 23 Apr 2018 12:21:05 GMT
Repository: spark
Updated Branches:
  refs/heads/master f70f46d1e -> d87d30e4f


[SPARK-23564][SQL] infer additional filters from constraints for join's children

## What changes were proposed in this pull request?

The existing query constraints framework has 2 steps:
1. propagate constraints bottom up.
2. use constraints to infer additional filters for better data pruning.

For step 2, it mostly helps with Join, because we can connect the constraints from children
to the join condition and infer powerful filters to prune the data of the join sides. e.g.,
the left side has constraints `a = 1`, the join condition is `left.a = right.a`, then we can
infer `right.a = 1` to the right side and prune the right side a lot.

However, the current logic of inferring filters from constraints for Join is pretty weak.
It infers the filters from Join's constraints. Some joins like left semi/anti exclude output
from right side and the right side constraints will be lost here.

This PR propose to check the left and right constraints individually, expand the constraints
with join condition and add filters to children of join directly, instead of adding to the
join condition.

This reverts https://github.com/apache/spark/pull/20670 , covers https://github.com/apache/spark/pull/20717
and https://github.com/apache/spark/pull/20816

This is inspired by the original PRs and the tests are all from these PRs. Thanks to the authors
mgaido91 maryannxue KaiXinXiaoLei !

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21083 from cloud-fan/join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d87d30e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d87d30e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d87d30e4

Branch: refs/heads/master
Commit: d87d30e4fe9c9e91c462351e9f744a830db8d6fc
Parents: f70f46d
Author: Wenchen Fan <wenchen@databricks.com>
Authored: Mon Apr 23 20:21:01 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Mon Apr 23 20:21:01 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 97 ++++++++++----------
 .../plans/logical/QueryPlanConstraints.scala    | 95 ++++++++-----------
 .../InferFiltersFromConstraintsSuite.scala      | 53 ++++++++---
 3 files changed, 124 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d87d30e4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 913354e..f00d40d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -637,13 +637,11 @@ object CollapseWindow extends Rule[LogicalPlan] {
  * constraints. These filters are currently inserted to the existing conditions in the Filter
  * operators and on either side of Join operators.
  *
- * In addition, for left/right outer joins, infer predicate from the preserved side of the
Join
- * operator and push the inferred filter over to the null-supplying side. For example, if
the
- * preserved side has constraints of the form 'a > 5' and the join condition is 'a = b',
in
- * which 'b' is an attribute from the null-supplying side, a [[Filter]] operator of 'b >
5' will
- * be applied to the null-supplying side.
+ * Note: While this optimization is applicable to a lot of types of join, it primarily benefits
+ * Inner and LeftSemi joins.
  */
-object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
+object InferFiltersFromConstraints extends Rule[LogicalPlan]
+  with PredicateHelper with ConstraintHelper {
 
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (SQLConf.get.constraintPropagationEnabled) {
@@ -664,53 +662,52 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
       }
 
     case join @ Join(left, right, joinType, conditionOpt) =>
-      // Only consider constraints that can be pushed down completely to either the left
or the
-      // right child
-      val constraints = join.allConstraints.filter { c =>
-        c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
-      }
-      // Remove those constraints that are already enforced by either the left or the right
child
-      val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
-      val newConditionOpt = conditionOpt match {
-        case Some(condition) =>
-          val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
-          if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
-        case None =>
-          additionalConstraints.reduceOption(And)
-      }
-      // Infer filter for left/right outer joins
-      val newLeftOpt = joinType match {
-        case RightOuter if newConditionOpt.isDefined =>
-          val inferredConstraints = left.getRelevantConstraints(
-            left.constraints
-              .union(right.constraints)
-              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
-          val newFilters = inferredConstraints
-            .filterNot(left.constraints.contains)
-            .reduceLeftOption(And)
-          newFilters.map(Filter(_, left))
-        case _ => None
-      }
-      val newRightOpt = joinType match {
-        case LeftOuter if newConditionOpt.isDefined =>
-          val inferredConstraints = right.getRelevantConstraints(
-            right.constraints
-              .union(left.constraints)
-              .union(splitConjunctivePredicates(newConditionOpt.get).toSet))
-          val newFilters = inferredConstraints
-            .filterNot(right.constraints.contains)
-            .reduceLeftOption(And)
-          newFilters.map(Filter(_, right))
-        case _ => None
-      }
+      joinType match {
+        // For inner join, we can infer additional filters for both sides. LeftSemi is kind
of an
+        // inner join, it just drops the right side in the final output.
+        case _: InnerLike | LeftSemi =>
+          val allConstraints = getAllConstraints(left, right, conditionOpt)
+          val newLeft = inferNewFilter(left, allConstraints)
+          val newRight = inferNewFilter(right, allConstraints)
+          join.copy(left = newLeft, right = newRight)
 
-      if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
-        || newLeftOpt.isDefined || newRightOpt.isDefined) {
-        Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
-      } else {
-        join
+        // For right outer join, we can only infer additional filters for left side.
+        case RightOuter =>
+          val allConstraints = getAllConstraints(left, right, conditionOpt)
+          val newLeft = inferNewFilter(left, allConstraints)
+          join.copy(left = newLeft)
+
+        // For left join, we can only infer additional filters for right side.
+        case LeftOuter | LeftAnti =>
+          val allConstraints = getAllConstraints(left, right, conditionOpt)
+          val newRight = inferNewFilter(right, allConstraints)
+          join.copy(right = newRight)
+
+        case _ => join
       }
   }
+
+  private def getAllConstraints(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      conditionOpt: Option[Expression]): Set[Expression] = {
+    val baseConstraints = left.constraints.union(right.constraints)
+      .union(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil).toSet)
+    baseConstraints.union(inferAdditionalConstraints(baseConstraints))
+  }
+
+  private def inferNewFilter(plan: LogicalPlan, constraints: Set[Expression]): LogicalPlan
= {
+    val newPredicates = constraints
+      .union(constructIsNotNullConstraints(constraints, plan.output))
+      .filter { c =>
+        c.references.nonEmpty && c.references.subsetOf(plan.outputSet) &&
c.deterministic
+      } -- plan.constraints
+    if (newPredicates.isEmpty) {
+      plan
+    } else {
+      Filter(newPredicates.reduce(And), plan)
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d87d30e4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
index a29f3d2..cc352c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -20,30 +20,29 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.expressions._
 
 
-trait QueryPlanConstraints { self: LogicalPlan =>
+trait QueryPlanConstraints extends ConstraintHelper { self: LogicalPlan =>
 
   /**
-   * An [[ExpressionSet]] that contains an additional set of constraints, such as equality
-   * constraints and `isNotNull` constraints, etc.
+   * An [[ExpressionSet]] that contains invariants about the rows output by this operator.
For
+   * example, if this set contains the expression `a = 2` then that expression is guaranteed
to
+   * evaluate to `true` for all rows produced.
    */
-  lazy val allConstraints: ExpressionSet = {
+  lazy val constraints: ExpressionSet = {
     if (conf.constraintPropagationEnabled) {
-      ExpressionSet(validConstraints
-        .union(inferAdditionalConstraints(validConstraints))
-        .union(constructIsNotNullConstraints(validConstraints)))
+      ExpressionSet(
+        validConstraints
+          .union(inferAdditionalConstraints(validConstraints))
+          .union(constructIsNotNullConstraints(validConstraints, output))
+          .filter { c =>
+            c.references.nonEmpty && c.references.subsetOf(outputSet) &&
c.deterministic
+          }
+      )
     } else {
       ExpressionSet(Set.empty)
     }
   }
 
   /**
-   * An [[ExpressionSet]] that contains invariants about the rows output by this operator.
For
-   * example, if this set contains the expression `a = 2` then that expression is guaranteed
to
-   * evaluate to `true` for all rows produced.
-   */
-  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter(selfReferenceOnly))
-
-  /**
    * This method can be overridden by any child class of QueryPlan to specify a set of constraints
    * based on the given operator's constraint propagation logic. These constraints are then
    * canonicalized and filtered automatically to contain only those attributes that appear
in the
@@ -52,30 +51,42 @@ trait QueryPlanConstraints { self: LogicalPlan =>
    * See [[Canonicalize]] for more details.
    */
   protected def validConstraints: Set[Expression] = Set.empty
+}
+
+trait ConstraintHelper {
 
   /**
-   * Returns an [[ExpressionSet]] that contains an additional set of constraints, such as
-   * equality constraints and `isNotNull` constraints, etc., and that only contains references
-   * to this [[LogicalPlan]] node.
+   * Infers an additional set of constraints from a given set of equality constraints.
+   * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns
an
+   * additional constraint of the form `b = 5`.
    */
-  def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = {
-    val allRelevantConstraints =
-      if (conf.constraintPropagationEnabled) {
-        constraints
-          .union(inferAdditionalConstraints(constraints))
-          .union(constructIsNotNullConstraints(constraints))
-      } else {
-        constraints
-      }
-    ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly))
+  def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
+    var inferredConstraints = Set.empty[Expression]
+    constraints.foreach {
+      case eq @ EqualTo(l: Attribute, r: Attribute) =>
+        val candidateConstraints = constraints - eq
+        inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
+        inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
+      case _ => // No inference
+    }
+    inferredConstraints -- constraints
   }
 
+  private def replaceConstraints(
+      constraints: Set[Expression],
+      source: Expression,
+      destination: Attribute): Set[Expression] = constraints.map(_ transform {
+    case e: Expression if e.semanticEquals(source) => destination
+  })
+
   /**
    * Infers a set of `isNotNull` constraints from null intolerant expressions as well as
    * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
    * returns a constraint of the form `isNotNull(a)`
    */
-  private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression]
= {
+  def constructIsNotNullConstraints(
+      constraints: Set[Expression],
+      output: Seq[Attribute]): Set[Expression] = {
     // First, we propagate constraints from the null intolerant expressions.
     var isNotNullConstraints: Set[Expression] = constraints.flatMap(inferIsNotNullConstraints)
 
@@ -111,32 +122,4 @@ trait QueryPlanConstraints { self: LogicalPlan =>
     case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
     case _ => Seq.empty[Attribute]
   }
-
-  /**
-   * Infers an additional set of constraints from a given set of equality constraints.
-   * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns
an
-   * additional constraint of the form `b = 5`.
-   */
-  private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] =
{
-    var inferredConstraints = Set.empty[Expression]
-    constraints.foreach {
-      case eq @ EqualTo(l: Attribute, r: Attribute) =>
-        val candidateConstraints = constraints - eq
-        inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
-        inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
-      case _ => // No inference
-    }
-    inferredConstraints -- constraints
-  }
-
-  private def replaceConstraints(
-      constraints: Set[Expression],
-      source: Expression,
-      destination: Attribute): Set[Expression] = constraints.map(_ transform {
-    case e: Expression if e.semanticEquals(source) => destination
-  })
-
-  private def selfReferenceOnly(e: Expression): Boolean = {
-    e.references.nonEmpty && e.references.subsetOf(outputSet) && e.deterministic
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d87d30e4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index e068f51..e4671f0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -35,11 +35,25 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
         InferFiltersFromConstraints,
         CombineFilters,
         SimplifyBinaryComparison,
-        BooleanSimplification) :: Nil
+        BooleanSimplification,
+        PruneFilters) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
 
+  private def testConstraintsAfterJoin(
+      x: LogicalPlan,
+      y: LogicalPlan,
+      expectedLeft: LogicalPlan,
+      expectedRight: LogicalPlan,
+      joinType: JoinType) = {
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y, joinType, condition).analyze
+    val correctAnswer = expectedLeft.join(expectedRight, joinType, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("filter: filter out constraints in condition") {
     val originalQuery = testRelation.where('a === 1 && 'a === 'b).analyze
     val correctAnswer = testRelation
@@ -196,13 +210,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
   test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides")
{
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
-    val condition = Some("x.a".attr === "y.a".attr)
-    val originalQuery = x.join(y, LeftSemi, condition).analyze
-    val left = x.where(IsNotNull('a))
-    val right = y.where(IsNotNull('a))
-    val correctAnswer = left.join(right, LeftSemi, condition).analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
+    testConstraintsAfterJoin(x, y, x.where(IsNotNull('a)), y.where(IsNotNull('a)), LeftSemi)
   }
 
   test("SPARK-21479: Outer join after-join filters push down to null-supplying side") {
@@ -232,12 +240,27 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
   test("SPARK-21479: Outer join no filter push down to preserved side") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
-    val condition = Some("x.a".attr === "y.a".attr)
-    val originalQuery = x.join(y.where("y.a".attr === 1), LeftOuter, condition).analyze
-    val left = x
-    val right = y.where(IsNotNull('a) && 'a === 1)
-    val correctAnswer = left.join(right, LeftOuter, condition).analyze
-    val optimized = Optimize.execute(originalQuery)
-    comparePlans(optimized, correctAnswer)
+    testConstraintsAfterJoin(
+      x, y.where("a".attr === 1),
+      x, y.where(IsNotNull('a) && 'a === 1),
+      LeftOuter)
+  }
+
+  test("SPARK-23564: left anti join should filter out null join keys on right side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    testConstraintsAfterJoin(x, y, x, y.where(IsNotNull('a)), LeftAnti)
+  }
+
+  test("SPARK-23564: left outer join should filter out null join keys on right side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    testConstraintsAfterJoin(x, y, x, y.where(IsNotNull('a)), LeftOuter)
+  }
+
+  test("SPARK-23564: right outer join should filter out null join keys on left side") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    testConstraintsAfterJoin(x, y, x.where(IsNotNull('a)), y, RightOuter)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message