spark-commits mailing list archives

From l...@apache.org
Subject spark git commit: [SPARK-6016][SQL] Cannot read the parquet table after overwriting the existing table when spark.sql.parquet.cacheMetadata=true
Date Thu, 26 Feb 2015 17:04:34 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e0f5fb0ad -> b5c5e93d7


[SPARK-6016][SQL] Cannot read the parquet table after overwriting the existing table when spark.sql.parquet.cacheMetadata=true

Please see JIRA (https://issues.apache.org/jira/browse/SPARK-6016) for details of the bug.
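
A minimal way to reproduce the issue (this sketch mirrors the regression test added in this commit; it assumes a Spark 1.3-style HiveContext whose sql and jsonRDD methods are in scope, e.g. TestHive, and the table name spark_6016_fix is just an example):

    // With spark.sql.parquet.cacheMetadata=true (the default), the second read below failed
    // before this fix: footers cached for the first table's two part-files were mixed with
    // the footers of the overwritten table's four part-files when metadata was merged.
    import org.apache.spark.sql.SaveMode

    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
    sql("select * from spark_6016_fix").collect()  // ok; caches footers for two part-files

    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
    sql("select * from spark_6016_fix").collect()  // previously failed due to stale cached footers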

Author: Yin Huai <yhuai@databricks.com>

Closes #4775 from yhuai/parquetFooterCache and squashes the following commits:

78787b1 [Yin Huai] Remove footerCache in FilteringParquetRowInputFormat.
dff6fba [Yin Huai] Failed unit test.

(cherry picked from commit 192e42a2933eb283e12bfdfb46e2ef895228af4a)
Signed-off-by: Cheng Lian <lian@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5c5e93d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5c5e93d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5c5e93d

Branch: refs/heads/branch-1.3
Commit: b5c5e93d71741f3daa7bf9b3de838c36bd234511
Parents: e0f5fb0
Author: Yin Huai <yhuai@databricks.com>
Authored: Fri Feb 27 01:01:32 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Fri Feb 27 01:04:24 2015 +0800

----------------------------------------------------------------------
 .../sql/parquet/ParquetTableOperations.scala    | 49 ++++----------------
 .../apache/spark/sql/parquet/newParquet.scala   |  8 +++-
 .../spark/sql/parquet/parquetSuites.scala       | 27 +++++++++++
 3 files changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b5c5e93d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4dc13b0..9061d3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
 private[parquet] class FilteringParquetRowInputFormat
   extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
 
-  private var footers: JList[Footer] = _
-
   private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
     }
   }
 
-  override def getFooters(jobContext: JobContext): JList[Footer] = {
-    import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache
-
-    if (footers eq null) {
-      val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-      val statuses = listStatus(jobContext)
-      fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      if (statuses.isEmpty) {
-        footers = Collections.emptyList[Footer]
-      } else if (!cacheMetadata) {
-        // Read the footers from HDFS
-        footers = getFooters(conf, statuses)
-      } else {
-        // Read only the footers that are not in the footerCache
-        val foundFooters = footerCache.getAllPresent(statuses)
-        val toFetch = new ArrayList[FileStatus]
-        for (s <- statuses) {
-          if (!foundFooters.containsKey(s)) {
-            toFetch.add(s)
-          }
-        }
-        val newFooters = new mutable.HashMap[FileStatus, Footer]
-        if (toFetch.size > 0) {
-          val startFetch = System.currentTimeMillis
-          val fetched = getFooters(conf, toFetch)
-          logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
-          for ((status, i) <- toFetch.zipWithIndex) {
-            newFooters(status) = fetched.get(i)
-          }
-          footerCache.putAll(newFooters)
-        }
-        footers = new ArrayList[Footer](statuses.size)
-        for (status <- statuses) {
-          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
-        }
-      }
-    }
+  // This is only a temporary solution since we need to use fileStatuses in
+  // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
+  // two methods.
+  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+    // First set fileStatuses.
+    val statuses = listStatus(jobContext)
+    fileStatuses = statuses.map(file => file.getPath -> file).toMap
 
-    footers
+    super.getSplits(jobContext)
   }
 
   // TODO Remove this method and related code once PARQUET-16 is fixed

http://git-wip-us.apache.org/repos/asf/spark/blob/b5c5e93d/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 16b7713..e648618 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    private var footers: Map[FileStatus, Footer] = _
+    var footers: Map[FileStatus, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
     } else {
       metadataCache.dataStatuses.toSeq
     }
+    val selectedFooters = selectedFiles.map(metadataCache.footers)
 
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@ private[sql] case class ParquetRelation2(
         @transient
         val cachedStatus = selectedFiles
 
+        @transient
+        val cachedFooters = selectedFooters
+
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
           val inputFormat = if (cacheMetadata) {
             new FilteringParquetRowInputFormat {
               override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+
+              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
             }
           } else {
             new FilteringParquetRowInputFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/b5c5e93d/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 80fd5cd..6a9d9da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+import org.apache.spark.sql.SaveMode
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
       )
     """)
   }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+    sql("drop table if exists spark_6016_fix")
+
+    // Create a DataFrame with two partitions. So, the created table will have two parquet files.
+    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    // Create a DataFrame with four partitions. So, the created table will have four parquet files.
+    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    // Because of the bug in SPARK-6016, we are caching two outdated footers for df1. Then,
+    // since the new table has four parquet files, we are trying to read new footers from two files
+    // and then merge metadata in footers of these four (two outdated ones and two latest ones),
+    // which will cause an error.
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    sql("drop table spark_6016_fix")
+  }
 }
 
 class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {

