spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hvanhov...@apache.org
Subject spark git commit: [SPARK-18582][SQL] Whitelist LogicalPlan operators allowed in correlated subqueries
Date Sat, 03 Dec 2016 19:36:45 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 28ea432a2 -> b098b4845


[SPARK-18582][SQL] Whitelist LogicalPlan operators allowed in correlated subqueries

## What changes were proposed in this pull request?

This fix puts an explicit list of operators that Spark supports for correlated subqueries.

## How was this patch tested?

Run sql/test, catalyst/test and add a new test case on Generate.

Author: Nattavut Sutyanyong <nsy.can@gmail.com>

Closes #16046 from nsyca/spark18455.0.

(cherry picked from commit 4a3c09601ba69f7d49d1946bb6f20f5cfe453031)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b098b484
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b098b484
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b098b484

Branch: refs/heads/branch-2.1
Commit: b098b4845c557a3139c76caa0377c3049b6fe8aa
Parents: 28ea432
Author: Nattavut Sutyanyong <nsy.can@gmail.com>
Authored: Sat Dec 3 11:36:26 2016 -0800
Committer: Herman van Hovell <hvanhovell@databricks.com>
Committed: Sat Dec 3 11:36:39 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 158 +++++++++++++------
 .../sql/catalyst/optimizer/Optimizer.scala      |   2 +-
 .../catalyst/analysis/AnalysisErrorSuite.scala  |   4 +-
 .../org/apache/spark/sql/SubquerySuite.scala    |  18 +++
 4 files changed, 129 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b098b484/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fec42ee..f738ae8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -952,24 +952,24 @@ class Analyzer(
     private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression])
= {
       val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]
 
-      /** Make sure a plans' subtree does not contain a tagged predicate. */
-      def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit = {
-        if (p.collect(predicateMap).nonEmpty) {
-          failAnalysis(s"Accessing outer query column is not allowed in $msg: $p")
+      // Make sure a plan's subtree does not contain outer references
+      def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
+        if (p.collectFirst(predicateMap).nonEmpty) {
+          failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
         }
       }
 
-      /** Helper function for locating outer references. */
+      // Helper function for locating outer references.
       def containsOuter(e: Expression): Boolean = {
         e.find(_.isInstanceOf[OuterReference]).isDefined
       }
 
-      /** Make sure a plans' expressions do not contain a tagged predicate. */
+      // Make sure a plan's expressions do not contain outer references
       def failOnOuterReference(p: LogicalPlan): Unit = {
         if (p.expressions.exists(containsOuter)) {
           failAnalysis(
             "Expressions referencing the outer query are not supported outside of WHERE/HAVING
" +
-              s"clauses: $p")
+              s"clauses:\n$p")
         }
       }
 
@@ -1018,10 +1018,51 @@ class Analyzer(
 
       // Simplify the predicates before pulling them out.
       val transformed = BooleanSimplification(sub) transformUp {
-        // WARNING:
-        // Only Filter can host correlated expressions at this time
-        // Anyone adding a new "case" below needs to add the call to
-        // "failOnOuterReference" to disallow correlated expressions in it.
+
+        // Whitelist operators allowed in a correlated subquery
+        // There are 4 categories:
+        // 1. Operators that are allowed anywhere in a correlated subquery, and,
+        //    by definition of the operators, they either do not contain
+        //    any columns or cannot host outer references.
+        // 2. Operators that are allowed anywhere in a correlated subquery
+        //    so long as they do not host outer references.
+        // 3. Operators that need special handlings. These operators are
+        //    Project, Filter, Join, Aggregate, and Generate.
+        //
+        // Any operators that are not in the above list are allowed
+        // in a correlated subquery only if they are not on a correlation path.
+        // In other word, these operators are allowed only under a correlation point.
+        //
+        // A correlation path is defined as the sub-tree of all the operators that
+        // are on the path from the operator hosting the correlated expressions
+        // up to the operator producing the correlated values.
+
+        // Category 1:
+        // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
+        case p: BroadcastHint =>
+          p
+        case p: Distinct =>
+          p
+        case p: LeafNode =>
+          p
+        case p: Repartition =>
+          p
+        case p: SubqueryAlias =>
+          p
+
+        // Category 2:
+        // These operators can be anywhere in a correlated subquery.
+        // so long as they do not host outer references in the operators.
+        case p: Sort =>
+          failOnOuterReference(p)
+          p
+        case p: RedistributeData =>
+          failOnOuterReference(p)
+          p
+
+        // Category 3:
+        // Filter is one of the two operators allowed to host correlated expressions.
+        // The other operator is Join. Filter can be anywhere in a correlated subquery.
         case f @ Filter(cond, child) =>
           // Find all predicates with an outer reference.
           val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
@@ -1043,14 +1084,24 @@ class Analyzer(
               predicateMap += child -> xs
               child
           }
+
+        // Project cannot host any correlated expressions
+        // but can be anywhere in a correlated subquery.
         case p @ Project(expressions, child) =>
           failOnOuterReference(p)
+
           val referencesToAdd = missingReferences(p)
           if (referencesToAdd.nonEmpty) {
             Project(expressions ++ referencesToAdd, child)
           } else {
             p
           }
+
+        // Aggregate cannot host any correlated expressions
+        // It can be on a correlation path if the correlation contains
+        // only equality correlated predicates.
+        // It cannot be on a correlation path if the correlation has
+        // non-equality correlated predicates.
         case a @ Aggregate(grouping, expressions, child) =>
           failOnOuterReference(a)
           failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
@@ -1061,48 +1112,55 @@ class Analyzer(
           } else {
             a
           }
-        case w : Window =>
-          failOnOuterReference(w)
-          failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w)
-          w
-        case j @ Join(left, _, RightOuter, _) =>
-          failOnOuterReference(j)
-          failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
-          j
-        // SPARK-18578: Do not allow any correlated predicate
-        // in a Full (Outer) Join operator and its descendants
-        case j @ Join(_, _, FullOuter, _) =>
-          failOnOuterReferenceInSubTree(j, "a FULL OUTER JOIN")
-          j
-        case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] =>
-          failOnOuterReference(j)
-          failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
+
+        // Join can host correlated expressions.
+        case j @ Join(left, right, joinType, _) =>
+          joinType match {
+            // Inner join, like Filter, can be anywhere.
+            case _: InnerLike =>
+              failOnOuterReference(j)
+
+            // Left outer join's right operand cannot be on a correlation path.
+            // LeftAnti and ExistenceJoin are special cases of LeftOuter.
+            // Note that ExistenceJoin cannot be expressed externally in both SQL and DataFrame
+            // so it should not show up here in Analysis phase. This is just a safety net.
+            //
+            // LeftSemi does not allow output from the right operand.
+            // Any correlated references in the subplan
+            // of the right operand cannot be pulled up.
+            case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
+              failOnOuterReference(j)
+              failOnOuterReferenceInSubTree(right)
+
+            // Likewise, Right outer join's left operand cannot be on a correlation path.
+            case RightOuter =>
+              failOnOuterReference(j)
+              failOnOuterReferenceInSubTree(left)
+
+            // Any other join types not explicitly listed above,
+            // including Full outer join, are treated as Category 4.
+            case _ =>
+              failOnOuterReferenceInSubTree(j)
+          }
           j
