spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached
Date Tue, 05 Jul 2016 01:49:32 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7dbffcdd6 -> 8f6cf00c6


[SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached

(Please note this is a revision of PR #13686, which has been closed in favor of this PR.)

This PR addresses [SPARK-15968](https://issues.apache.org/jira/browse/SPARK-15968).

## What changes were proposed in this pull request?

The `getCached` method of [HiveMetastoreCatalog](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala)
computes `pathsInMetastore` from the metastore relation's catalog table. This returns only
the table's base path, which is incomplete and inaccurate for a nonempty partitioned table.
As a result, cached lookups on nonempty partitioned tables always miss.
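
To see why the lookup misses, here is a minimal sketch (the paths are made up and the cache-hit
check is simplified to a set comparison): the cached `HadoopFsRelation` is keyed by the partitions'
data locations, while the old `pathsInMetastore` held only the table's base path, so the two sets
can never agree for a nonempty partitioned table:

    // Illustrative sketch only: made-up paths, simplified cache-hit check.
    val pathsInMetastore = Seq("hdfs://nn/warehouse/tbl")      // old behavior: base path only
    val cachedRelationPaths = Seq(
      "hdfs://nn/warehouse/tbl/part=0",
      "hdfs://nn/warehouse/tbl/part=1")                        // cached relation: partition data locations
    val useCached = pathsInMetastore.toSet == cachedRelationPaths.toSet  // false => cache miss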

Rather than get `pathsInMetastore` from

    metastoreRelation.catalogTable.storage.locationUri.toSeq

I modified the `getCached` method to take a `pathsInMetastore` argument. Callers of this
method now pass in the paths computed from calls to the Hive metastore. This is how `getCached`
was implemented in Spark 1.5:

https://github.com/apache/spark/blob/e0c3212a9b42e3e704b070da4ac25b68c584427f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L444.
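
For reference, here is a minimal sketch of the caller-side convention this restores (simplified
from the diff below; `partitionPaths` holds the partitions' data locations fetched from the
Hive metastore):

    // Simplified from the diff below.
    val paths =
      if (partitionPaths.isEmpty) {
        // An empty partitioned table falls back to the table's base path.
        Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
      } else {
        // A nonempty partitioned table is identified by its partitions' data locations.
        partitionPaths
      }
    // `paths` is then passed to getCached as its new pathsInMetastore argument.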

I also added a call in `InsertIntoHiveTable.scala` to invalidate the table from the SQL session
catalog.

## How was this patch tested?

I've added a new unit test to `parquetSuites.scala`:

    SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached relation

Note that the only difference between this new test and the one above it in the file is that
the new test populates its partitioned table with a single value, while the existing test
leaves the table empty. This reveals a subtle, unexpected hole in test coverage present before
this patch.
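
Concretely, the new test is the existing empty-table test plus this single statement (copied
from the diff below):

    sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")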

Note I also modified a different but related unit test in `parquetSuites.scala`:

    SPARK-15248: explicitly added partitions should be readable

This unit test asserts that Spark SQL returns data that has been placed in a table partition
outside of a metastore query, immediately after that partition is added. I changed the test
so that, instead of adding the data as a Parquet file written directly to the partition's
location, the data is added through a SQL `INSERT` query. I made this change because I could
find no way to efficiently support partitioned table caching without failing that test.
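
Concretely, the test's partition-population step changed as follows (both lines appear in the
diff below):

    // Before: write a parquet file directly into the partition's directory
    Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
    // After: route the write through the metastore with a SQL INSERT
    sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")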

In addition to my primary motivation, I can offer a few reasons I believe this is an acceptable
weakening of that test. First, it still validates a fix for [SPARK-15248](https://issues.apache.org/jira/browse/SPARK-15248),
the issue for which it was written. Second, the assertion made is stronger than that required
for non-partitioned tables: if you write data to the storage location of a non-partitioned
metastore table without using a proper SQL DML query, a subsequent query will not return it.
I believe this is an intentional limitation put in place to make table caching feasible, but
I'm only speculating.
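
As a hedged illustration of that limitation (not part of this patch; `tableLocation` is a
hypothetical path to the non-partitioned table's storage directory):

    // Hypothetical example; tableLocation is assumed, not taken from the patch.
    Seq((2, "two")).toDF("key", "value")
      .write.mode(SaveMode.Append).parquet(tableLocation)
    // A query against the cached relation may not return the new row until the
    // table's cached metadata is refreshed.
    sql("SELECT * FROM nonPartitioned").show()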

Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. In our environment,
where we have tables with tens of thousands of partitions, the difference between using a
cached relation and building a new one is a matter of seconds versus minutes. Caching partitioned
table metadata vastly improves the usability of Spark SQL for these cases.

Thanks.

Author: Michael Allman <michael@videoamp.com>

Closes #13818 from mallman/spark-15968.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f6cf00c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f6cf00c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f6cf00c

Branch: refs/heads/master
Commit: 8f6cf00c697689174bcf522091e396b4fa5ef66d
Parents: 7dbffcd
Author: Michael Allman <michael@videoamp.com>
Authored: Tue Jul 5 09:49:25 2016 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Tue Jul 5 09:49:25 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 16 ++++-
 .../hive/execution/InsertIntoHiveTable.scala    |  1 +
 .../apache/spark/sql/hive/parquetSuites.scala   | 61 ++++++++++++++------
 3 files changed, 59 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f6cf00c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 151e456..7dae473 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -191,6 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
   private def getCached(
       tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
       metastoreRelation: MetastoreRelation,
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
@@ -200,7 +201,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
-        val pathsInMetastore = metastoreRelation.catalogTable.storage.locationUri.toSeq
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
 
         expectedFileFormat match {
@@ -265,9 +265,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         PartitionDirectory(values, location)
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      val partitionPaths = partitions.map(_.path.toString)
+
+      // By convention (for example, see MetaStorePartitionedTableFileCatalog), the definition of a
+      // partitioned table's paths depends on whether that table has any actual partitions.
+      // Partitioned tables without partitions use the location of the table's base path.
+      // Partitioned tables with partitions use the locations of those partitions' data locations,
+      // _omitting_ the table's base path.
+      val paths = if (partitionPaths.isEmpty) {
+        Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
+      } else {
+        partitionPaths
+      }
 
       val cached = getCached(
         tableIdentifier,
+        paths,
         metastoreRelation,
         metastoreSchema,
         fileFormatClass,
@@ -312,6 +325,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
       val cached = getCached(tableIdentifier,
+        paths,
         metastoreRelation,
         metastoreSchema,
         fileFormatClass,

http://git-wip-us.apache.org/repos/asf/spark/blob/8f6cf00c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 97cd29f..529d388 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -298,6 +298,7 @@ case class InsertIntoHiveTable(
 
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
+    sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier)
 
     // It would be nice to just return the childRdd unchanged so insert operations could be chained,
     // however for now we return an empty list to simplify compatibility checks with hive, which

http://git-wip-us.apache.org/repos/asf/spark/blob/8f6cf00c/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index fe7253d..4b32b13 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -389,17 +389,18 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation")
{
     withTable("nonPartitioned") {
       sql(
-        s"""CREATE TABLE nonPartitioned (
-           |  key INT,
-           |  value STRING
-           |)
-           |STORED AS PARQUET
-         """.stripMargin)
+        """
+          |CREATE TABLE nonPartitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |STORED AS PARQUET
+        """.stripMargin)
 
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r1 = collectHadoopFsRelation(table("nonPartitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("nonPartitioned"))
+      val r2 = collectHadoopFsRelation(table("nonPartitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -408,18 +409,42 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation")
{
     withTable("partitioned") {
       sql(
-        s"""CREATE TABLE partitioned (
-           | key INT,
-           | value STRING
-           |)
-           |PARTITIONED BY (part INT)
-           |STORED AS PARQUET
-       """.stripMargin)
+        """
+          |CREATE TABLE partitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
+
+      // First lookup fills the cache
+      val r1 = collectHadoopFsRelation(table("partitioned"))
+      // Second lookup should reuse the cache
+      val r2 = collectHadoopFsRelation(table("partitioned"))
+      // They should be the same instance
+      assert(r1 eq r2)
+    }
+  }
+
+  test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached
" +
+      "relation") {
+    withTable("partitioned") {
+      sql(
+        """
+          |CREATE TABLE partitioned (
+          |  key INT,
+          |  value STRING
+          |)
+          |PARTITIONED BY (part INT)
+          |STORED AS PARQUET
+        """.stripMargin)
+      sql("INSERT INTO TABLE partitioned PARTITION(part=0) SELECT 1 as key, 'one' as value")
 
       // First lookup fills the cache
-      val r1 = collectHadoopFsRelation (table("partitioned"))
+      val r1 = collectHadoopFsRelation(table("partitioned"))
       // Second lookup should reuse the cache
-      val r2 = collectHadoopFsRelation (table("partitioned"))
+      val r2 = collectHadoopFsRelation(table("partitioned"))
       // They should be the same instance
       assert(r1 eq r2)
     }
@@ -557,7 +582,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
           Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
 
         // Add data files to partition directory and check whether they can be read
-        Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+        sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
         checkAnswer(
           sql("SELECT * FROM test_added_partitions"),
           Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))



