crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-218: Add a WriteMode for checkpoint outputs, and make invalid checkpoint targets throw a CrunchRuntimeException.
Date Sun, 30 Jun 2013 16:08:50 GMT
Updated Branches:
  refs/heads/master 246109962 -> 1082111c7


CRUNCH-218: Add a WriteMode for checkpoint outputs, and make invalid
checkpoint targets throw a CrunchRuntimeException.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1082111c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1082111c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1082111c

Branch: refs/heads/master
Commit: 1082111c741916f62208cf783a0e69850fe60018
Parents: 2461099
Author: Josh Wills <jwills@apache.org>
Authored: Wed Jun 12 10:22:29 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sat Jun 29 08:37:22 2013 -0700

----------------------------------------------------------------------
 .../crunch/contrib/io/jdbc/DataBaseSource.java  |   7 +-
 .../it/java/org/apache/crunch/CheckpointIT.java | 100 +++++++++++++++++++
 .../src/main/java/org/apache/crunch/Source.java |   7 ++
 .../src/main/java/org/apache/crunch/Target.java |  11 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java |   2 +-
 .../org/apache/crunch/impl/mr/MRPipeline.java   |  13 ++-
 .../impl/mr/collect/DoCollectionImpl.java       |   5 +
 .../crunch/impl/mr/collect/DoTableImpl.java     |   5 +
 .../crunch/impl/mr/collect/InputCollection.java |   5 +
 .../crunch/impl/mr/collect/InputTable.java      |   5 +
 .../crunch/impl/mr/collect/PCollectionImpl.java |   2 +
 .../impl/mr/collect/PGroupedTableImpl.java      |   5 +
 .../crunch/impl/mr/collect/UnionCollection.java |  11 +-
 .../crunch/impl/mr/collect/UnionTable.java      |  11 +-
 .../crunch/impl/mr/exec/CrunchJobHooks.java     |   2 +-
 .../apache/crunch/io/SourceTargetHelper.java    |  14 +++
 .../apache/crunch/io/impl/FileSourceImpl.java   |  19 +++-
 .../apache/crunch/io/impl/FileTargetImpl.java   |  32 +++++-
 .../apache/crunch/io/impl/SourceTargetImpl.java |  12 ++-
 .../impl/mr/collect/DoCollectionImplTest.java   |   4 +
 .../crunch/io/hbase/HBaseSourceTarget.java      |  10 ++
 .../org/apache/crunch/io/hbase/HBaseTarget.java |   3 +-
 22 files changed, 270 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 23ca685..6ba5e06 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -114,8 +114,13 @@ public class DataBaseSource<T extends DBWritable & Writable>
implements Source<T
   }
 
   @Override