-        case u: Union =>
-          failOnOuterReferenceInSubTree(u, "a UNION")
-          u
-        case s: SetOperation =>
-          failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT")
-          s
-        case e: Expand =>
-          failOnOuterReferenceInSubTree(e, "an EXPAND")
-          e
-        case l : LocalLimit =>
-          failOnOuterReferenceInSubTree(l, "a LIMIT")
-          l
-        // Since LIMIT <n> is represented as GlobalLimit(<n>, (LocalLimit (<n>,
child))
-        // and we are walking bottom up, we will fail on LocalLimit before
-        // reaching GlobalLimit.
-        // The code below is just a safety net.
-        case g : GlobalLimit =>
-          failOnOuterReferenceInSubTree(g, "a LIMIT")
-          g
-        case s : Sample =>
-          failOnOuterReferenceInSubTree(s, "a TABLESAMPLE")
-          s
-        case p =>
+
+        // Generator with join=true, i.e., expressed with
+        // LATERAL VIEW [OUTER], similar to inner join,
+        // allows to have correlation under it
+        // but must not host any outer references.
+        // Note:
+        // Generator with join=false is treated as Category 4.
+        case p @ Generate(generator, true, _, _, _, _) =>
           failOnOuterReference(p)
           p
+
+        // Category 4: Any other operators not in the above 3 categories
+        // cannot be on a correlation path, that is they are allowed only
+        // under a correlation point but they and their descendant operators
+        // are not allowed to have any correlated expressions.
+        case p =>
+          failOnOuterReferenceInSubTree(p)
+          p
       }
       (transformed, predicateMap.values.flatten.toSeq)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b098b484/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 37f0c8e..75d9997 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper
{
         split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
       joinType match {
-        case _: InnerLike |  LeftSemi =>
+        case _: InnerLike | LeftSemi =>
           // push down the single side only join filter for both sides sub queries
           val newLeft = leftJoinConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)

http://git-wip-us.apache.org/repos/asf/spark/blob/b098b484/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 8c1faea..96aff37 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -542,7 +542,7 @@ class AnalysisErrorSuite extends AnalysisTest {
           Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))
       ),
       LocalRelation(a))
-    assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" ::
Nil)
+    assertAnalysisError(plan4, "Accessing outer query column is not allowed in" :: Nil)
 
     val plan5 = Filter(
       Exists(
@@ -551,6 +551,6 @@ class AnalysisErrorSuite extends AnalysisTest {
       ),
       LocalRelation(a))
     assertAnalysisError(plan5,
-                        "Accessing outer query column is not allowed in a TABLESAMPLE" ::
Nil)
+                        "Accessing outer query column is not allowed in" :: Nil)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b098b484/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 73a5394..0f2f520 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -789,4 +789,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  // Generate operator
+  test("Correlated subqueries in LATERAL VIEW") {
+    withTempView("t1", "t2") {
+      Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1")
+      Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3)))
+        .toDF("c1", "arr_c2").createTempView("t2")
+      checkAnswer(
+        sql(
+          """
+          | select c2
+          | from t1
+          | where exists (select *
+          |               from t2 lateral view explode(arr_c2) q as c2
+                          where t1.c1 = t2.c1)""".stripMargin),
+        Row(1) :: Row(0) :: Nil)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message