crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mkw...@apache.org
Subject git commit: CRUNCH-304: Exposed a cleanup method for consumers
Date Fri, 29 Nov 2013 18:22:55 GMT
Updated Branches:
  refs/heads/master e3b0d13f3 -> 23bad11d6


CRUNCH-304: Exposed a cleanup method for consumers


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/23bad11d
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/23bad11d
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/23bad11d

Branch: refs/heads/master
Commit: 23bad11d6a2f89f1d069ada121f3cf6d08e7f614
Parents: e3b0d13
Author: Micah Whitacre <mkwhit@apache.org>
Authored: Fri Nov 29 10:28:42 2013 -0600
Committer: Micah Whitacre <mkwhit@apache.org>
Committed: Fri Nov 29 12:03:31 2013 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/Pipeline.java   |  6 ++++
 .../org/apache/crunch/impl/mem/MemPipeline.java |  5 ++++
 .../org/apache/crunch/impl/mr/MRPipeline.java   | 29 ++++++++++++--------
 3 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/23bad11d/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index 84c720c..503ca49 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -122,6 +122,12 @@ public interface Pipeline {
   PipelineResult done();
 
   /**
+  * Cleans up any artifacts created as a result of {@link #run() running} the pipeline.
+  * @param force forces the cleanup even if all targets of the pipeline have not been completed.
+  */
+  void cleanup(boolean force);
+
+  /**
    * A convenience method for reading a text file.
    */
   PCollection<String> readTextFile(String pathName);

http://git-wip-us.apache.org/repos/asf/crunch/blob/23bad11d/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index ce411ca..0a4dbea 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -321,6 +321,11 @@ public class MemPipeline implements Pipeline {
   }
 
   @Override
+  public void cleanup(boolean force) {
+    //no-op
+  }
+
+    @Override
   public PipelineResult done() {
     return run();
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/23bad11d/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index ff95b91..3c2ab77 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -372,22 +372,27 @@ public class MRPipeline implements Pipeline {
       return input.toString();
     }
   }
-  
-  private void cleanup() {
-    if (!outputTargets.isEmpty()) {
-      LOG.warn("Not running cleanup while output targets remain");
-      return;
-    }
-    try {
-      FileSystem fs = tempDirectory.getFileSystem(conf);
-      if (fs.exists(tempDirectory)) {
-        fs.delete(tempDirectory, true);
+
+  @Override
+  public void cleanup(boolean force) {
+    if (force || outputTargets.isEmpty()) {
+      try {
+        FileSystem fs = tempDirectory.getFileSystem(conf);
+        if (fs.exists(tempDirectory)) {
+          fs.delete(tempDirectory, true);
+        }
+      } catch (IOException e) {
+        LOG.info("Exception during cleanup", e);
       }
-    } catch (IOException e) {
-      LOG.info("Exception during cleanup", e);
+    } else {
+      LOG.warn("Not running cleanup while output targets remain.");
     }
   }
 
+  private void cleanup() {
+    cleanup(false);
+  }
+
   public int getNextAnonymousStageId() {
     return nextAnonymousStageId++;
   }


Mime
View raw message