spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
Date Fri, 03 Apr 2015 03:23:19 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0ef46b2d8 -> 0c1c0fb90


[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yhuai@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit 4b82bd730a24f96d94dfea87420cfaa4253a5ccb)
Signed-off-by: Michael Armbrust <michael@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c1c0fb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c1c0fb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c1c0fb9

Branch: refs/heads/branch-1.3
Commit: 0c1c0fb90d025e60c5ab74bd80a7c36482070b80
Parents: 0ef46b2
Author: Yin Huai <yhuai@databricks.com>
Authored: Thu Apr 2 20:23:08 2015 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Thu Apr 2 20:23:16 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  61 +++++++++-
 .../apache/spark/sql/hive/parquetSuites.scala   | 112 +++++++++++++++++++
 2 files changed, 167 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c1c0fb9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbd920a..76d329a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog
with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+    // refresh table does not eagerly reload the cache. It just invalidate the cache.
+    // Next time when we use the table, it will be populated in the cache.
+    invalidateTable(databaseName, tableName)
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends
Catalog with
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation
= {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-    val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have
to
     // serialize the Metastore schema to JSON and pass it as a data source option because
of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    val parquetOptions = Map(
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+      tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
+      schemaInMetastore: StructType,
+      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached Parquet Relation.
+          val useCached =
+            parquetRelation.paths == pathsInMetastore &&
+            logical.schema.sameType(metastoreSchema) &&
+            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+          if (useCached) Some(logical) else None
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored
" +
+              s"as Parquet. However, we are getting a ${other} from the metastore cache.
" +
+              s"This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
     if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends
Catalog with
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       val paths = partitions.map(_.path)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+
+      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
+      val parquetRelation = cached.getOrElse {
+        val created =
+          LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+        cachedDataSourceTables.put(tableIdentifier, created)
+        created
+      }
+
+      parquetRelation
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c1c0fb9/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index c8eabc9..e1e8461 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.SaveMode
@@ -389,6 +391,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase
{
 
     sql("DROP TABLE ms_convert")
   }
+
+  test("Caching converted data source Parquet Relations") {
+    def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
+      // Converted test_parquet should be cached.
+      catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
+        case null => fail("Converted test_parquet should be cached in the cache.")
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+        case other =>
+          fail(
+            "The cached test_parquet should be a Parquet Relation. " +
+              s"However, $other is returned form the cache.")
+      }
+    }
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")
+
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")
+
+    // First, make sure the converted test_parquet is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    // Table lookup will make the table cached.
+    table("test_insert_parquet")
+    checkCached(tableIdentifer)
+    // For insert into non-partitioned table, we will do the conversion,
+    // so the converted test_insert_parquet should be cached.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_insert_parquet
+        |select a, b from jt
+      """.stripMargin)
+    checkCached(tableIdentifer)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select * from test_insert_parquet"),
+      sql("select a, b from jt").collect())
+    // Invalidate the cache.
+    invalidateTable("test_insert_parquet")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+    // Create a partitioned table.
+    sql(
+      """
+        |create table test_parquet_partitioned_cache_test
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |PARTITIONED BY (date string)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-01')
+        |select a, b from jt
+      """.stripMargin)
+    // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
+    // So, we expect it is not cached.
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sql(
+      """
+        |INSERT INTO TABLE test_parquet_partitioned_cache_test
+        |PARTITION (date='2015-04-02')
+        |select a, b from jt
+      """.stripMargin)
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+    // Make sure we can cache the partitioned table.
+    table("test_parquet_partitioned_cache_test")
+    checkCached(tableIdentifer)
+    // Make sure we can read the data.
+    checkAnswer(
+      sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
+      sql(
+        """
+          |select b, '2015-04-01', a FROM jt
+          |UNION ALL
+          |select b, '2015-04-02', a FROM jt
+        """.stripMargin).collect())
+
+    invalidateTable("test_parquet_partitioned_cache_test")
+    assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+    sql("DROP TABLE test_insert_parquet")
+    sql("DROP TABLE test_parquet_partitioned_cache_test")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message