spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject spark git commit: [SPARK-13207][SQL] Make partitioning discovery ignore _SUCCESS files.
Date Mon, 14 Mar 2016 16:03:20 GMT
Repository: spark
Updated Branches:
  refs/heads/master 31d069d4c -> 250832c73


[SPARK-13207][SQL] Make partitioning discovery ignore _SUCCESS files.

If a _SUCCESS appears in the inner partitioning dir, partition discovery will treat that _SUCCESS
file as a data file. Then, partition discovery will fail because it finds that the dir structure
is not valid. We should ignore those `_SUCCESS` files.

In future, it is better to ignore all files/dirs starting with `_` or `.`. This PR does not
make this change. I am thinking about making this change simple, so we can consider of getting
it in branch 1.6.

To ignore all files/dirs starting with `_` or `, the main change is to let ParquetRelation
have another way to get metadata files. Right now, it relies on FileStatusCache's cachedLeafStatuses,
which returns file statuses of both metadata files (e.g. metadata files used by parquet) and
data files, which requires more changes.

https://issues.apache.org/jira/browse/SPARK-13207

Author: Yin Huai <yhuai@databricks.com>

Closes #11088 from yhuai/SPARK-13207.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/250832c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/250832c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/250832c7

Branch: refs/heads/master
Commit: 250832c733602bcca034dd1fea6e325c55ccd1cd
Parents: 31d069d
Author: Yin Huai <yhuai@databricks.com>
Authored: Mon Mar 14 09:03:13 2016 -0700
Committer: Yin Huai <yhuai@databricks.com>
Committed: Mon Mar 14 09:03:13 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   | 30 ++++++++++++++------
 .../ParquetPartitionDiscoverySuite.scala        | 23 +++++++++++++++
 2 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/250832c7/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index be2d98c..601f944 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -522,7 +522,7 @@ class HDFSFileCatalog(
         }
       }.filterNot { status =>
         val name = status.getPath.getName
-        name.toLowerCase == "_temporary" || name.startsWith(".")
+        HadoopFsRelation.shouldFilterOut(name)
       }
 
       val (dirs, files) = statuses.partition(_.isDirectory)
@@ -616,6 +616,16 @@ class HDFSFileCatalog(
  * Helper methods for gathering metadata from HDFS.
  */
 private[sql] object HadoopFsRelation extends Logging {
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // TODO: We should try to filter out all files/dirs starting with "." or "_".
+    // The only reason that we are not doing it now is that Parquet needs to find those
+    // metadata files from leaf files returned by this methods. We should refactor
+    // this logic to not mix metadata files with data files.
+    pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
+  }
+
   // We don't filter files/directories whose name start with "_" except "_temporary" here,
as
   // specific data sources may take advantages over them (e.g. Parquet _metadata and
   // _common_metadata files). "_temporary" directories are explicitly ignored since failed
@@ -624,19 +634,21 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
     logInfo(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
-    if (name == "_temporary" || name.startsWith(".")) {
+    if (shouldFilterOut(name)) {
       Array.empty
     } else {
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(fs.getConf, this.getClass())
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-      if (pathFilter != null) {
-        val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      } else {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      }
+      val statuses =
+        if (pathFilter != null) {
+          val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        }
+      statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/250832c7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index b74b9d3..0261915 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -706,6 +706,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest
with Sha
     }
   }
 
+  test("_SUCCESS should not break partitioning discovery") {
+    Seq(1, 32).foreach { threshold =>
+      // We have two paths to list files, one at driver side, another one that we use
+      // a Spark job. We need to test both ways.
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> threshold.toString)
{
+        withTempPath { dir =>
+          val tablePath = new File(dir, "table")
+          val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+          df.write
+            .format("parquet")
+            .partitionBy("b", "c", "d")
+            .save(tablePath.getCanonicalPath)
+
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1/d=1", "_SUCCESS"))
+          checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath),
df)
+        }
+      }
+    }
+  }
+
   test("listConflictingPartitionColumns") {
     def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
       val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message