spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet
Date Mon, 16 Jan 2017 07:26:54 GMT
Repository: spark
Updated Branches:
  refs/heads/master de62ddf7f -> 61e48f52d


[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet

## What changes were proposed in this pull request?

We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to ignore corrupt
files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues
and doesn't work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. However, we actually begin reading those files
as early as inferring the data schema from them. For corrupt files, we can't read the schema
and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume the files are only read once we start consuming the iterator.
However, the files may be read before that, e.g., by the vectorized Parquet reader. In that case,
the `ignoreCorruptFiles` config doesn't work either.

This patch targets the Parquet datasource. If this direction is acceptable, we can address the
same issue for other datasources such as ORC.
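
For context, this is how the flag is enabled from user code (a minimal sketch, not part of this patch; the input path is a placeholder):

```scala
// Minimal usage sketch (hypothetical path): with the flag on, corrupt Parquet files
// under the input path are skipped instead of failing the query.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val df = spark.read.parquet("/tmp/mixed_input")
df.show()
```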

This patch makes two main changes:

1. Replace `ParquetFileReader.readAllFootersInParallel` with our own logic for reading
footers in a multi-threaded manner

    We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this
patch implements similar logic in `readParquetFootersInParallel`.

2. In `FileScanRDD`, we also need to ignore a corrupt file when calling `readFunction` to obtain
the iterator (a simplified sketch of this follows the list).
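
For illustration, a minimal sketch of the idea behind change 2 (the real code is in the `FileScanRDD` diff below; `readFunction` and `currentFile` here are stand-ins):

```scala
import java.io.IOException

// Sketch only: constructing the iterator may already read the file (e.g. the vectorized
// Parquet reader), so that call is guarded as well, not just the per-row consumption.
def safeIterator[T](currentFile: String, readFunction: String => Iterator[T]): Iterator[T] = {
  try {
    readFunction(currentFile)
  } catch {
    case _: RuntimeException | _: IOException =>
      // Skip the corrupted file entirely instead of failing the task.
      Iterator.empty
  }
}
```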

One thing to note:

We read the schema from the Parquet file's footer. The method that reads the footer,
`ParquetFileReader.readFooter`, throws `RuntimeException` instead of `IOException` if it can't
successfully read the footer. Please check out https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470.
So this patch catches `RuntimeException`. One concern is that it might also shadow runtime
exceptions unrelated to reading corrupt files.
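
As a self-contained sketch of how that plays out for a single footer (the real implementation is `ParquetFileFormat.readParquetFootersInParallel` in the diff below; the helper name here is made up):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{Footer, ParquetFileReader}

// Hypothetical helper illustrating the exception handling described above.
def readFooterOrNone(
    conf: Configuration,
    file: FileStatus,
    ignoreCorruptFiles: Boolean): Option[Footer] = {
  try {
    // Row groups are skipped because only the schema is needed.
    Some(new Footer(file.getPath, ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)))
  } catch {
    // A corrupt footer surfaces as RuntimeException, not IOException.
    case e: RuntimeException =>
      if (ignoreCorruptFiles) {
        // Caveat from above: this may also swallow runtime failures unrelated to corruption.
        None
      } else {
        throw new IOException(s"Could not read footer for file: $file", e)
      }
  }
}
```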

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61e48f52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61e48f52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61e48f52

Branch: refs/heads/master
Commit: 61e48f52d1d8c7431707bd3511b6fe9f0ae996c0
Parents: de62ddf
Author: Liang-Chi Hsieh <viirya@gmail.com>
Authored: Mon Jan 16 15:26:41 2017 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Mon Jan 16 15:26:41 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |  2 +-
 .../sql/execution/datasources/FileScanRDD.scala | 12 +++-
 .../datasources/parquet/ParquetFileFormat.scala | 45 +++++++++++++--
 .../parquet/ParquetFileFormatSuite.scala        | 59 ++++++++++++++++++++
 .../datasources/parquet/ParquetQuerySuite.scala | 30 ++++++++++
 5 files changed, 140 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61e48f52/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ad1fddb..60e383a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 

http://git-wip-us.apache.org/repos/asf/spark/blob/61e48f52/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index dced536..d197daf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -135,7 +135,17 @@ class FileScanRDD(
           try {
             if (ignoreCorruptFiles) {
               currentIterator = new NextIterator[Object] {
-                private val internalIter = readFunction(currentFile)
+                private val internalIter = {
+                  try {
+                    // The readFunction may read files before consuming the iterator.
+                    // E.g., vectorized Parquet reader.
+                    readFunction(currentFile)
+                  } catch {
+                    case e @(_: RuntimeException | _: IOException) =>
+                      logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                      Iterator.empty
+                  }
+                }
 
                 override def getNext(): AnyRef = {
                   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/61e48f52/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0efe6da..d9831c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
@@ -543,6 +547,36 @@ object ParquetFileFormat extends Logging {
   }
 
   /**
+   * Reads Parquet footers in multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
+   * files when reading footers.
+   */
+  private[parquet] def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val parFiles = partFiles.par
+    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.flatMap { currentFile =>
+      try {
+        // Skips row group information since we only need the schema.
+        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+        // when it can't read the footer.
+        Some(new Footer(currentFile.getPath(),
+          ParquetFileReader.readFooter(
+            conf, currentFile, SKIP_ROW_GROUPS)))
+      } catch { case e: RuntimeException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+          None
+        } else {
+          throw new IOException(s"Could not read footer for file: $currentFile", e)
+        }
+      }
+    }.seq
+  }
+
+  /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
    * Note that locality is not taken into consideration here because:
@@ -582,6 +616,8 @@ object ParquetFileFormat extends Logging {
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -593,13 +629,10 @@ object ParquetFileFormat extends Logging {
             new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
           }.toSeq
 
-          // Skips row group information since we only need the schema
-          val skipRowGroups = true
-
           // Reads footers in multi-threaded manner within each task
           val footers =
-            ParquetFileReader.readAllFootersInParallel(
-              serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+            ParquetFileFormat.readParquetFootersInParallel(
+              serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
           // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
           val converter =

http://git-wip-us.apache.org/repos/asf/spark/blob/61e48f52/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
new file mode 100644
index 0000000..ccb3435
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  test("read parquet footers in parallel") {
+    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
+      withTempDir { dir =>
+        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+        val basePath = dir.getCanonicalPath
+
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        val path3 = new Path(basePath, "third")
+
+        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
+        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
+        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)
+
+        val fileStatuses =
+          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
+
+        val footers = ParquetFileFormat.readParquetFootersInParallel(
+          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)
+
+        assert(footers.size == 2)
+      }
+    }
+
+    testReadFooters(true)
+    val exception = intercept[java.io.IOException] {
+      testReadFooters(false)
+    }
+    assert(exception.getMessage().contains("Could not read footer for file"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/61e48f52/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index ee7f2d0..d7d7176 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -217,6 +218,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath

