spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [SPARK-18960][SQL][SS] Avoid double reading file which is being copied.
Date Wed, 28 Dec 2016 10:42:52 GMT
Repository: spark
Updated Branches:
  refs/heads/master 67fb33e7e -> 76e9bd748


[SPARK-18960][SQL][SS] Avoid double reading file which is being copied.

## What changes were proposed in this pull request?

In HDFS, when we copy a file into target directory, there will a temporary `._COPY_` file
for a period of time. The duration depends on file size. If we do not skip this file, we will
may read the same data for two times.

## How was this patch tested?
update unit test

Author: uncleGen <hustyugm@gmail.com>

Closes #16370 from uncleGen/SPARK-18960.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e9bd74
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e9bd74
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e9bd74

Branch: refs/heads/master
Commit: 76e9bd74885a99462ed0957aad37cbead7f14de2
Parents: 67fb33e
Author: uncleGen <hustyugm@gmail.com>
Authored: Wed Dec 28 10:42:47 2016 +0000
Committer: Sean Owen <sowen@cloudera.com>
Committed: Wed Dec 28 10:42:47 2016 +0000

----------------------------------------------------------------------
 .../datasources/PartitioningAwareFileIndex.scala         | 11 ++++++++---
 .../spark/sql/execution/datasources/FileIndexSuite.scala |  1 +
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 825a0f7..82c1599 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -439,10 +439,15 @@ object PartitioningAwareFileIndex extends Logging {
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {
-    // We filter everything that starts with _ and ., except _common_metadata and _metadata
+    // We filter follow paths:
+    // 1. everything that starts with _ and ., except _common_metadata and _metadata
     // because Parquet needs to find those metadata files from leaf files returned by this
method.
     // We should refactor this logic to not mix metadata files with data files.
-    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith("."))
&&
-      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
+    // 2. everything that ends with `._COPYING_`, because this is a intermediate state of
file. we
+    // should skip this file in case of double reading.
+    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
+    exclude && !include
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/76e9bd74/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index b7a472b..2b4c9f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -142,6 +142,7 @@ class FileIndexSuite extends SharedSQLContext {
     assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
     assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
     assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_"))
   }
 
   test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message