crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-515: Decrease collision probability on temp dir cleanup. Contributed by Sean Owen.
Date Mon, 19 Oct 2015 22:43:31 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 5d9432fbc -> 463a131b4


CRUNCH-515: Decrease collision probability on temp dir cleanup. Contributed by Sean Owen.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/463a131b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/463a131b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/463a131b

Branch: refs/heads/master
Commit: 463a131b445e34456c020c95ae66edf7b9f345e7
Parents: 5d9432f
Author: Josh Wills <jwills@slack-corp.com>
Authored: Mon Oct 19 12:15:10 2015 -0700
Committer: Josh Wills <jwills@slack-corp.com>
Committed: Mon Oct 19 12:15:10 2015 -0700

----------------------------------------------------------------------
 .../crunch/impl/dist/DistributedPipeline.java   | 58 +++++++++++++-------
 1 file changed, 39 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/463a131b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 049046a..d3fb0d0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -99,7 +99,6 @@ public abstract class DistributedPipeline implements Pipeline {
     this.allPipelineCallables = Maps.newHashMap();
     this.appendedTargets = Sets.newHashSet();
     this.conf = conf;
-    this.tempDirectory = createTempDirectory(conf);
     this.tempFileIndex = 0;
     this.nextAnonymousStageId = 0;
   }
@@ -115,8 +114,9 @@ public abstract class DistributedPipeline implements Pipeline {
 
   @Override
   public void setConfiguration(Configuration conf) {
+    // Clear any existing temp dir
+    deleteTempDirectory();
     this.conf = conf;
-    this.tempDirectory = createTempDirectory(conf);
   }
 
   @Override
@@ -390,19 +390,22 @@ public abstract class DistributedPipeline implements Pipeline {
 
   public Path createTempPath() {
     tempFileIndex++;
-    return new Path(tempDirectory, "p" + tempFileIndex);
+    return new Path(getTempDirectory(), "p" + tempFileIndex);
   }
 
-  private static Path createTempDirectory(Configuration conf) {
-    Path dir = createTemporaryPath(conf);
-    try {
-      FileSystem fs = dir.getFileSystem(conf);
-      fs.mkdirs(dir);
-      fs.deleteOnExit(dir);
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot create job output directory " + dir, e);
+  private synchronized Path getTempDirectory() {
+    if (tempDirectory == null) {
+      Path dir = createTemporaryPath(conf);
+      try {
+        FileSystem fs = dir.getFileSystem(conf);
+        fs.mkdirs(dir);
+        fs.deleteOnExit(dir);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot create job output directory " + dir, e);
+      }
+      tempDirectory = dir;
     }
-    return dir;
+    return tempDirectory;
   }
 
   private static Path createTemporaryPath(Configuration conf) {
@@ -427,21 +430,38 @@ public abstract class DistributedPipeline implements Pipeline {
   @Override
   public void cleanup(boolean force) {
     if (force || outputTargets.isEmpty()) {
+      deleteTempDirectory();
+    } else {
+      LOG.warn("Not running cleanup while output targets remain.");
+    }
+  }
+
+  private void cleanup() {
+    cleanup(false);
+  }
+
+  private synchronized void deleteTempDirectory() {
+    Path toDelete = tempDirectory;
+    tempDirectory = null;
+    if (toDelete != null) {
       try {
-        FileSystem fs = tempDirectory.getFileSystem(conf);
-        if (fs.exists(tempDirectory)) {
-          fs.delete(tempDirectory, true);
+        FileSystem fs = toDelete.getFileSystem(conf);
+        if (fs.exists(toDelete)) {
+          fs.delete(toDelete, true);
         }
       } catch (IOException e) {
         LOG.info("Exception during cleanup", e);
       }
-    } else {
-      LOG.warn("Not running cleanup while output targets remain.");
     }
   }
 
-  private void cleanup() {
-    cleanup(false);
+  @Override
+  protected void finalize() throws Throwable {
+    if (tempDirectory != null) {
+      LOG.warn("Temp directory {} still exists; was Pipeline.done() called?", tempDirectory);
+      deleteTempDirectory();
+    }
+    super.finalize();
   }
 
   public int getNextAnonymousStageId() {


Mime
View raw message