spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject spark git commit: [SPARK-8037] [SQL] Ignores files whose name starts with dot in HadoopFsRelation
Date Tue, 02 Jun 2015 17:09:33 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 8c3fc3a6c -> f71a09de6


[SPARK-8037] [SQL] Ignores files whose name starts with dot in HadoopFsRelation

Author: Cheng Lian <lian@databricks.com>

Closes #6581 from liancheng/spark-8037 and squashes the following commits:

d08e97b [Cheng Lian] Ignores files whose name starts with dot in HadoopFsRelation

(cherry picked from commit 1bb5d716c0351cd0b4c11b397fd778f30db39bd9)
Signed-off-by: Cheng Lian <lian@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f71a09de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f71a09de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f71a09de

Branch: refs/heads/branch-1.4
Commit: f71a09de6e8ec7222389096824689ee6a83a1bd9
Parents: 8c3fc3a
Author: Cheng Lian <lian@databricks.com>
Authored: Wed Jun 3 00:59:50 2015 +0800
Committer: Cheng Lian <lian@databricks.com>
Committed: Wed Jun 3 01:09:19 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/sources/PartitioningUtils.scala    |  2 +-
 .../apache/spark/sql/sources/interfaces.scala    | 11 +++++++----
 .../parquet/ParquetPartitionDiscoverySuite.scala | 19 ++++++++++++++++++-
 3 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f71a09de/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index dafdf0f..c4c99de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -187,7 +187,7 @@ private[sql] object PartitioningUtils {
       Seq.empty
     } else {
       assert(distinctPartitionsColNames.size == 1, {
-        val list = distinctPartitionsColNames.mkString("\t", "\n", "")
+        val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
         s"Conflicting partition column names detected:\n$list"
       })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f71a09de/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index b1b997c..c4ffa8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -379,10 +379,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
     def refresh(): Unit = {
-      // We don't filter files/directories whose name start with "_" or "." here, as specific data
-      // sources may take advantages over them (e.g. Parquet _metadata and _common_metadata files).
-      // But "_temporary" directories are explicitly ignored since failed tasks/jobs may leave
-      // partial/corrupted data files there.
+      // We don't filter files/directories whose name start with "_" except "_temporary" here, as
+      // specific data sources may take advantages over them (e.g. Parquet _metadata and
+      // _common_metadata files). "_temporary" directories are explicitly ignored since failed
+      // tasks/jobs may leave partial/corrupted data files there.
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
         if (status.getPath.getName.toLowerCase == "_temporary") {
           Set.empty
@@ -400,6 +400,9 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
        Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
+      }.filterNot { status =>
+        // SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
+        status.getPath.getName.startsWith(".")
       }
 
       val files = statuses.filterNot(_.isDir)

http://git-wip-us.apache.org/repos/asf/spark/blob/f71a09de/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index f231589..3b29979 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -18,10 +18,11 @@ package org.apache.spark.sql.parquet
 
 import java.io.File
 import java.math.BigInteger
-import java.sql.{Timestamp, Date}
+import java.sql.Timestamp
 
 import scala.collection.mutable.ArrayBuffer
 
+import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.expressions.Literal
@@ -432,4 +433,20 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       checkAnswer(read.load(dir.toString).select(fields: _*), row)
     }
   }
+
+  test("SPARK-8037: Ignores files whose name starts with dot") {
+    withTempPath { dir =>
+      val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+      df.write
+        .format("parquet")
+        .partitionBy("b", "c", "d")
+        .save(dir.getCanonicalPath)
+
+      Files.touch(new File(s"${dir.getCanonicalPath}/b=1", ".DS_Store"))
+      Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
+
+      checkAnswer(read.format("parquet").load(dir.getCanonicalPath), df)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message