spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-13091][SQL] Rewrite/Propagate constraints for Aliases
Date Fri, 19 Feb 2016 22:48:37 GMT
Repository: spark
Updated Branches:
  refs/heads/master 14844118b -> 091f6a783


[SPARK-13091][SQL] Rewrite/Propagate constraints for Aliases

This PR adds support for rewriting constraints if there are aliases in the query plan. For
example, if there is a query of the form `SELECT a, a AS b`, any constraints on `a` now also
apply to `b`.

JIRA: https://issues.apache.org/jira/browse/SPARK-13091

cc marmbrus

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11144 from sameeragarwal/alias.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/091f6a78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/091f6a78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/091f6a78

Branch: refs/heads/master
Commit: 091f6a7830bbee01fa580fbb0336b9f4fcac0dfa
Parents: 1484411
Author: Sameer Agarwal <sameer@databricks.com>
Authored: Fri Feb 19 14:48:34 2016 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Fri Feb 19 14:48:34 2016 -0800

----------------------------------------------------------------------
 .../catalyst/plans/logical/basicOperators.scala | 20 ++++++++++++++++++++
 .../plans/ConstraintPropagationSuite.scala      | 20 +++++++++++++++++++-
 2 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/091f6a78/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 502d898..7d155ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -50,6 +50,26 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
 
     !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
   }
+
+  /**
+   * Generates an additional set of aliased constraints by replacing the original constraint
+   * expressions with the corresponding alias
+   */
+  private def getAliasedConstraints: Set[Expression] = {
+    projectList.flatMap {
+      case a @ Alias(e, _) =>
+        child.constraints.map(_ transform {
+          case expr: Expression if expr.semanticEquals(e) =>
+            a.toAttribute
+        }).union(Set(EqualNullSafe(e, a.toAttribute)))
+      case _ =>
+        Set.empty[Expression]
+    }.toSet
+  }
+
+  override def validConstraints: Set[Expression] = {
+    child.constraints.union(getAliasedConstraints)
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/091f6a78/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index b5cf913..373b1ff 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -27,7 +27,10 @@ import org.apache.spark.sql.catalyst.plans.logical._
 class ConstraintPropagationSuite extends SparkFunSuite {
 
   private def resolveColumn(tr: LocalRelation, columnName: String): Expression =
-    tr.analyze.resolveQuoted(columnName, caseInsensitiveResolution).get
+    resolveColumn(tr.analyze, columnName)
+
+  private def resolveColumn(plan: LogicalPlan, columnName: String): Expression =
+    plan.resolveQuoted(columnName, caseInsensitiveResolution).get
 
   private def verifyConstraints(found: Set[Expression], expected: Set[Expression]): Unit = {
     val missing = expected.filterNot(i => found.map(_.semanticEquals(i)).reduce(_ || _))
@@ -69,6 +72,21 @@ class ConstraintPropagationSuite extends SparkFunSuite {
         IsNotNull(resolveColumn(tr, "c"))))
   }
 
+  test("propagating constraints in aliases") {
+    val tr = LocalRelation('a.int, 'b.string, 'c.int)
+
+    assert(tr.where('c.attr > 10).select('a.as('x), 'b.as('y)).analyze.constraints.isEmpty)
+
+    val aliasedRelation = tr.where('a.attr > 10).select('a.as('x), 'b, 'b.as('y), 'a.as('z))
+
+    verifyConstraints(aliasedRelation.analyze.constraints,
+      Set(resolveColumn(aliasedRelation.analyze, "x") > 10,
+        IsNotNull(resolveColumn(aliasedRelation.analyze, "x")),
+        resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"),
+        resolveColumn(aliasedRelation.analyze, "z") > 10,
+        IsNotNull(resolveColumn(aliasedRelation.analyze, "z"))))
+  }
+
   test("propagating constraints in union") {
     val tr1 = LocalRelation('a.int, 'b.int, 'c.int)
     val tr2 = LocalRelation('d.int, 'e.int, 'f.int)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message