spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-6555] [SQL] Overrides equals() and hashCode() for MetastoreRelation
Date Tue, 31 Mar 2015 18:18:38 GMT
Repository: spark
Updated Branches:
  refs/heads/master d01a6d8c3 -> a7992ffaf


[SPARK-6555] [SQL] Overrides equals() and hashCode() for MetastoreRelation

Also removes temporary workarounds made in #5183 and #5251.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5289)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #5289 from liancheng/spark-6555 and squashes the following commits:

d0095ac [Cheng Lian] Removes unused imports
cfafeeb [Cheng Lian] Removes outdated comment
75a2746 [Cheng Lian] Overrides equals() and hashCode() for MetastoreRelation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7992ffa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7992ffa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7992ffa

Branch: refs/heads/master
Commit: a7992ffaf1e8adc9d2c225a986fa3162e8e130eb
Parents: d01a6d8
Author: Cheng Lian <lian@databricks.com>
Authored: Tue Mar 31 11:18:25 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Tue Mar 31 11:18:25 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 42 +++++++++++---------
 .../spark/sql/hive/execution/HivePlanTest.scala |  6 ++-
 2 files changed, 28 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a7992ffa/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6a01a23..f20f0ad 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
@@ -465,7 +466,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
               relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Write path
         case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -476,7 +477,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
             relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -485,33 +486,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
               relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
       }
 
-      // Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
-      // their output attributes as the key of the map. This is because MetastoreRelation.equals
-      // doesn't take output attributes into account, thus multiple MetastoreRelation instances
-      // pointing to the same table get collapsed into a single entry in the map. A proper fix for
-      // this should be overriding equals & hashCode in MetastoreRelation.
       val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
       val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
 
       // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
       // attribute IDs referenced in other nodes.
       plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+        case r: MetastoreRelation if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           val alias = r.alias.getOrElse(r.tableName)
           Subquery(alias, parquetRelation)
 
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case other => other.transformExpressions {
@@ -707,6 +703,19 @@ private[hive] case class MetastoreRelation
 
   self: Product =>
 
+  override def equals(other: scala.Any): Boolean = other match {
+    case relation: MetastoreRelation =>
+      databaseName == relation.databaseName &&
+        tableName == relation.tableName &&
+        alias == relation.alias &&
+        output == relation.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(databaseName, tableName, alias, output)
+  }
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -786,10 +795,7 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def newInstance() = {
-    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
-    // The project here is an ugly hack to work around the fact that MetastoreRelation's
-    // equals method is broken.  Please remove this when SPARK-6555 is fixed.
-    Project(newCopy.output, newCopy)
+    MetastoreRelation(databaseName, tableName, alias)(table, partitions)(sqlContext)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a7992ffa/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
index c939e6e..bdb53dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -22,10 +22,12 @@ import org.apache.spark.sql.hive.test.TestHive
 
 class HivePlanTest extends QueryTest {
   import TestHive._
+  import TestHive.implicits._
 
   test("udf constant folding") {
-    val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
-    val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
+    Seq.empty[Tuple1[Int]].toDF("a").registerTempTable("t")
+    val optimized = sql("SELECT cos(null) FROM t").queryExecution.optimizedPlan
+    val correctAnswer = sql("SELECT cast(null as double) FROM t").queryExecution.optimizedPlan
 
     comparePlans(optimized, correctAnswer)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message