spark-commits mailing list archives

From marmb...@apache.org
Subject spark git commit: [SPARK-5454] More robust handling of self joins
Date Wed, 11 Feb 2015 20:32:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master 03bf704bf -> a60d2b70a


[SPARK-5454] More robust handling of self joins

Also fixes a bunch of bad output in test cases.
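
A hypothetical REPL-style sketch of the failure mode being fixed (assumes the testData table and the implicits from TestSQLContext, as in the test added below):

    val df = testData.select($"key")
    // Both sides of this join are built from the same logical plan, so they
    // initially carry identical expression ids, which made resolving the
    // join condition ambiguous:
    val joined = df.as('l).join(df.as('r), $"l.key" === $"r.key")
    // The analyzer now detects the overlapping output sets and re-instances
    // the right side with fresh expression ids.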

Author: Michael Armbrust <michael@databricks.com>

Closes #4520 from marmbrus/selfJoin and squashes the following commits:

4f4a85c [Michael Armbrust] comments
49c8e26 [Michael Armbrust] fix tests
6fc38de [Michael Armbrust] fix style
55d64b3 [Michael Armbrust] fix dataframe selfjoins


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a60d2b70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a60d2b70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a60d2b70

Branch: refs/heads/master
Commit: a60d2b70adff3a8fb3bdfac226b1d86fdb443da4
Parents: 03bf704
Author: Michael Armbrust <michael@databricks.com>
Authored: Wed Feb 11 12:31:56 2015 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Wed Feb 11 12:31:56 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 26 +++++++++++++++++---
 .../analysis/MultiInstanceRelation.scala        | 21 +---------------
 .../spark/sql/catalyst/plans/PlanTest.scala     |  4 +--
 .../scala/org/apache/spark/sql/SQLContext.scala |  2 ++
 sql/core/src/test/resources/log4j.properties    |  3 +++
 .../org/apache/spark/sql/DataFrameSuite.scala   | 10 ++++++++
 .../spark/sql/catalyst/plans/PlanTest.scala     |  4 +--
 7 files changed, 40 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a60d2b70/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3f0d77a..2d1fa10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -53,14 +53,11 @@ class Analyzer(catalog: Catalog,
   val extendedRules: Seq[Rule[LogicalPlan]] = Nil
 
   lazy val batches: Seq[Batch] = Seq(
-    Batch("MultiInstanceRelations", Once,
-      NewRelationInstances),
     Batch("Resolution", fixedPoint,
-      ResolveReferences ::
       ResolveRelations ::
+      ResolveReferences ::
       ResolveGroupingAnalytics ::
       ResolveSortReferences ::
-      NewRelationInstances ::
       ImplicitGenerate ::
       ResolveFunctions ::
       GlobalAggregates ::
@@ -285,6 +282,27 @@ class Analyzer(catalog: Catalog,
           }
         )
 
+      // Special handling for cases when a self-join introduces duplicate expression ids.
+      case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
+        val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+
+        val (oldRelation, newRelation, attributeRewrites) = right.collect {
+          case oldVersion: MultiInstanceRelation
+              if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+            val newVersion = oldVersion.newInstance()
+            val newAttributes = AttributeMap(oldVersion.output.zip(newVersion.output))
+            (oldVersion, newVersion, newAttributes)
+        }.head // Only handle the first case found; others will be fixed on the next pass.
+
+        val newRight = right transformUp {
+          case r if r == oldRelation => newRelation
+          case other => other transformExpressions {
+            case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+          }
+        }
+
+        j.copy(right = newRight)
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString}")
         q transformExpressionsUp  {
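
A self-contained toy model of the rewrite the new Join case performs (illustrative Scala only, not Catalyst code; Attr and Relation are made-up stand-ins for Attribute and MultiInstanceRelation):

    // Attributes are identified by numeric ids; a self-join reuses the same
    // ids on both sides, so the right side must be re-instanced.
    case class Attr(name: String, id: Long)
    case class Relation(output: Seq[Attr]) {
      // Mirrors MultiInstanceRelation.newInstance(): same schema, fresh ids.
      def newInstance(nextId: () => Long): Relation =
        Relation(output.map(a => a.copy(id = nextId())))
    }

    var counter = 100L
    val nextId = () => { counter += 1; counter }

    val left  = Relation(Seq(Attr("key", 1)))
    val right = left                                   // self-join: shared ids
    val conflicting = left.output.toSet.intersect(right.output.toSet) // non-empty
    val fresh    = right.newInstance(nextId)           // Attr("key", 101)
    val rewrites = right.output.zip(fresh.output).toMap
    // References into the old right side are redirected through the map:
    rewrites.getOrElse(Attr("key", 1), Attr("key", 1)) // => Attr("key", 101)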

http://git-wip-us.apache.org/repos/asf/spark/blob/a60d2b70/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 4c5fb3f..894c350 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -26,28 +26,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * produced by distinct operators in a query tree as this breaks the guarantee that expression
  * ids, which are used to differentiate attributes, are unique.
  *
- * Before analysis, all operators that include this trait will be asked to produce a new version
+ * During analysis, an operator that includes this trait may be asked to produce a new version
  * of itself with globally unique expression ids.
  */
 trait MultiInstanceRelation {
   def newInstance(): this.type
 }
-
-/**
- * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
- * that each instance has unique expression ids for the attributes produced.
- */
-object NewRelationInstances extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    val localRelations = plan collect { case l: MultiInstanceRelation => l}
-    val multiAppearance = localRelations
-      .groupBy(identity[MultiInstanceRelation])
-      .filter { case (_, ls) => ls.size > 1 }
-      .map(_._1)
-      .toSet
-
-    plan transform {
-      case l: MultiInstanceRelation if multiAppearance.contains(l) => l.newInstance()
-    }
-  }
-}
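
For reference, implementations of the trait typically follow this pattern (a hypothetical sketch; MyRelation is made up, and it assumes Attribute.newInstance returns a copy carrying a fresh ExprId, as Catalyst's LocalRelation does):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LeafNode

    case class MyRelation(output: Seq[Attribute])
      extends LeafNode with MultiInstanceRelation {
      // Same schema, but every output attribute gets a new expression id.
      override def newInstance(): this.type =
        MyRelation(output.map(_.newInstance)).asInstanceOf[this.type]
    }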

http://git-wip-us.apache.org/repos/asf/spark/blob/a60d2b70/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index c4a1f89..7d609b9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -33,11 +33,9 @@ class PlanTest extends FunSuite {
    * we must normalize them to check if two different queries are identical.
    */
   protected def normalizeExprIds(plan: LogicalPlan) = {
-    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
-    val minId = if (list.isEmpty) 0 else list.min
     plan transformAllExpressions {
       case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
     }
   }
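
A hypothetical illustration of what this simpler normalization buys (the ExprId values here are made up): two plans that differ only in expression ids compare equal once every id is rewritten to 0.

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
    import org.apache.spark.sql.types.IntegerType

    val a = AttributeReference("key", IntegerType, nullable = true)(exprId = ExprId(7))
    val b = AttributeReference("key", IntegerType, nullable = true)(exprId = ExprId(42))
    // After normalizeExprIds, both carry ExprId(0), so plans containing them
    // are structurally identical.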
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a60d2b70/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 05ac162..fd121ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -122,6 +122,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     case _ =>
   }
 
+  @transient
   protected[sql] val cacheManager = new CacheManager(this)
 
   /**
@@ -159,6 +160,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *       DataTypes.StringType);
    * }}}
    */
+  @transient
   val udf: UDFRegistration = new UDFRegistration(this)
 
   /** Returns true if the table is currently cached in-memory. */
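
Background on the two @transient additions (a general Scala serialization note, not spelled out in the patch): any non-transient field of a serialized object is serialized along with it, so marking helpers like cacheManager and udf as @transient keeps them out of the serialized form of the SQLContext. A toy illustration:

    // Not Spark code: @transient fields are skipped by Java serialization
    // and come back as null after deserialization.
    class Holder extends Serializable {
      @transient val scratch: StringBuilder = new StringBuilder // skipped
      val name: String = "kept"                                 // serialized
    }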

http://git-wip-us.apache.org/repos/asf/spark/blob/a60d2b70/sql/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index fbed0a7..28e90b9 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -39,6 +39,9 @@ log4j.appender.FA.Threshold = INFO
 log4j.additivity.parquet.hadoop.ParquetRecordReader=false
 log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
 
+log4j.additivity.parquet.hadoop.ParquetOutputCommitter=false
+log4j.logger.parquet.hadoop.ParquetOutputCommitter=OFF
+
 log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
 log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a60d2b70/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 02623f7..7be9215 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.logicalPlanToSparkQuery
 import org.apache.spark.sql.test.TestSQLContext.implicits._
+import org.apache.spark.sql.test.TestSQLContext.sql
 
 
 class DataFrameSuite extends QueryTest {
@@ -88,6 +89,15 @@ class DataFrameSuite extends QueryTest {
       testData.collect().toSeq)
   }
 
+  test("self join") {
+    val df1 = testData.select(testData("key")).as('df1)
+    val df2 = testData.select(testData("key")).as('df2)
+
+    checkAnswer(
+      df1.join(df2, $"df1.key" === $"df2.key"),
+      sql("SELECT a.key, b.key FROM testData a JOIN testData b ON a.key = b.key").collect().toSeq)
+  }
+
   test("selectExpr") {
     checkAnswer(
       testData.selectExpr("abs(key)", "value"),

http://git-wip-us.apache.org/repos/asf/spark/blob/a60d2b70/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 081d94b..44ee5ab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -35,11 +35,9 @@ class PlanTest extends FunSuite {
    * we must normalize them to check if two different queries are identical.
    */
   protected def normalizeExprIds(plan: LogicalPlan) = {
-    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
-    val minId = if (list.isEmpty) 0 else list.min
     plan transformAllExpressions {
       case a: AttributeReference =>
-        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
     }
   }
 