+  public long getLastModifiedAt(Configuration configuration) {
+    return -1;
+  }
+
+  @Override
   public PType<T> getType() {
     return ptype;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java b/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java
new file mode 100644
index 0000000..acb039d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/CheckpointIT.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CheckpointIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  @Test
+  public void testCheckpoints() throws Exception {
+    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
+    Pipeline p = new MRPipeline(CheckpointIT.class);
+    String inter = tmpDir.getFileName("intermediate");
+    PipelineResult one = run(p, tmpDir, inputPath, inter, false);
+    assertTrue(one.succeeded());
+    assertEquals(2, one.getStageResults().size());
+    PipelineResult two = run(p, tmpDir, inputPath, inter, false);
+    assertTrue(two.succeeded());
+    assertEquals(1, two.getStageResults().size());
+  }
+  
+  @Test
+  public void testUnsuccessfulCheckpoint() throws Exception {
+    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
+    Pipeline p = new MRPipeline(CheckpointIT.class);
+    String inter = tmpDir.getFileName("intermediate");
+    PipelineResult one = run(p, tmpDir, inputPath, inter, true);
+    assertFalse(one.succeeded());
+    PipelineResult two = run(p, tmpDir, inputPath, inter, false);
+    assertTrue(two.succeeded());
+    assertEquals(2, two.getStageResults().size());
+  }
+  
+  @Test
+  public void testModifiedFileCheckpoint() throws Exception {
+    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
+    Pipeline p = new MRPipeline(CheckpointIT.class);
+    Path inter = tmpDir.getPath("intermediate");
+    PipelineResult one = run(p, tmpDir, inputPath, inter.toString(), false);
+    assertTrue(one.succeeded());
+    assertEquals(2, one.getStageResults().size());
+    // Update the input path
+    inputPath = tmpDir.copyResourceFileName("shakes.txt");
+    PipelineResult two = run(p, tmpDir, inputPath, inter.toString(), false);
+    assertTrue(two.succeeded());
+    assertEquals(2, two.getStageResults().size());
+  }
+  
+  public static PipelineResult run(Pipeline pipeline, TemporaryPath tmpDir, 
+      String shakesInputPath, String intermediatePath,
+      final boolean fail)
+      throws Exception {
+    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
+    PTable<String, Long> cnts = shakes.parallelDo("split words", new DoFn<String,
String>() {
+      @Override
+      public void process(String line, Emitter<String> emitter) {
+        if (fail) {
+          throw new RuntimeException("Failure!");
+        }
+        for (String word : line.split("\\s+")) {
+          emitter.emit(word);
+        }
+      }
+    }, Avros.strings()).count();
+    cnts.write(To.avroFile(intermediatePath), WriteMode.CHECKPOINT);
+    PTable<String, Long> singleCounts = cnts.keys().count();
+    singleCounts.write(To.textFile(tmpDir.getFileName("singleCounts")), WriteMode.OVERWRITE);
+    return pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index f54d135..b744c8f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -49,4 +49,11 @@ public interface Source<T> {
    * Returns the number of bytes in this {@code Source}.
    */
   long getSize(Configuration configuration);
+  
+  /**
+   * Returns the time (in milliseconds) that this {@code Source} was most recently
+   * modified (e.g., because an input file was edited or new files were added to
+   * a directory.)
+   */
+  long getLastModifiedAt(Configuration configuration);
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 0a0c23d..48dc2cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -50,7 +50,13 @@ public interface Target {
      * add the output of this pipeline to the target. This was the
      * behavior in Crunch up to version 0.4.0.
      */
-    APPEND
+    APPEND,
+    
+    /**
+     * If the output target exists and is newer than any of its source inputs, don't rewrite
it,
+     * just start the pipeline from here. Only works with {@code SourceTarget} instances.
+     */
+    CHECKPOINT
   }
 
   /**
@@ -58,8 +64,9 @@ public interface Target {
    * 
    * @param writeMode The strategy for handling existing outputs
    * @param conf The ever-useful {@code Configuration} instance
+   * @return true if the target did exist
    */
-  void handleExisting(WriteMode writeMode, Configuration conf);
+  boolean handleExisting(WriteMode writeMode, long lastModifiedAt, Configuration conf);
   
   /**
    * Checks to see if this {@code Target} instance is compatible with the

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 9001e51..60677fc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -176,7 +176,7 @@ public class MemPipeline implements Pipeline {
   
   @Override
   public void write(PCollection<?> collection, Target target, Target.WriteMode writeMode)
{
-    target.handleExisting(writeMode, getConfiguration());
+    target.handleExisting(writeMode, -1, getConfiguration());
     if (writeMode != Target.WriteMode.APPEND && activeTargets.contains(target)) {
       throw new CrunchRuntimeException("Target " + target
           + " is already written in the current run."

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index f167846..4fb2876 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -209,8 +209,17 @@ public class MRPipeline implements Pipeline {
       pcollection = pcollection.parallelDo("UnionCollectionWrapper",
           (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
     }
-    target.handleExisting(writeMode, getConfiguration());
-    if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) {
+    boolean exists = target.handleExisting(writeMode, ((PCollectionImpl) pcollection).getLastModifiedAt(),
+        getConfiguration());
+    if (exists && writeMode == WriteMode.CHECKPOINT) {
+      SourceTarget<?> st = target.asSourceTarget(pcollection.getPType());
+      if (st == null) {
+        throw new CrunchRuntimeException("Target " + target + " does not support checkpointing");
+      } else {
+        ((PCollectionImpl) pcollection).materializeAt(st);
+      }
+      return;
+    } else if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) {
       throw new CrunchRuntimeException("Target " + target + " is already written in current
run." +
           " Use WriteMode.APPEND in order to write additional data to it.");
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 8881e3f..917ef65 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -68,4 +68,9 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S>
{
   public DoNode createDoNode() {
     return DoNode.createFnNode(getName(), fn, ntype);
   }
+
+  @Override
+  public long getLastModifiedAt() {
+    return parent.getLastModifiedAt();
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 176643b..5329c7a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -81,4 +81,9 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements
PTable<K, V>
   public boolean hasCombineFn() {
     return fn instanceof CombineFn;
   }
+  
+  @Override
+  public long getLastModifiedAt() {
+    return parent.getLastModifiedAt();
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
index ace5cc1..a4958e7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -71,6 +71,11 @@ public class InputCollection<S> extends PCollectionImpl<S>
{
   }
 
   @Override
+  public long getLastModifiedAt() {
+    return source.getLastModifiedAt(pipeline.getConfiguration());
+  }
+  
+  @Override
   public boolean equals(Object obj) {
     if (obj == null || !(obj instanceof InputCollection)) {
       return false;

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
index 71f11c5..8317452 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -75,6 +75,11 @@ public class InputTable<K, V> extends PTableBase<K, V> {
   }
 
   @Override
+  public long getLastModifiedAt() {
+    return source.getLastModifiedAt(pipeline.getConfiguration());
+  }
+  
+  @Override
   public int hashCode() {
     return asCollection.hashCode();
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 6ea9c4c..b5f1cef 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -284,6 +284,8 @@ public abstract class PCollectionImpl<S> implements PCollection<S>
{
 
   protected abstract long getSizeInternal();
   
+  public abstract long getLastModifiedAt();
+  
   /**
    * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further
down the pipeline.
    * @return The PCollectionImpl instance to be chained

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index d277b75..c385e16 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -154,6 +154,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K,
Iterable<V>
   }
   
   @Override
+  public long getLastModifiedAt() {
+    return parent.getLastModifiedAt();
+  }
+  
+  @Override
   protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection()
{
     // Use a copy for chaining to allow sending the output of a single grouped table to multiple
outputs
     // TODO This should be implemented in a cleaner way in the planner

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
index 7b3dd7b..b6e1fdd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
@@ -29,7 +29,8 @@ public class UnionCollection<S> extends PCollectionImpl<S> {
 
   private List<PCollectionImpl<S>> parents;
   private long size = 0;
-
+  private long lastModifiedAt = -1;
+  
   private static <S> String flatName(List<PCollectionImpl<S>> collections)
{
     StringBuilder sb = new StringBuilder("union(");
     for (int i = 0; i < collections.size(); i++) {
@@ -50,6 +51,9 @@ public class UnionCollection<S> extends PCollectionImpl<S> {
         throw new IllegalStateException("Cannot union PCollections from different Pipeline
instances");
       }
       size += parent.getSize();
+      if (parent.getLastModifiedAt() > lastModifiedAt) {
+        this.lastModifiedAt = parent.getLastModifiedAt();
+      }
     }
   }
 
@@ -59,6 +63,11 @@ public class UnionCollection<S> extends PCollectionImpl<S>
{
   }
 
   @Override
+  public long getLastModifiedAt() {
+    return lastModifiedAt;
+  }
+  
+  @Override
   protected void acceptInternal(PCollectionImpl.Visitor visitor) {
     visitor.visitUnionCollection(this);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
index a369432..91f518a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
@@ -33,7 +33,8 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
   private PTableType<K, V> ptype;
   private List<PCollectionImpl<Pair<K, V>>> parents;
   private long size;
-
+  private long lastModifiedAt = -1;
+  
   private static <K, V> String flatName(List<PTableBase<K, V>> tables)
{
     StringBuilder sb = new StringBuilder("union(");
     for (int i = 0; i < tables.size(); i++) {
@@ -56,6 +57,9 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
       }
       this.parents.add(parent);
       size += parent.getSize();
+      if (parent.getLastModifiedAt() > lastModifiedAt) {
+        this.lastModifiedAt = parent.getLastModifiedAt();
+      }
     }
   }
 
@@ -65,6 +69,11 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
   }
 
   @Override
+  public long getLastModifiedAt() {
+    return lastModifiedAt;
+  }
+  
+  @Override
   public PTableType<K, V> getPTableType() {
     return ptype;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
index b06847b..dee80f1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -79,7 +79,7 @@ public final class CrunchJobHooks {
     }
 
     private synchronized void handleMultiPaths() throws IOException {
-      if (!multiPaths.isEmpty()) {
+      if (job.isSuccessful() && !multiPaths.isEmpty()) {
         // Need to handle moving the data from the output directory of the
         // job to the output locations specified in the paths.
         FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
index f4400de..66764cf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -45,4 +45,18 @@ public class SourceTargetHelper {
     }
     return size;
   }
+  
+  public static long getLastModifiedAt(FileSystem fs, Path path) throws IOException {
+    FileStatus[] stati = fs.globStatus(path);
+    if (stati == null || stati.length == 0) {
+      return -1L;
+    }
+    long lastMod = -1;
+    for (FileStatus status : stati) {
+      if (lastMod < status.getModificationTime()) {
+        lastMod = status.getModificationTime();
+      }
+    }
+    return lastMod;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index 44139b0..b232abb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -31,9 +31,7 @@ import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SourceTargetHelper;
-import org.apache.crunch.io.avro.AvroFileReaderFactory;
 import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -138,6 +136,23 @@ public class FileSourceImpl<T> implements Source<T> {
   }
 
   @Override
+  public long getLastModifiedAt(Configuration conf) {
+    long lastMod = -1;
+    for (Path path : paths) {
+      try {
+        FileSystem fs = path.getFileSystem(conf);
+        long lm = SourceTargetHelper.getLastModifiedAt(fs, path);
+        if (lm > lastMod) {
+          lastMod = lm;
+        }
+      } catch (IOException e) {
+        LOG.error("Could not determine last modification time for source: " + toString(),
e);
+      }
+    }
+    return lastMod;
+  }
+
+  @Override
   public boolean equals(Object other) {
     if (other == null || !getClass().equals(other.getClass())) {
       return false;

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 4e3dd9a..50a1fd3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -31,6 +31,7 @@ import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
@@ -103,6 +104,11 @@ public class FileTargetImpl implements PathTarget {
         FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
       }
     }
+    dstFs.create(getSuccessIndicator(), true).close();
+  }
+  
+  private Path getSuccessIndicator() {
+    return new Path(path, "_SUCCESS");
   }
   
   protected Path getSourcePattern(Path workingPath, int index) {
@@ -184,7 +190,7 @@ public class FileTargetImpl implements PathTarget {
   }
 
   @Override
-  public void handleExisting(WriteMode strategy, Configuration conf) {
+  public boolean handleExisting(WriteMode strategy, long lastModForSource, Configuration
conf) {
     FileSystem fs = null;
     try {
       fs = path.getFileSystem(conf);
@@ -194,8 +200,14 @@ public class FileTargetImpl implements PathTarget {
     }
     
     boolean exists = false;
+    boolean successful = false;
+    long lastModForTarget = -1;
     try {
       exists = fs.exists(path);
+      if (exists) {
+        successful = fs.exists(getSuccessIndicator());
+        lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path);
+      }
     } catch (IOException e) {
       LOG.error("Exception checking existence of path: " + path, e);
       throw new CrunchRuntimeException(e);
@@ -217,11 +229,29 @@ public class FileTargetImpl implements PathTarget {
       case APPEND:
         LOG.info("Adding output files to existing path: " + path);
         break;
+      case CHECKPOINT:
+        if (successful && lastModForTarget > lastModForSource) {
+          LOG.info("Re-starting pipeline from checkpoint path: " + path);
+          break;
+        } else {
+          if (!successful) {
+            LOG.info("_SUCCESS file not found, Removing data at existing checkpoint path:
" + path);
+          } else {
+            LOG.info("Source data has recent updates. Removing data at existing checkpoint
path: " + path);
+          }
+          try {
+            fs.delete(path, true);
+          } catch (IOException e) {
+            LOG.error("Exception thrown removing data at checkpoint path: " + path, e);
+          }
+          return false;
+        }
       default:
         throw new CrunchRuntimeException("Unknown WriteMode:  " + strategy);
       }
     } else {
       LOG.info("Will write output files to new path: " + path);
     }
+    return exists;
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 4d2b88a..5dd4d69 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -60,6 +60,9 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
 
   @Override
   public <S> SourceTarget<S> asSourceTarget(PType<S> ptype) {
+    if (ptype != null && ptype.equals(source.getType())) {
+      return (SourceTarget<S>) this;
+    }
     return target.asSourceTarget(ptype);
   }
 
@@ -83,7 +86,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
   }
 
   @Override
-  public void handleExisting(WriteMode strategy, Configuration conf) {
-    target.handleExisting(strategy, conf);  
+  public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf)
{
+    return target.handleExisting(strategy, lastModifiedAt, conf);  
+  }
+
+  @Override
+  public long getLastModifiedAt(Configuration configuration) {
+    return source.getLastModifiedAt(configuration);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
index fd582bc..b025119 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
@@ -107,6 +107,10 @@ public class DoCollectionImplTest {
       return internalSize;
     }
 
+    @Override
+    public long getLastModifiedAt() {
+      return -1;
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 230c701..6a5a124 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -22,6 +22,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
@@ -45,6 +47,8 @@ import org.apache.hadoop.mapreduce.Job;
 public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable,
Result>>,
     TableSource<ImmutableBytesWritable, Result> {
 
+  private static final Log LOG = LogFactory.getLog(HBaseSourceTarget.class);
+  
   private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
       Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
 
@@ -120,4 +124,10 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
     // TODO something smarter here.
     return 1000L * 1000L * 1000L;
   }
+
+  @Override
+  public long getLastModifiedAt(Configuration configuration) {
+    LOG.warn("Cannot determine last modified time for source: " + toString());
+    return -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/1082111c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index eceb31d..83d62c8 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -116,7 +116,8 @@ public class HBaseTarget implements MapReduceTarget {
   }
 
   @Override
-  public void handleExisting(WriteMode strategy, Configuration conf) {
+  public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf)
{
     LOG.info("HBaseTarget ignores checks for existing outputs...");
+    return false;
   }
 }


Mime
View raw message