beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] incubator-beam git commit: FileBasedSink: ignore exceptions when removing temp output files for issues in Windows OS.
Date Tue, 13 Dec 2016 05:14:28 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 5ef34f7ee -> f898ff785


FileBasedSink: ignore exceptions when removing temp output files for issues in Windows OS.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c62f0a89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c62f0a89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c62f0a89

Branch: refs/heads/master
Commit: c62f0a890dca5229353a9395d9a1f5d6628f7ca0
Parents: 5ef34f7
Author: Pei He <peihe@google.com>
Authored: Mon Dec 12 15:43:14 2016 -0800
Committer: Davor Bonaci <davor@google.com>
Committed: Mon Dec 12 21:14:06 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java    | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c62f0a89/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 33296b4..32b8b4f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -30,7 +30,6 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -477,7 +476,14 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       // (produced by successfully completed bundles).
       // This may still fail to remove temporary outputs of some failed bundles, but at least
       // the common case (where all bundles succeed) is guaranteed to be fully addressed.
-      Collection<String> matches = factory.match(factory.resolve(tempDir, "*"));
+      Set<String> matches = new HashSet<>();
+      // TODO: Windows OS cannot resolve and match '*' in the path;
+      // ignore the exception for now to avoid failing the pipeline.
+      try {
+        matches.addAll(factory.match(factory.resolve(tempDir, "*")));
+      } catch (Exception e) {
+        LOG.warn("Failed to match temporary files under: [{}].", tempDir);
+      }
       Set<String> allMatches = new HashSet<>(matches);
       allMatches.addAll(knownFiles);
       LOG.debug(
@@ -486,8 +492,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
           tempDir,
           matches.size(),
           allMatches.size() - matches.size());
-      factory.remove(allMatches);
-      factory.remove(ImmutableList.of(tempDir));
+      // Deletion of the temporary directory might fail, if not all temporary files are removed.
+      try {
+        factory.remove(allMatches);
+        factory.remove(ImmutableList.of(tempDir));
+      } catch (Exception e) {
+        LOG.warn("Failed to remove temporary directory: [{}].", tempDir);
+      }
     }
 
     /**


Mime
View raw message