spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.
Date Wed, 24 Jun 2015 19:29:12 GMT
Repository: spark
Updated Branches:
  refs/heads/master 43e66192f -> b84d4b4df


[SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.

ResolveReferences analysis rule now does not throw when it cannot resolve references in a
self-join.

Author: Santiago M. Mola <smola@stratio.com>

Closes #6853 from smola/SPARK-7088 and squashes the following commits:

af71ac7 [Santiago M. Mola] [SPARK-7088] Fix analysis for 3rd party logical plan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b84d4b4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b84d4b4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b84d4b4d

Branch: refs/heads/master
Commit: b84d4b4dfe8ced1b96a0c74ef968a20a1bba8231
Parents: 43e6619
Author: Santiago M. Mola <smola@stratio.com>
Authored: Wed Jun 24 12:29:07 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed Jun 24 12:29:07 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 38 ++++++++++----------
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 12 +++++++
 2 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b84d4b4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a3f5a7..b06759f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -283,7 +283,7 @@ class Analyzer(
         val conflictingAttributes = left.outputSet.intersect(right.outputSet)
         logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
 
-        val (oldRelation, newRelation) = right.collect {
+        right.collect {
           // Handle base relations that might appear more than once.
           case oldVersion: MultiInstanceRelation
               if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
@@ -308,25 +308,27 @@ class Analyzer(
               if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
                 .nonEmpty =>
             (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
-        }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
-          sys.error(
-            s"""
-              |Failure when resolving conflicting references in Join:
-              |$plan
-              |
-              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
-              """.stripMargin)
         }
-
-        val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-        val newRight = right transformUp {
-          case r if r == oldRelation => newRelation
-        } transformUp {
-          case other => other transformExpressions {
-            case a: Attribute => attributeRewrites.get(a).getOrElse(a)
-          }
+          // Only handle first case, others will be fixed on the next pass.
+          .headOption match {
+          case None =>
+            /*
+             * No result implies that there is a logical plan node that produces new references
+             * that this rule cannot handle. When that is the case, there must be another rule
+             * that resolves these conflicts. Otherwise, the analysis will fail.
+             */
+            j
+          case Some((oldRelation, newRelation)) =>
+            val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+            val newRight = right transformUp {
+              case r if r == oldRelation => newRelation
+            } transformUp {
+              case other => other transformExpressions {
+                case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+              }
+            }
+            j.copy(right = newRight)
         }
-        j.copy(right = newRight)
 
       // When resolve `SortOrder`s in Sort based on child, don't report errors as
       // we still have chance to resolve it based on grandchild

http://git-wip-us.apache.org/repos/asf/spark/blob/b84d4b4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c5a1437..a069b47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -48,6 +48,7 @@ trait CheckAnalysis {
     // We transform up and order the rules so as to catch the first possible failure instead
     // of the result of cascading resolution failures.
     plan.foreachUp {
+
       case operator: LogicalPlan =>
         operator transformExpressionsUp {
           case a: Attribute if !a.resolved =>
@@ -121,6 +122,17 @@ trait CheckAnalysis {
 
           case _ => // Analysis successful!
         }
+
+      // Special handling for cases when self-join introduce duplicate expression ids.
+      case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+        val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+        failAnalysis(
+          s"""
+             |Failure when resolving conflicting references in Join:
+             |$plan
+             |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+             |""".stripMargin)
+
     }
     extendedCheckRules.foreach(_(plan))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message