spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-5839][SQL]HiveMetastoreCatalog does not recognize table names and aliases of data source tables.
Date Mon, 16 Feb 2015 23:54:05 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5b6cd65cd -> f3ff1eb29


[SPARK-5839][SQL]HiveMetastoreCatalog does not recognize table names and aliases of data source
tables.

JIRA: https://issues.apache.org/jira/browse/SPARK-5839

Author: Yin Huai <yhuai@databricks.com>

Closes #4626 from yhuai/SPARK-5839 and squashes the following commits:

f779d85 [Yin Huai] Use subquery to wrap replaced ParquetRelation.
2695f13 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-5839
f1ba6ca [Yin Huai] Address comment.
2c7fa08 [Yin Huai] Use Subqueries to wrap a data source table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3ff1eb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3ff1eb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3ff1eb2

Branch: refs/heads/master
Commit: f3ff1eb2985ff3e1567645b898f6b42e4b01f237
Parents: 5b6cd65
Author: Yin Huai <yhuai@databricks.com>
Authored: Mon Feb 16 15:54:01 2015 -0800
Committer: Michael Armbrust <michael@databricks.com>
Committed: Mon Feb 16 15:54:01 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  5 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 18 +++++++++--
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 34 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3ff1eb2/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 9318c15..8b4d05e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -30,9 +30,10 @@ import org.apache.spark.sql.test.TestSQLContext._
 class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
   val sqlContext = TestSQLContext
 
-  test("simple projection") {
+  test("simple select queries") {
     withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
-      checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
+      checkAnswer(sql("SELECT _1 FROM t where t._1 > 5"), (6 until 10).map(Row.apply(_)))
+      checkAnswer(sql("SELECT _1 FROM t as tmp where tmp._1 < 5"), (0 until 5).map(Row.apply(_)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ff1eb2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 72211fe..87bc9fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -160,7 +160,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
 
     if (table.getProperty("spark.sql.sources.provider") != null) {
-      cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      val dataSourceTable =
+        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      // Then, if alias is specified, wrap the table with a Subquery using the alias.
+      // Otherwise, wrap the table with a Subquery using the table name.
+      val withAlias =
+        alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
+          Subquery(tableIdent.last, dataSourceTable))
+
+      withAlias
     } else if (table.isView) {
       // if the unresolved relation is from hive view
       // parse the text into logic node.
@@ -433,7 +441,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
 
         lastPlan.transformUp {
-          case r: MetastoreRelation if r == relation => parquetRelation
+          case r: MetastoreRelation if r == relation => {
+            val withAlias =
+              r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+                Subquery(r.tableName, parquetRelation))
+
+            withAlias
+          }
           case other => other.transformExpressions {
             case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/f3ff1eb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 375aae5..0263e3b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -401,6 +401,40 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     sql("DROP TABLE jsonTable").collect().foreach(println)
   }
 
+  test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") {
+    val originalDefaultSource = conf.defaultDataSourceName
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val df = jsonRDD(rdd)
+
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+    // Save the df as a managed table (by not specifying the path).
+    df.saveAsTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+      (1 to 4).map(i => Row(i, s"str${i}")))
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+      (6 to 10).map(i => Row(i, s"str${i}")))
+
+    invalidateTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+      (1 to 4).map(i => Row(i, s"str${i}")))
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+      (6 to 10).map(i => Row(i, s"str${i}")))
+
+    // Drop table will also delete the data.
+    sql("DROP TABLE savedJsonTable")
+
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+  }
+
   test("save table") {
     val originalDefaultSource = conf.defaultDataSourceName
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message