spark-commits mailing list archives

From l...@apache.org
Subject spark git commit: [SPARK-6595][SQL] MetastoreRelation should be a MultiInstanceRelation
Date Mon, 30 Mar 2015 14:33:36 GMT
Repository: spark
Updated Branches:
  refs/heads/master 19d4c392f -> fe81f6c77


[SPARK-6595][SQL] MetastoreRelation should be a MultiInstanceRelation

Now that we have `DataFrame`s, it is possible for multiple copies of the same `MetastoreRelation`
to appear in a single query plan. As such, it needs to inherit from `MultiInstanceRelation`,
or self joins will break. I also added better debugging errors for when our self-join handling
fails, in case there are future bugs.
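
A minimal sketch of the scenario this addresses, assuming a `HiveContext` named
`hiveContext` and a Hive table `src` (mirroring the new test added below):

    import hiveContext.implicits._

    val df = hiveContext.sql("SELECT * FROM src")
    // Both sides of the join resolve to the same MetastoreRelation. Without
    // MultiInstanceRelation, the analyzer cannot give each side distinct
    // expression ids, so the join condition becomes ambiguous.
    df.as('a).join(df.as('b), $"a.key" === $"b.key")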

Author: Michael Armbrust <michael@databricks.com>

Closes #5251 from marmbrus/multiMetaStore and squashes the following commits:

4272f6d [Michael Armbrust] [SPARK-6595][SQL] MetastoreRelation should be MultiInstanceRelation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe81f6c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe81f6c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe81f6c7

Branch: refs/heads/master
Commit: fe81f6c779213a91369ec61cf5489ad5c66cc49c
Parents: 19d4c39
Author: Michael Armbrust <michael@databricks.com>
Authored: Mon Mar 30 22:24:12 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Mon Mar 30 22:24:12 2015 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala    | 10 +++++++++-
 .../sql/catalyst/analysis/MultiInstanceRelation.scala    |  2 +-
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 11 +++++++++--
 .../spark/sql/hive/HiveMetastoreCatalogSuite.scala       |  8 ++++++++
 4 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe81f6c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 44eceb0..ba1ac14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -252,7 +252,15 @@ class Analyzer(catalog: Catalog,
           case oldVersion @ Aggregate(_, aggregateExpressions, _)
               if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
             (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
-        }.head // Only handle first case found, others will be fixed on the next pass.
+        }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
+          sys.error(
+            s"""
+              |Failure when resolving conflicting references in Join:
+              |$plan
+              |
+              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+              """.stripMargin)
+        }
 
         val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
         val newRight = right transformUp {

http://git-wip-us.apache.org/repos/asf/spark/blob/fe81f6c7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 894c350..35b7402 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -30,5 +30,5 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * of itself with globally unique expression ids.
  */
 trait MultiInstanceRelation {
-  def newInstance(): this.type
+  def newInstance(): LogicalPlan
 }
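
For context, a minimal sketch (not part of this commit) of how a leaf relation
typically satisfies the new signature, assuming the Catalyst APIs of this era
(`AttributeReference.newInstance()` returns a copy with a fresh expression id):

    import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

    // Hypothetical relation used only for illustration.
    case class ExampleRelation(output: Seq[AttributeReference])
      extends LeafNode with MultiInstanceRelation {
      // Re-create every output attribute with a new expression id so the same
      // relation can appear more than once in a plan without id collisions.
      override def newInstance(): LogicalPlan =
        ExampleRelation(output.map(_.newInstance()))
    }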

http://git-wip-us.apache.org/repos/asf/spark/blob/fe81f6c7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d1a9955..203164e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException, Catalog, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
@@ -697,7 +697,7 @@ private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: TTable, val partitions: Seq[TPartition])
     (@transient sqlContext: SQLContext)
-  extends LeafNode {
+  extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
 
@@ -778,6 +778,13 @@ private[hive] case class MetastoreRelation
 
   /** An attribute map for determining the ordinal for non-partition columns. */
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+
+  override def newInstance() = {
+    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
+    // The project here is an ugly hack to work around the fact that MetastoreRelation's
+    // equals method is broken.  Please remove this when SPARK-6555 is fixed.
+    Project(newCopy.output, newCopy)
+  }
 }
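
As background for the `Project` workaround above: a Scala case class with
multiple parameter lists derives `equals`/`hashCode` from the first list only,
so two copies of `MetastoreRelation` that differ only in later parameter lists
(and in their output expression ids) still compare equal. A standalone
illustration of that language behaviour (identifiers are hypothetical):

    case class Rel(db: String, table: String)(val tag: Int)

    val a = Rel("default", "src")(1)
    val b = Rel("default", "src")(2)
    assert(a == b)  // true: only (db, table) participate in equality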
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fe81f6c7/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index aad48ad..fa8e11f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.sql.hive.test.TestHive
 import org.scalatest.FunSuite
 
 import org.apache.spark.sql.test.ExamplePointUDT
@@ -36,4 +37,11 @@ class HiveMetastoreCatalogSuite extends FunSuite {
     assert(HiveMetastoreTypes.toMetastoreType(udt) ===
       HiveMetastoreTypes.toMetastoreType(udt.sqlType))
   }
+
+  test("duplicated metastore relations") {
+    import TestHive.implicits._
+    val df = TestHive.sql("SELECT * FROM src")
+    println(df.queryExecution)
+    df.as('a).join(df.as('b), $"a.key" === $"b.key")
+  }
 }


