crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/2] git commit: CRUNCH-449: Add a new PipelineCallable and sequentialDo method for inserting non-Crunch tasks into Crunch workflows.
Date Sat, 02 Aug 2014 16:22:32 GMT
CRUNCH-449: Add a new PipelineCallable and sequentialDo method
for inserting non-Crunch tasks into Crunch workflows.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a5c59276
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a5c59276
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a5c59276

Branch: refs/heads/apache-crunch-0.8
Commit: a5c5927683ab1d7ebe11138246b4a247e2ad0155
Parents: b6791f8
Author: Josh Wills <jwills@apache.org>
Authored: Fri Jun 20 22:07:15 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sat Aug 2 09:15:34 2014 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/PipelineCallableIT.java   | 104 ++++++++
 .../java/org/apache/crunch/PCollection.java     |  12 +
 .../org/apache/crunch/ParallelDoOptions.java    |  35 ++-
 .../main/java/org/apache/crunch/Pipeline.java   |  23 ++
 .../org/apache/crunch/PipelineCallable.java     | 244 +++++++++++++++++++
 .../lib/jobcontrol/CrunchControlledJob.java     |  13 +-
 .../lib/jobcontrol/CrunchJobControl.java        |  89 ++++++-
 .../crunch/impl/dist/DistributedPipeline.java   |  41 +++-
 .../impl/dist/collect/BaseGroupedTable.java     |   5 +-
 .../impl/dist/collect/BaseInputCollection.java  |  11 +
 .../impl/dist/collect/BaseInputTable.java       |  17 +-
 .../impl/dist/collect/PCollectionFactory.java   |  10 +-
 .../impl/dist/collect/PCollectionImpl.java      |  38 ++-
 .../crunch/impl/dist/collect/PTableBase.java    |   4 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java |  14 ++
 .../crunch/impl/mem/collect/MemCollection.java  |   7 +
 .../org/apache/crunch/impl/mr/MRPipeline.java   |   3 +-
 .../crunch/impl/mr/collect/InputCollection.java |   5 +-
 .../crunch/impl/mr/collect/InputTable.java      |   5 +-
 .../impl/mr/collect/MRCollectionFactory.java    |  14 +-
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  55 +++--
 .../crunch/impl/mr/plan/JobPrototype.java       |   5 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  20 +-
 .../lib/jobcontrol/CrunchJobControlTest.java    |  84 ++++++-
 .../apache/crunch/scrunch/PCollectionLike.scala |   8 +
 .../apache/crunch/scrunch/PipelineLike.scala    |   4 +
 .../apache/crunch/SparkPipelineCallableIT.java  |  99 ++++++++
 .../apache/crunch/impl/spark/SparkPipeline.java |   3 +-
 .../apache/crunch/impl/spark/SparkRuntime.java  |  89 ++++++-
 .../impl/spark/collect/InputCollection.java     |   7 +-
 .../crunch/impl/spark/collect/InputTable.java   |   5 +-
 .../impl/spark/collect/SparkCollectFactory.java |  14 +-
 32 files changed, 995 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
new file mode 100644
index 0000000..b4fc19e
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/PipelineCallableIT.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+
+public class PipelineCallableIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testMRShakes() throws Exception {
+    run(new MRPipeline(PipelineCallableIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("shakes.txt"), false /* fail */);
+  }
+
+  @Test
+  public void testFailure() throws Exception {
+    run(new MRPipeline(PipelineCallableIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("shakes.txt"), true /* fail */);
+  }
+
+  public static int INC1 = 0;
+  public static int INC2 = 0;
+
+  public static void run(Pipeline p, final String input, final boolean fail) {
+
+    PTable<String, Long> top3 = p.sequentialDo(new PipelineCallable<PCollection<String>>() {
+      @Override
+      public Status call() {
+        INC1 = 17;
+        return fail ? Status.FAILURE : Status.SUCCESS;
+      }
+
+      @Override
+      public PCollection<String> getOutput(Pipeline pipeline) {
+        return pipeline.readTextFile(input);
+      }
+    }.named("first"))
+    .sequentialDo("onInput", new PipelineCallable<PCollection<String>>() {
+      @Override
+      protected PCollection<String> getOutput(Pipeline pipeline) {
+        return getOnlyPCollection();
+      }
+
+      @Override
+      public Status call() throws Exception {
+        return Status.SUCCESS;
+      }
+    })
+    .count()
+    .sequentialDo("label", new PipelineCallable<PTable<String, Long>>() {
+      @Override
+      public Status call() {
+        INC2 = 29;
+        if (getPCollection("label") != null) {
+          return Status.SUCCESS;
+        }
+        return Status.FAILURE;
+      }
+
+      @Override
+      public PTable<String, Long> getOutput(Pipeline pipeline) {
+        return (PTable<String, Long>) getOnlyPCollection();
+      }
+    }.named("second"))
+    .top(3);
+
+    if (fail) {
+      assertFalse(p.run().succeeded());
+    } else {
+      Map<String, Long> counts = top3.materializeToMap();
+      assertEquals(ImmutableMap.of("", 788L, "Enter Macbeth.", 7L, "Exeunt.", 21L), counts);
+      assertEquals(17, INC1);
+      assertEquals(29, INC2);
+    }
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index 878fbb9..a1d507a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -188,6 +188,18 @@ public interface PCollection<S> {
   PObject<S> first();
 
   /**
+   * Adds the materialized data in this {@code PCollection} as a dependency to the given
+   * {@code PipelineCallable} and registers it with the {@code Pipeline} associated with this
+   * instance.
+   *
+   * @param label the label to use inside of the PipelineCallable for referencing this PCollection
+   * @param pipelineCallable the function itself
+   *
+   * @return The value of the {@code getOutput} function on the given argument.
+   */
+  <Output> Output sequentialDo(String label, PipelineCallable<Output> pipelineCallable);
+
+  /**
    * @return A reference to the data in this instance that can be read from a job running
    * on a cluster.
    *

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
index 65d0df2..24abf90 100644
--- a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
+++ b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
@@ -34,18 +34,21 @@ import org.apache.hadoop.conf.Configuration;
  * that require reading a file from the filesystem into a {@code DoFn}.
  */
 public class ParallelDoOptions {
-  private final Set<SourceTarget<?>> sourceTargets;
+  private final Set targets;
   private final Map<String, String> extraConf;
 
-  private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets, Map<String, String> extraConf) {
-    this.sourceTargets = sourceTargets;
+  private ParallelDoOptions(Set<Target> targets, Map<String, String> extraConf) {
+    this.targets = targets;
     this.extraConf = extraConf;
   }
-  
+
+  @Deprecated
   public Set<SourceTarget<?>> getSourceTargets() {
-    return sourceTargets;
+    return (Set<SourceTarget<?>>) targets;
   }
 
+  public Set<Target> getTargets() { return targets; }
+
   /**
    * Applies the key-value pairs that were associated with this instance to the given {@code Configuration}
    * object. This is called just before the {@code configure} method on the {@code DoFn} corresponding to this
@@ -62,11 +65,11 @@ public class ParallelDoOptions {
   }
   
   public static class Builder {
-    private Set<SourceTarget<?>> sourceTargets;
+    private Set<Target> targets;
     private Map<String, String> extraConf;
 
     public Builder() {
-      this.sourceTargets = Sets.newHashSet();
+      this.targets = Sets.newHashSet();
       this.extraConf = Maps.newHashMap();
     }
 
@@ -78,19 +81,29 @@ public class ParallelDoOptions {
       for (Source<?> src : sources) {
         // Only SourceTargets need to be checked for materialization
         if (src instanceof SourceTarget) {
-          sourceTargets.add((SourceTarget) src);
+          targets.add((SourceTarget) src);
         }
       }
       return this;
     }
 
     public Builder sourceTargets(SourceTarget<?>... sourceTargets) {
-      Collections.addAll(this.sourceTargets, sourceTargets);
+      Collections.addAll(this.targets, sourceTargets);
       return this;
     }
 
     public Builder sourceTargets(Collection<SourceTarget<?>> sourceTargets) {
-      this.sourceTargets.addAll(sourceTargets);
+      this.targets.addAll(sourceTargets);
+      return this;
+    }
+
+    public Builder targets(Target... targets) {
+      Collections.addAll(this.targets, targets);
+      return this;
+    }
+
+    public Builder targets(Collection<Target> targets) {
+      this.targets.addAll(targets);
       return this;
     }
 
@@ -107,7 +120,7 @@ public class ParallelDoOptions {
     }
 
     public ParallelDoOptions build() {
-      return new ParallelDoOptions(sourceTargets, extraConf);
+      return new ParallelDoOptions(targets, extraConf);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index f34d0ef..cd3f3f6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -111,11 +111,34 @@ public interface Pipeline {
    */
   <T> void cache(PCollection<T> pcollection, CachingOptions options);
 
+  /**
+   * Creates an empty {@code PCollection} of the given {@code PType}.
+   *
+   * @param ptype The PType of the empty PCollection
+   * @return A valid PCollection with no contents
+   */
   <T> PCollection<T> emptyPCollection(PType<T> ptype);
 
+  /**
+   * Creates an empty {@code PTable} of the given {@code PTable Type}.
+   *
+   * @param ptype The PTableType of the empty PTable
+   * @return A valid PTable with no contents
+   */
   <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype);
 
   /**
+   * Executes the given {@code PipelineCallable} on the client after the {@code Targets}
+   * that the PipelineCallable depends on (if any) have been created by other pipeline
+   * processing steps.
+   *
+   * @param pipelineCallable The sequential logic to execute
+   * @param <Output> The return type of the PipelineCallable
+   * @return The result of executing the PipelineCallable
+   */
+  <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable);
+
+  /**
    * Constructs and executes a series of MapReduce jobs in order to write data
    * to the output targets.
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java b/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java
new file mode 100644
index 0000000..e1b16aa
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineCallable.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import parquet.Preconditions;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * A specialization of {@code Callable} that executes some sequential logic on the client machine as
+ * part of an overall Crunch pipeline in order to generate zero or more outputs, some of
+ * which may be {@code PCollection} instances that are processed by other jobs in the
+ * pipeline.
+ *
+ * <p>{@code PipelineCallable} is intended to be used to inject auxiliary logic into the control
+ * flow of a Crunch pipeline. This can be used for a number of purposes, such as importing or
+ * exporting data into a cluster using Apache Sqoop, executing a legacy MapReduce job
+ * or Pig/Hive script within a Crunch pipeline, or sending emails or status notifications
+ * about the status of a long-running pipeline during its execution.</p>
+ *
+ * <p>The Crunch planner needs to know three things about a {@code PipelineCallable} instance in order
+ * to manage it:
+ * <ol>
+ *   <li>The {@code Target} and {@code PCollection} instances that must have been materialized
+ *   before this instance is allowed to run. This information should be specified via the {@code dependsOn}
+ *   methods of the class.</li>
+ *   <li>What Outputs will be created after this instance is executed, if any. These outputs may be
+ *   new {@code PCollection} instances that are used as inputs in other Crunch jobs. These outputs should
+ *   be specified by the {@code getOutput(Pipeline)} method of the class, which will be executed immediately
+ *   after this instance is registered with the {@link Pipeline#sequentialDo} method.</li>
+ *   <li>The actual logic to execute when the dependent Targets and PCollections have been created in
+ *   order to materialize the output data. This is defined in the {@code call} method of the class.</li>
+ * </ol>
+ * </p>
+ *
+ * <p>If a given PipelineCallable does not have any dependencies, it will be executed before any jobs are run
+ * by the planner. After that, the planner will keep track of when the dependencies of a given instance
+ * have been materialized, and then execute the instance as soon as they all exist. The Crunch planner
+ * uses a thread pool executor to run multiple {@code PipelineCallable} instances simultaneously, but you can
+ * indicate that an instance should be run by itself by overriding the {@code boolean runSingleThreaded()} method
+ * below to return true.</p>
+ *
+ * <p>The {@code call} method returns a {@code Status} to indicate whether it succeeded or failed. A failed
+ * instance, or any exceptions/errors thrown by the call method, will cause the overall Crunch pipeline containing
+ * this instance to fail.</p>
+ *
+ * <p>A number of helper methods for accessing the dependent Target/PCollection instances that this instance
+ * needs to exist, as well as the {@code Configuration} instance for the overall Pipeline execution, are available
+ * as protected methods in this class so that they may be accessed from implementations of {@code PipelineCallable}
+ * within the {@code call} method.
+ * </p>
+ * @param <Output> the output value returned by this instance (Void, PCollection, Pair&lt;PCollection, PCollection&gt;,
+ *                 etc.)
+ */
+public abstract class PipelineCallable<Output> implements Callable<PipelineCallable.Status> {
+
+  private static final Log LOG = LogFactory.getLog(PipelineCallable.class);
+
+  public enum Status { SUCCESS, FAILURE };
+
+  private String name;
+  private String message;
+
+  private Map<String, Target> namedTargets = Maps.newHashMap();
+  private Map<String, PCollection<?>> namedPCollections = Maps.newHashMap();
+  private Configuration conf;
+
+  private boolean outputsGenerated = false;
+
+  /**
+   * Clients should override this method to define the outputs that will exist after this instance is
+   * executed. These may be PCollections, PObjects, or nothing (which can be indicated with Java's {@code Void}
+   * type and a null value).
+   *
+   * @param pipeline The pipeline that is managing the execution of this instance
+   */
+  protected abstract Output getOutput(Pipeline pipeline);
+
+  /**
+   * Override this method to indicate to the planner that this instance should not be run at the
+   * same time as any other {@code PipelineCallable} instances.
+   *
+   * @return true if this instance should run by itself, false otherwise
+   */
+  public boolean runSingleThreaded() {
+    return false;
+  }
+
+  /**
+   * Requires that the given {@code Target} exists before this instance may be
+   * executed.
+   *
+   * @param label A string that can be used to retrieve the given Target inside of the {@code call} method.
+   * @param t the {@code Target} itself
+   * @return this instance
+   */
+  public PipelineCallable<Output> dependsOn(String label, Target t) {
+    Preconditions.checkNotNull(label, "label");
+    if (outputsGenerated) {
+      throw new IllegalStateException(
+          "Dependencies may not be added to a PipelineCallable after its outputs have been generated");
+    }
+    if (namedTargets.containsKey(label)) {
+      throw new IllegalStateException("Label " + label + " cannot be reused for multiple targets");
+    }
+    this.namedTargets.put(label, t);
+    return this;
+  }
+
+  /**
+   * Requires that the given {@code PCollection} be materialized to disk before this instance may be
+   * executed.
+   *
+   * @param label A string that can be used to retrieve the given PCollection inside of the {@code call} method.
+   * @param pcollect the {@code PCollection} itself
+   * @return this instance
+   */
+  public PipelineCallable<Output> dependsOn(String label, PCollection<?> pcollect) {
+    Preconditions.checkNotNull(label, "label");
+    if (outputsGenerated) {
+      throw new IllegalStateException(
+          "Dependencies may not be added to a PipelineCallable after its outputs have been generated");
+    }
+    if (namedPCollections.containsKey(label)) {
+      throw new IllegalStateException("Label " + label + " cannot be reused for multiple PCollections");
+    }
+    this.namedPCollections.put(label, pcollect);
+    return this;
+  }
+
+  /**
+   * Called by the {@code Pipeline} when this instance is registered with {@code Pipeline#sequentialDo}. In general,
+   * clients should override the protected {@code getOutput(Pipeline)} method instead of this one.
+   */
+  public Output generateOutput(Pipeline pipeline) {
+    if (outputsGenerated == true) {
+      throw new IllegalStateException("PipelineCallable.generateOutput should only be called once");
+    }
+    outputsGenerated = true;
+    this.conf = pipeline.getConfiguration();
+    return getOutput(pipeline);
+  }
+
+  /**
+   * Returns the name of this instance.
+   */
+  public String getName() {
+    return name == null ? this.getClass().getName() : name;
+  }
+
+  /**
+   * Use the given name to identify this instance in the logs.
+   */
+  public PipelineCallable<Output> named(String name) {
+    this.name = name;
+    return this;
+  }
+
+  /**
+   * Returns a message associated with this callable's execution, especially in case of errors.
+   */
+  public String getMessage() {
+    if (message == null) {
+      LOG.warn("No message specified for PipelineCallable instance \"" + getName() +
+          "\". Consider overriding PipelineCallable.getMessage()");
+      return toString();
+    }
+    return message;
+  }
+
+  /**
+   * Sets a message associated with this callable's execution, especially in case of errors.
+   */
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  /**
+   * The {@code Configuration} instance for the {@code Pipeline} this callable is registered with.
+   */
+  protected Configuration getConfiguration() {
+    return conf;
+  }
+
+  /**
+   * Returns the {@code Target} associated with the given label in the dependencies list,
+   * or null if no such target exists.
+   */
+  protected Target getTarget(String label) {
+    return namedTargets.get(label);
+  }
+
+  /**
+   * Returns the {@code PCollection} associated with the given label in the dependencies list,
+   * or null if no such instance exists.
+   */
+  protected PCollection getPCollection(String label) {
+    return namedPCollections.get(label);
+  }
+
+  /**
+   * Returns the only PCollection this instance depends on. Only valid in the case that this callable
+   * has precisely one dependency.
+   */
+  protected PCollection getOnlyPCollection() {
+    return Iterables.getOnlyElement(namedPCollections.values());
+  }
+
+  /**
+   * Returns the mapping of labels to PCollection dependencies for this instance.
+   */
+  public Map<String, PCollection<?>> getAllPCollections() {
+    return ImmutableMap.copyOf(namedPCollections);
+  }
+
+  /**
+   * Returns the mapping of labels to Target dependencies for this instance.
+   */
+  public Map<String, Target> getAllTargets() {
+    return ImmutableMap.copyOf(namedTargets);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 06d886d..ce80691 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -19,9 +19,11 @@ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.plan.JobNameBuilder;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
@@ -55,6 +57,7 @@ public class CrunchControlledJob implements MRJob {
   private final int jobID;
   private final Job job; // mapreduce job to be executed.
   private final JobNameBuilder jobNameBuilder;
+  private final Set<Target> allTargets;
 
   // the jobs the current job depends on
   private final List<CrunchControlledJob> dependingJobs;
@@ -77,15 +80,21 @@ public class CrunchControlledJob implements MRJob {
    *          an ID used to match with its {@link org.apache.crunch.impl.mr.plan.JobPrototype}.
    * @param job
    *          a mapreduce job to be executed.
+   * @param jobNameBuilder
+   *          code for generating a name for the executed MapReduce job.
+   * @param allTargets
+   *          the set of Targets that will exist after this job completes successfully.
    * @param prepareHook
    *          a piece of code that will run before this job is submitted.
    * @param completionHook
    *          a piece of code that will run after this job gets completed.
    */
-  public CrunchControlledJob(int jobID, Job job, JobNameBuilder jobNameBuilder, Hook prepareHook, Hook completionHook) {
+  public CrunchControlledJob(int jobID, Job job, JobNameBuilder jobNameBuilder, Set<Target> allTargets,
+                             Hook prepareHook, Hook completionHook) {
     this.jobID = jobID;
     this.job = job;
     this.jobNameBuilder = jobNameBuilder;
+    this.allTargets = allTargets;
     this.dependingJobs = Lists.newArrayList();
     this.prepareHook = prepareHook;
     this.completionHook = completionHook;
@@ -160,6 +169,8 @@ public class CrunchControlledJob implements MRJob {
     return counters;
   }
 
+  public Set<Target> getAllTargets() { return allTargets; }
+
   @Override
   public synchronized Job getJob() {
     return this.job;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 8a650c7..d23d821 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -22,10 +22,21 @@ import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PipelineCallable;
+import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRJob.State;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.hadoop.conf.Configuration;
@@ -47,6 +58,9 @@ public class CrunchJobControl {
   private Map<Integer, CrunchControlledJob> runningJobs;
   private Map<Integer, CrunchControlledJob> successfulJobs;
   private Map<Integer, CrunchControlledJob> failedJobs;
+  private Map<PipelineCallable<?>, Set<Target>> allPipelineCallables;
+  private Set<PipelineCallable<?>> activePipelineCallables;
+  private List<PipelineCallable<?>> failedCallables;
 
   private Log log = LogFactory.getLog(CrunchJobControl.class);
 
@@ -60,7 +74,8 @@ public class CrunchJobControl {
    * @param groupName
    *          a name identifying this group
    */
-  public CrunchJobControl(Configuration conf, String groupName) {
+  public CrunchJobControl(Configuration conf, String groupName,
+                          Map<PipelineCallable<?>, Set<Target>> pipelineCallables) {
     this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.readyJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.runningJobs = new Hashtable<Integer, CrunchControlledJob>();
@@ -68,6 +83,9 @@ public class CrunchJobControl {
     this.failedJobs = new Hashtable<Integer, CrunchControlledJob>();
     this.groupName = groupName;
     this.maxRunningJobs = conf.getInt(RuntimeParameters.MAX_RUNNING_JOBS, 5);
+    this.allPipelineCallables = pipelineCallables;
+    this.activePipelineCallables = allPipelineCallables.keySet();
+    this.failedCallables = Lists.newArrayList();
   }
 
   private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) {
@@ -189,6 +207,61 @@ public class CrunchJobControl {
     }
   }
 
+  private Set<Target> getUnfinishedTargets() {
+    Set<Target> unfinished = Sets.newHashSet();
+    for (CrunchControlledJob job : runningJobs.values()) {
+      unfinished.addAll(job.getAllTargets());
+    }
+    for (CrunchControlledJob job : readyJobs.values()) {
+      unfinished.addAll(job.getAllTargets());
+    }
+    for (CrunchControlledJob job : waitingJobs.values()) {
+      unfinished.addAll(job.getAllTargets());
+    }
+    return unfinished;
+  }
+
+  synchronized private void executeReadySeqDoFns() {
+    Set<Target> unfinished = getUnfinishedTargets();
+    Set<PipelineCallable<?>> oldPipelineCallables = activePipelineCallables;
+    this.activePipelineCallables = Sets.newHashSet();
+    List<Callable<PipelineCallable.Status>> callablesToRun = Lists.newArrayList();
+    for (final PipelineCallable<?> pipelineCallable : oldPipelineCallables) {
+      if (Sets.intersection(allPipelineCallables.get(pipelineCallable), unfinished).isEmpty()) {
+        if (pipelineCallable.runSingleThreaded()) {
+          try {
+            if (pipelineCallable.call() != PipelineCallable.Status.SUCCESS) {
+              failedCallables.add(pipelineCallable);
+            }
+          } catch (Throwable t) {
+            pipelineCallable.setMessage(t.getLocalizedMessage());
+            failedCallables.add(pipelineCallable);
+          }
+        } else {
+          callablesToRun.add(pipelineCallable);
+        }
+      } else {
+        // Still need to run this one
+       activePipelineCallables.add(pipelineCallable);
+      }
+    }
+
+    ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+    try {
+      List<Future<PipelineCallable.Status>> res = es.invokeAll(callablesToRun);
+      for (int i = 0; i < res.size(); i++) {
+        if (res.get(i).get() != PipelineCallable.Status.SUCCESS) {
+          failedCallables.add((PipelineCallable) callablesToRun.get(i));
+        }
+      }
+    } catch (Throwable t) {
+      t.printStackTrace();
+      failedCallables.addAll((List) callablesToRun);
+    } finally {
+      es.shutdownNow();
+    }
+  }
+
   synchronized private void startReadyJobs() {
     Map<Integer, CrunchControlledJob> oldJobs = null;
     oldJobs = this.readyJobs;
@@ -220,8 +293,16 @@ public class CrunchJobControl {
   }
 
   synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
-        && this.runningJobs.size() == 0;
+    return (this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
+        && this.runningJobs.size() == 0);
+  }
+
+  synchronized public boolean anyFailures() {
+    return this.failedJobs.size() > 0 || failedCallables.size() > 0;
+  }
+
+  public List<PipelineCallable<?>> getFailedCallables() {
+    return failedCallables;
   }
 
   /**
@@ -231,6 +312,8 @@ public class CrunchJobControl {
  public void pollJobStatusAndStartNewOnes() throws IOException, InterruptedException {
    checkRunningJobs();
    checkWaitingJobs();
    // Run any PipelineCallables whose target dependencies are now satisfied
    // before launching the next wave of ready MapReduce jobs.
    executeReadySeqDoFns();
    startReadyJobs();
  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index e595a72..6b3da5e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -25,8 +25,10 @@ import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
@@ -67,11 +69,13 @@ public abstract class DistributedPipeline implements Pipeline {
   protected final PCollectionFactory factory;
   protected final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
   protected final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
+  protected final Map<PipelineCallable<?>, Set<Target>> allPipelineCallables;
   private Path tempDirectory;
   private int tempFileIndex;
   private int nextAnonymousStageId;
 
   private Configuration conf;
+  private PipelineCallable currentPipelineCallable;
 
   /**
    * Instantiate with a custom name and configuration.
@@ -84,6 +88,7 @@ public abstract class DistributedPipeline implements Pipeline {
     this.factory = factory;
     this.outputTargets = Maps.newHashMap();
     this.outputTargetsToMaterialize = Maps.newHashMap();
+    this.allPipelineCallables = Maps.newHashMap();
     this.conf = conf;
     this.tempDirectory = createTempDirectory(conf);
     this.tempFileIndex = 0;
@@ -115,12 +120,44 @@ public abstract class DistributedPipeline implements Pipeline {
     return res;
   }
 
+  @Override
+  public <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable) {
+    allPipelineCallables.put(pipelineCallable, getDependencies(pipelineCallable));
+    PipelineCallable last = currentPipelineCallable;
+    currentPipelineCallable = pipelineCallable;
+    Output out = pipelineCallable.generateOutput(this);
+    currentPipelineCallable = last;
+    return out;
+  }
+
   public <S> PCollection<S> read(Source<S> source) {
-    return factory.createInputCollection(source, this);
+    return factory.createInputCollection(source, this, getCurrentPDoOptions());
   }
 
   public <K, V> PTable<K, V> read(TableSource<K, V> source) {
-    return factory.createInputTable(source, this);
+    return factory.createInputTable(source, this, getCurrentPDoOptions());
+  }
+
+  private ParallelDoOptions getCurrentPDoOptions() {
+    ParallelDoOptions.Builder pdb = ParallelDoOptions.builder();
+    if (currentPipelineCallable != null) {
+      pdb.targets(allPipelineCallables.get(currentPipelineCallable));
+    }
+    return pdb.build();
+  }
+
  /**
   * Computes the full set of Targets that must exist before the given
   * PipelineCallable may run: its explicitly declared Targets plus the target
   * dependencies of every PCollection it declared via dependsOn.
   */
  private Set<Target> getDependencies(PipelineCallable<?> callable) {
    Set<Target> deps = Sets.newHashSet(callable.getAllTargets().values());
    for (PCollection pc : callable.getAllPCollections().values()) {
      PCollectionImpl pcImpl = (PCollectionImpl) pc;
      deps.addAll(pcImpl.getTargetDependencies());
      // NOTE(review): materialize() here schedules each dependent PCollection
      // to be written out so the callable can read it later; it also assumes
      // the returned Iterable is always a MaterializableIterable for
      // distributed pipelines -- confirm for all PCollectionImpl subclasses.
      MaterializableIterable iter = (MaterializableIterable) pc.materialize();
      Source pcSrc = iter.getSource();
      if (pcSrc instanceof Target) {
        // The SourceTarget backing the materialized data is itself something
        // the callable must wait for.
        deps.add((Target) pcSrc);
      }
    }
    return deps;
  }
 
   public PCollection<String> readTextFile(String pathName) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
index 24cbaf5..064bba8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
@@ -31,6 +31,7 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
@@ -127,8 +128,8 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>
   }
 
   @Override
-  public Set<SourceTarget<?>> getTargetDependencies() {
-    Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies());
+  public Set<Target> getTargetDependencies() {
+    Set<Target> td = Sets.newHashSet(super.getTargetDependencies());
     if (groupingOptions != null) {
       td.addAll(groupingOptions.getSourceTargets());
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
index 641a3cb..8d3887d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
@@ -19,6 +19,7 @@ package org.apache.crunch.impl.dist.collect;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.dist.DistributedPipeline;
@@ -36,6 +37,11 @@ public class BaseInputCollection<S> extends PCollectionImpl<S> {
     this.source = source;
   }
 
  /**
   * Creates an input collection whose reads carry the Target dependencies in
   * {@code doOpts} (used by sequentialDo to order reads after the
   * PipelineCallables that create their data).
   */
  public BaseInputCollection(Source<S> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
    super(source.toString(), pipeline, doOpts);
    this.source = source;
  }
+
   @Override
   protected ReadableData<S> getReadableDataInternal() {
     if (source instanceof ReadableSource) {
@@ -60,6 +66,11 @@ public class BaseInputCollection<S> extends PCollectionImpl<S> {
   }
 
  @Override
  protected boolean waitingOnTargets() {
    // True when this collection's source is itself one of the Targets that a
    // pending PipelineCallable will write -- i.e. the data does not exist yet.
    // NOTE(review): relies on the Source also being a Target (a SourceTarget)
    // with equals() matching the entry in doOptions.getTargets() -- confirm.
    return doOptions.getTargets().contains(source);
  }
+
+  @Override
   protected long getSizeInternal() {
     long sz = source.getSize(pipeline.getConfiguration());
     if (sz < 0) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
index f41895a..cbab255 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
@@ -19,6 +19,7 @@ package org.apache.crunch.impl.dist.collect;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.dist.DistributedPipeline;
@@ -35,13 +36,25 @@ public class BaseInputTable<K, V> extends PTableBase<K, V> {
  /** Creates an input table with no additional target dependencies. */
  public BaseInputTable(TableSource<K, V> source, DistributedPipeline pipeline) {
    super(source.toString(), pipeline);
    this.source = source;
    // Empty ParallelDoOptions: this read does not wait on any
    // PipelineCallable targets.
    this.asCollection = pipeline.getFactory().createInputCollection(
        source, pipeline, ParallelDoOptions.builder().build());
  }

  /**
   * Creates an input table whose reads carry the Target dependencies in
   * {@code doOpts}; the backing collection receives the same options.
   */
  public BaseInputTable(TableSource<K, V> source, DistributedPipeline pipeline, ParallelDoOptions doOpts) {
    super(source.toString(), pipeline, doOpts);
    this.source = source;
    this.asCollection = pipeline.getFactory().createInputCollection(source, pipeline, doOpts);
  }
 
  /** Returns the TableSource this input table reads from. */
  public TableSource<K, V> getSource() {
    return source;
  }

  @Override
  protected boolean waitingOnTargets() {
    // Delegate to the backing collection, which checks whether its source is
    // among the not-yet-written Targets of a pending PipelineCallable.
    return asCollection.waitingOnTargets();
  }
+
   @Override
   protected long getSizeInternal() {
     return asCollection.getSizeInternal();

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
index a176aa1..9077fc9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
@@ -33,9 +33,15 @@ import java.util.List;
 
 public interface PCollectionFactory {
 
-  <S> BaseInputCollection<S> createInputCollection(Source<S> source, DistributedPipeline distributedPipeline);
+  <S> BaseInputCollection<S> createInputCollection(
+      Source<S> source,
+      DistributedPipeline distributedPipeline,
+      ParallelDoOptions doOpts);
 
-  <K, V> BaseInputTable<K, V> createInputTable(TableSource<K,V> source, DistributedPipeline distributedPipeline);
+  <K, V> BaseInputTable<K, V> createInputTable(
+      TableSource<K,V> source,
+      DistributedPipeline distributedPipeline,
+      ParallelDoOptions doOpts);
 
   <S> BaseUnionCollection<S> createUnionCollection(List<? extends PCollectionImpl<S>> internal);
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index a1e70fe..fb2ce31 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 
 import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.MapFn;
@@ -30,6 +31,7 @@ import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
@@ -44,6 +46,7 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -90,10 +93,17 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
  @Override
  public Iterable<S> materialize() {
    // Only short-circuit to an empty Iterable when this collection is NOT
    // waiting on a PipelineCallable target: before that callable runs, the
    // size check would report 0 for data that simply doesn't exist yet.
    if (!waitingOnTargets() && getSize() == 0) {
      System.err.println("Materializing an empty PCollection: " + this.getName());
      return Collections.emptyList();
    }
    // Already written to a readable location by a previous run: read it back
    // directly instead of scheduling more pipeline work.
    if (materializedAt != null && (materializedAt instanceof ReadableSource)) {
      try {
        return ((ReadableSource<S>) materializedAt).read(getPipeline().getConfiguration());
      } catch (IOException e) {
        throw new CrunchRuntimeException("Error reading materialized data", e);
      }
    }
    materialized = true;
    return pipeline.materialize(this);
  }
@@ -156,9 +166,10 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options);
   }
 
+
   public PCollection<S> write(Target target) {
     if (materializedAt != null) {
-      getPipeline().write(pipeline.getFactory().createInputCollection(materializedAt, pipeline), target);
+      getPipeline().write(pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions), target);
     } else {
       getPipeline().write(this, target);
     }
@@ -169,7 +180,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   public PCollection<S> write(Target target, Target.WriteMode writeMode) {
     if (materializedAt != null) {
       getPipeline().write(
-          pipeline.getFactory().createInputCollection(materializedAt, pipeline),
+          pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions),
           target,
           writeMode);
     } else {
@@ -192,12 +203,21 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   public void accept(Visitor visitor) {
     if (materializedAt != null) {
-      visitor.visitInputCollection(pipeline.getFactory().createInputCollection(materializedAt, pipeline));
+      visitor.visitInputCollection(pipeline.getFactory().createInputCollection(materializedAt, pipeline, doOptions));
     } else {
       acceptInternal(visitor);
     }
   }
 
  /**
   * Returns true when any ancestor of this collection reads from a Target
   * that a still-pending PipelineCallable must create first. Input
   * collections/tables override this to perform the actual source check; all
   * other nodes just propagate the answer up from their parents.
   */
  protected boolean waitingOnTargets() {
    for (PCollectionImpl parent : getParents()) {
      if (parent.waitingOnTargets()) {
        return true;
      }
    }
    return false;
  }
+
   protected abstract void acceptInternal(Visitor visitor);
 
   public void setBreakpoint() {
@@ -217,6 +237,12 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   @Override
   public PObject<S> first() { return new FirstElementPObject<S>(this); }
 
  @Override
  public <Output> Output sequentialDo(String label, PipelineCallable<Output> pipelineCallable) {
    // Register this collection as a named dependency of the callable, then
    // hand off to the pipeline, which tracks the callable's target deps.
    pipelineCallable.dependsOn(label, this);
    return getPipeline().sequentialDo(pipelineCallable);
  }
+
   public SourceTarget<S> getMaterializedAt() {
     return materializedAt;
   }
@@ -286,8 +312,8 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return parents.get(0);
   }
 
-  public Set<SourceTarget<?>> getTargetDependencies() {
-    Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets();
+  public Set<Target> getTargetDependencies() {
+    Set<Target> targetDeps = Sets.<Target>newHashSet(doOptions.getTargets());
     for (PCollectionImpl<?> parent : getParents()) {
       targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
index 32f9991..a893b9e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
@@ -92,7 +92,7 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   public PTable<K, V> write(Target target) {
     if (getMaterializedAt() != null) {
       getPipeline().write(pipeline.getFactory().createInputTable(
-          (TableSource<K, V>) getMaterializedAt(), pipeline), target);
+          (TableSource<K, V>) getMaterializedAt(), pipeline, doOptions), target);
     } else {
       getPipeline().write(this, target);
     }
@@ -103,7 +103,7 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
     if (getMaterializedAt() != null) {
       getPipeline().write(pipeline.getFactory().createInputTable(
-          (TableSource<K, V>) getMaterializedAt(), pipeline), target, writeMode);
+          (TableSource<K, V>) getMaterializedAt(), pipeline, doOptions), target, writeMode);
     } else {
       getPipeline().write(this, target, writeMode);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 42d1ca8..5996bfa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -37,6 +37,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
@@ -326,6 +327,19 @@ public class MemPipeline implements Pipeline {
   }
 
   @Override
+  public <Output> Output sequentialDo(PipelineCallable<Output> callable) {
+    Output out = callable.generateOutput(this);
+    try {
+      if (PipelineCallable.Status.FAILURE == callable.call()) {
+        throw new IllegalStateException("PipelineCallable " + callable + " failed in in-memory Crunch pipeline");
+      }
+    } catch (Throwable t) {
+      t.printStackTrace();
+    }
+    return out;
+  }
+
+  @Override
   public PipelineExecution runAsync() {
     activeTargets.clear();
     return new MemExecution();

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 240de1c..becee88 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -37,6 +37,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.ReadableData;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
 import org.apache.crunch.impl.mem.MemPipeline;
@@ -189,6 +190,12 @@ public class MemCollection<S> implements PCollection<S> {
   public PObject<S> first() { return new FirstElementPObject<S>(this); }
 
  @Override
  public <Output> Output sequentialDo(String label, PipelineCallable<Output> pipelineCallable) {
    // Register this collection as a named dependency of the callable, then
    // hand off to the (in-memory) pipeline, which runs it immediately.
    pipelineCallable.dependsOn(label, this);
    return getPipeline().sequentialDo(pipelineCallable);
  }
+
+  @Override
   public ReadableData<S> asReadable(boolean materialize) {
     return new MemReadableData<S>(collect);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 6cfc6d0..bf3f58a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -27,7 +27,6 @@ import java.util.Map;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CachingOptions;
@@ -107,7 +106,7 @@ public class MRPipeline extends DistributedPipeline {
         outputTargetsToMaterialize.remove(c);
       }
     }
-    MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize);
+    MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize, allPipelineCallables);
     try {
       return planner.plan(jarClass, getConfiguration());
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
index 42d9df2..ea189f8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.impl.mr.collect;
 
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.dist.collect.BaseInputCollection;
@@ -27,8 +28,8 @@ import org.apache.crunch.io.ReadableSource;
 
 public class InputCollection<S> extends BaseInputCollection<S> implements MRCollection {
 
-  public InputCollection(Source<S> source, MRPipeline pipeline) {
-    super(source, pipeline);
+  public InputCollection(Source<S> source, MRPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, pipeline, doOpts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
index fb550fa..3154190 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.impl.mr.collect;
 
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.dist.collect.BaseInputTable;
 import org.apache.crunch.impl.dist.collect.MRCollection;
@@ -25,8 +26,8 @@ import org.apache.crunch.impl.mr.plan.DoNode;
 
 public class InputTable<K, V> extends BaseInputTable<K, V> implements MRCollection {
 
-  public InputTable(TableSource<K, V> source, MRPipeline pipeline) {
-    super(source, pipeline);
+  public InputTable(TableSource<K, V> source, MRPipeline pipeline, ParallelDoOptions doOpts) {
+    super(source, pipeline, doOpts);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
index 1e94c53..ede88f2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
@@ -50,13 +50,19 @@ import java.util.List;
 
 public class MRCollectionFactory implements PCollectionFactory {
  @Override
  public <S> BaseInputCollection<S> createInputCollection(
      Source<S> source,
      DistributedPipeline pipeline,
      ParallelDoOptions doOpts) {
    // In the MR factory the pipeline is always an MRPipeline; the cast makes
    // that assumption explicit.
    return new InputCollection<S>(source, (MRPipeline) pipeline, doOpts);
  }

  @Override
  public <K, V> BaseInputTable<K, V> createInputTable(
      TableSource<K,V> source,
      DistributedPipeline pipeline,
      ParallelDoOptions doOpts) {
    // Same MRPipeline assumption as createInputCollection above.
    return new InputTable<K, V>(source, (MRPipeline) pipeline, doOpts);
  }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index eb46ab0..2d07c13 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
@@ -33,7 +34,6 @@ import org.apache.crunch.impl.mr.MRPipelineExecution;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -73,8 +73,9 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
       Configuration conf,
       Class<?> jarClass,
       Map<PCollectionImpl<?>, Set<Target>> outputTargets,
-      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
-    this.control = new CrunchJobControl(conf, jarClass.toString());
+      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize,
+      Map<PipelineCallable<?>, Set<Target>> pipelineCallables) {
+    this.control = new CrunchJobControl(conf, jarClass.toString(), pipelineCallables);
     this.outputTargets = outputTargets;
     this.toMaterialize = toMaterialize;
     this.monitorThread = new Thread(new Runnable() {
@@ -121,31 +122,41 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
           System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
         }
       }
+      List<PipelineCallable<?>> failedCallables = control.getFailedCallables();
+      if (!failedCallables.isEmpty()) {
+        System.err.println(failedCallables.size() + " callable failure(s) occurred:");
+        for (PipelineCallable<?> c : failedCallables) {
+          System.err.println(c.getName() + ": " + c.getMessage());
+        }
+      }
+      boolean hasFailures = !failures.isEmpty() || !failedCallables.isEmpty();
       List<PipelineResult.StageResult> stages = Lists.newArrayList();
       for (CrunchControlledJob job : control.getSuccessfulJobList()) {
         stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters(),
             job.getStartTimeMsec(), job.getJobStartTimeMsec(), job.getJobEndTimeMsec(), job.getEndTimeMsec()));
       }
 
-      for (PCollectionImpl<?> c : outputTargets.keySet()) {
-        if (toMaterialize.containsKey(c)) {
-          MaterializableIterable iter = toMaterialize.get(c);
-          if (iter.isSourceTarget()) {
-            iter.materialize();
-            c.materializeAt((SourceTarget) iter.getSource());
-          }
-        } else {
-          boolean materialized = false;
-          for (Target t : outputTargets.get(c)) {
-            if (!materialized) {
-              if (t instanceof SourceTarget) {
-                c.materializeAt((SourceTarget) t);
-                materialized = true;
-              } else {
-                SourceTarget st = t.asSourceTarget(c.getPType());
-                if (st != null) {
-                  c.materializeAt(st);
+      if (!hasFailures) {
+        for (PCollectionImpl<?> c : outputTargets.keySet()) {
+          if (toMaterialize.containsKey(c)) {
+            MaterializableIterable iter = toMaterialize.get(c);
+            if (iter.isSourceTarget()) {
+              iter.materialize();
+              c.materializeAt((SourceTarget) iter.getSource());
+            }
+          } else {
+            boolean materialized = false;
+            for (Target t : outputTargets.get(c)) {
+              if (!materialized) {
+                if (t instanceof SourceTarget) {
+                  c.materializeAt((SourceTarget) t);
                   materialized = true;
+                } else {
+                  SourceTarget st = t.asSourceTarget(c.getPType());
+                  if (st != null) {
+                    c.materializeAt(st);
+                    materialized = true;
+                  }
                 }
               }
             }
@@ -156,7 +167,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
       synchronized (this) {
         if (killSignal.getCount() == 0) {
           status.set(Status.KILLED);
-        } else if (!failures.isEmpty()) {
+        } else if (!failures.isEmpty() || !failedCallables.isEmpty()) {
           status.set(Status.FAILED);
         } else {
           status.set(Status.SUCCEEDED);

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 41da5a6..d341184 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -147,6 +147,7 @@ class JobPrototype {
     job.setJarByClass(jarClass);
 
     Set<DoNode> outputNodes = Sets.newHashSet();
+    Set<Target> allTargets = Sets.newHashSet();
     Path outputPath = new Path(workingPath, "output");
     MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
     for (Target target : targetsToNodePaths.keySet()) {
@@ -159,6 +160,7 @@ class JobPrototype {
         }
         outputNodes.add(walkPath(nodePath.descendingIterator(), node));
       }
+      allTargets.add(target);
     }
 
     Set<DoNode> mapSideNodes = Sets.newHashSet();
@@ -173,7 +175,7 @@ class JobPrototype {
           }
           mapSideNodes.add(walkPath(nodePath.descendingIterator(), node));
         }
-        
+        allTargets.add(target);
       }
     }
     
@@ -229,6 +231,7 @@ class JobPrototype {
         jobID,
         job,
         jobNameBuilder,
+        allTargets,
         new CrunchJobHooks.PrepareHook(job),
         new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index c9a6136..7a1ff4e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -18,10 +18,16 @@
 package org.apache.crunch.impl.mr.plan;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
@@ -47,14 +53,17 @@ public class MSCRPlanner {
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  private final Map<PipelineCallable<?>, Set<Target>> pipelineCallables;
   private int lastJobID = 0;
 
   public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
-      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
+      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize,
+      Map<PipelineCallable<?>, Set<Target>> pipelineCallables) {
     this.pipeline = pipeline;
     this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
     this.outputs.putAll(outputs);
     this.toMaterialize = toMaterialize;
+    this.pipelineCallables = pipelineCallables;
   }
 
   // Used to ensure that we always build pipelines starting from the deepest
@@ -74,8 +83,7 @@ public class MSCRPlanner {
   };  
 
   public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
-    Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps =
-        Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR);
+    Map<PCollectionImpl<?>, Set<Target>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
     for (PCollectionImpl<?> pcollect : outputs.keySet()) {
       targetDeps.put(pcollect, pcollect.getTargetDependencies());
     }
@@ -109,7 +117,7 @@ public class MSCRPlanner {
       }
       if (!hasInputs) {
         LOG.warn("No input sources for pipeline, nothing to do...");
-        return new MRExecutor(conf, jarClass, outputs, toMaterialize);
+        return new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables);
       }
 
      // Create a new graph that splits up dependent GBK nodes.
@@ -183,7 +191,7 @@ public class MSCRPlanner {
     
     // Finally, construct the jobs from the prototypes and return.
     DotfileWriter dotfileWriter = new DotfileWriter();
-    MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize);
+    MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, pipelineCallables);
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
       dotfileWriter.addJobPrototype(proto);
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID));

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
index 2c1f1be..e727ec1 100644
--- a/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControlTest.java
@@ -17,15 +17,26 @@
  */
 package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineCallable;
+import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.plan.JobNameBuilder;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.To;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -37,7 +48,8 @@ public class CrunchJobControlTest {
   public void testMaxRunningJobs() throws IOException, InterruptedException {
     Configuration conf = new Configuration();
     conf.setInt(RuntimeParameters.MAX_RUNNING_JOBS, 2);
-    CrunchJobControl jobControl = new CrunchJobControl(conf, "group");
+    CrunchJobControl jobControl = new CrunchJobControl(conf, "group",
+        ImmutableMap.<PipelineCallable<?>, Set<Target>>of());
     CrunchControlledJob job1 = createJob(1);
     CrunchControlledJob job2 = createJob(2);
     CrunchControlledJob job3 = createJob(3);
@@ -60,13 +72,81 @@ public class CrunchJobControlTest {
     verify(job3).submit();
   }
 
-  private CrunchControlledJob createJob(int jobID) throws IOException, InterruptedException {
+  private class IncrementingPipelineCallable extends PipelineCallable<Void> {
+
+    private String name;
+    private boolean executed;
+
+    public IncrementingPipelineCallable(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public Status call() {
+      executed = true;
+      return Status.SUCCESS;
+    }
+
+    public boolean isExecuted() { return executed; }
+
+    @Override
+    public Void getOutput(Pipeline pipeline) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testSequentialDo() throws IOException, InterruptedException {
+    Target t1 = To.textFile("foo");
+    Target t2 = To.textFile("bar");
+    Target t3 = To.textFile("baz");
+    IncrementingPipelineCallable first = new IncrementingPipelineCallable("first");
+    IncrementingPipelineCallable second = new IncrementingPipelineCallable("second");
+    IncrementingPipelineCallable third = new IncrementingPipelineCallable("third");
+    CrunchControlledJob job1 = createJob(1, ImmutableSet.of(t1));
+    CrunchControlledJob job2 = createJob(2, ImmutableSet.of(t2));
+    CrunchControlledJob job3 = createJob(3, ImmutableSet.of(t3));
+    Configuration conf = new Configuration();
+    Map<PipelineCallable<?>, Set<Target>> pipelineCallables = Maps.newHashMap();
+    pipelineCallables.put(first, ImmutableSet.<Target>of());
+    pipelineCallables.put(second, ImmutableSet.<Target>of(t1));
+    pipelineCallables.put(third, ImmutableSet.<Target>of(t2, t3));
+    CrunchJobControl jobControl = new CrunchJobControl(conf, "group", pipelineCallables);
+
+    jobControl.addJob(job1);
+    jobControl.addJob(job2);
+    jobControl.addJob(job3);
+    jobControl.pollJobStatusAndStartNewOnes();
+    verify(job1).submit();
+    verify(job2).submit();
+    verify(job3).submit();
+    assertTrue(first.isExecuted());
+
+    setSuccess(job1);
+    jobControl.pollJobStatusAndStartNewOnes();
+    assertTrue(second.isExecuted());
+
+    setSuccess(job2);
+    jobControl.pollJobStatusAndStartNewOnes();
+    assertFalse(third.isExecuted());
+
+    setSuccess(job3);
+    jobControl.pollJobStatusAndStartNewOnes();
+    assertTrue(third.isExecuted());
+  }
+
+  private CrunchControlledJob createJob(int jobID) {
+    return createJob(jobID, ImmutableSet.<Target>of());
+  }
+
+  private CrunchControlledJob createJob(int jobID, Set<Target> targets) {
     Job mrJob = mock(Job.class);
     when(mrJob.getConfiguration()).thenReturn(new Configuration());
     CrunchControlledJob job = new CrunchControlledJob(
         jobID,
         mrJob,
         new JobNameBuilder(mrJob.getConfiguration(), "test", 1, 1),
+        targets,
         mock(CrunchControlledJob.Hook.class),
         mock(CrunchControlledJob.Hook.class));
     return spy(job);

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index b2216f1..6f4a19f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -278,6 +278,14 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
   def shard(numPartitions: Int) = wrap(Shard.shard(native, numPartitions))
 
   /**
+   * Adds this PCollection as a dependency for the given PipelineCallable
+   * and then registers it with the Pipeline associated with this instance.
+   */
+  def sequentialDo[Output](label: String, fn: PipelineCallable[Output]) = {
+    native.sequentialDo(label, fn)
+  }
+
+  /**
    * Gets the number of elements represented by this PCollection.
    *
    * @return The number of elements in this PCollection.

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index b800612..08b4697 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -113,6 +113,10 @@ trait PipelineLike {
    */
   def emptyPTable[K, V](pt: PTableType[K, V]) = new PTable[K, V](jpipeline.emptyPTable(pt))
 
+  /**
+   * Adds the given {@code PipelineCallable} to the pipeline execution and returns its output.
+   */
+  def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn)
 
   /**
    * Returns a handler for controlling the execution of the underlying MapReduce

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
new file mode 100644
index 0000000..51b65af
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkPipelineCallableIT.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+
+public class SparkPipelineCallableIT extends CrunchTestSupport {
+  @Test
+  public void testSparkShakes() throws Exception {
+    run(new SparkPipeline("local", "PC", SparkPipelineCallableIT.class, tempDir.getDefaultConfiguration()),
+        tempDir.copyResourceFileName("shakes.txt"), false /* fail */);
+  }
+
+  @Test
+  public void testFailure() throws Exception {
+    run(new SparkPipeline("local", "PC", SparkPipelineCallableIT.class, tempDir.getDefaultConfiguration()),
+        tempDir.copyResourceFileName("shakes.txt"), true /* fail */);
+  }
+
+  public static int INC1 = 0;
+  public static int INC2 = 0;
+
+  public static void run(Pipeline p, final String input, final boolean fail) {
+
+    PTable<String, Long> top3 = p.sequentialDo(new PipelineCallable<PCollection<String>>() {
+      @Override
+      public Status call() {
+        INC1 = 17;
+        return fail ? Status.FAILURE : Status.SUCCESS;
+      }
+
+      @Override
+      public PCollection<String> getOutput(Pipeline pipeline) {
+        return pipeline.readTextFile(input);
+      }
+    }.named("first"))
+        .sequentialDo("onInput", new PipelineCallable<PCollection<String>>() {
+          @Override
+          protected PCollection<String> getOutput(Pipeline pipeline) {
+            return getOnlyPCollection();
+          }
+
+          @Override
+          public Status call() throws Exception {
+            return Status.SUCCESS;
+          }
+        })
+        .count()
+        .sequentialDo("label", new PipelineCallable<PTable<String, Long>>() {
+          @Override
+          public Status call() {
+            INC2 = 29;
+            if (getPCollection("label") != null) {
+              return Status.SUCCESS;
+            }
+            return Status.FAILURE;
+          }
+
+          @Override
+          public PTable<String, Long> getOutput(Pipeline pipeline) {
+            return (PTable<String, Long>) getOnlyPCollection();
+          }
+        }.named("second"))
+        .top(3);
+
+    if (fail) {
+      assertFalse(p.run().succeeded());
+    } else {
+      Map<String, Long> counts = top3.materializeToMap();
+      assertEquals(ImmutableMap.of("", 788L, "Enter Macbeth.", 7L, "Exeunt.", 21L), counts);
+      assertEquals(17, INC1);
+      assertEquals(29, INC2);
+    }
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 7a69707..95ccd2c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -146,7 +146,8 @@ public class SparkPipeline extends DistributedPipeline {
     }
 
     copyConfiguration(conf, sparkContext.hadoopConfiguration());
-    SparkRuntime runtime = new SparkRuntime(this, sparkContext, conf, outputTargets, toMaterialize, cachedCollections);
+    SparkRuntime runtime = new SparkRuntime(this, sparkContext, conf, outputTargets,
+        toMaterialize, cachedCollections, allPipelineCallables);
     runtime.execute();
     outputTargets.clear();
     return runtime;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a5c59276/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 22375ee..a9537e5 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -18,13 +18,17 @@
 package org.apache.crunch.impl.spark;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.SourceTarget;
@@ -58,10 +62,13 @@ import org.apache.spark.storage.StorageLevel;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -79,6 +86,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
   private Map<PCollectionImpl<?>, Set<Target>> outputTargets;
   private Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
   private Map<PCollection<?>, StorageLevel> toCache;
+  private Map<PipelineCallable<?>, Set<Target>> allPipelineCallables;
+  private Set<PipelineCallable<?>> activePipelineCallables;
   private final CountDownLatch doneSignal = new CountDownLatch(1);
   private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
   private boolean started;
@@ -104,7 +113,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
                       Configuration conf,
                       Map<PCollectionImpl<?>, Set<Target>> outputTargets,
                       Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize,
-                      Map<PCollection<?>, StorageLevel> toCache) {
+                      Map<PCollection<?>, StorageLevel> toCache,
+                      Map<PipelineCallable<?>, Set<Target>> allPipelineCallables) {
     this.pipeline = pipeline;
     this.sparkContext = sparkContext;
     this.conf = conf;
@@ -115,6 +125,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     this.outputTargets.putAll(outputTargets);
     this.toMaterialize = toMaterialize;
     this.toCache = toCache;
+    this.allPipelineCallables = allPipelineCallables;
+    this.activePipelineCallables = allPipelineCallables.keySet();
     this.status.set(Status.READY);
     this.monitorThread = new Thread(new Runnable() {
       @Override
@@ -201,13 +213,67 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     doneSignal.await();
   }
 
+  private void runCallables(Set<Target> unfinished) {
+    Set<PipelineCallable<?>> oldCallables = activePipelineCallables;
+    activePipelineCallables = Sets.newHashSet();
+    List<PipelineCallable<?>> callablesToRun = Lists.newArrayList();
+    List<PipelineCallable<?>> failedCallables = Lists.newArrayList();
+    for (PipelineCallable<?> pipelineCallable : oldCallables) {
+      if (Sets.intersection(allPipelineCallables.get(pipelineCallable), unfinished).isEmpty()) {
+        if (pipelineCallable.runSingleThreaded()) {
+          try {
+            if (pipelineCallable.call() != PipelineCallable.Status.SUCCESS) {
+              failedCallables.add(pipelineCallable);
+            }
+          } catch (Throwable t) {
+            pipelineCallable.setMessage(t.getLocalizedMessage());
+            failedCallables.add(pipelineCallable);
+          }
+        } else {
+          callablesToRun.add(pipelineCallable);
+        }
+      } else {
+        // Still need to run this one
+        activePipelineCallables.add(pipelineCallable);
+      }
+    }
+
+    ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+    try {
+      List<Future<PipelineCallable.Status>> res = es.invokeAll(callablesToRun);
+      for (int i = 0; i < res.size(); i++) {
+        if (res.get(i).get() != PipelineCallable.Status.SUCCESS) {
+          failedCallables.add((PipelineCallable) callablesToRun.get(i));
+        }
+      }
+    } catch (Throwable t) {
+      t.printStackTrace();
+      failedCallables.addAll((List) callablesToRun);
+    } finally {
+      es.shutdownNow();
+    }
+
+    if (!failedCallables.isEmpty()) {
+      LOG.error(failedCallables.size() + " callable failure(s) occurred:");
+      for (PipelineCallable<?> c : failedCallables) {
+        LOG.error(c.getName() + ": " + c.getMessage());
+      }
+      status.set(Status.FAILED);
+      set(PipelineResult.EMPTY);
+      doneSignal.countDown();
+    }
+  }
+
   private void monitorLoop() {
     status.set(Status.RUNNING);
     long start = System.currentTimeMillis();
-    Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR);
+    Map<PCollectionImpl<?>, Set<Target>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
+    Set<Target> unfinished = Sets.newHashSet();
     for (PCollectionImpl<?> pcollect : outputTargets.keySet()) {
       targetDeps.put(pcollect, pcollect.getTargetDependencies());
+      unfinished.addAll(outputTargets.get(pcollect));
     }
+    runCallables(unfinished);
     while (!targetDeps.isEmpty() && doneSignal.getCount() > 0) {
       Set<Target> allTargets = Sets.newHashSet();
       for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
@@ -271,21 +337,26 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
             }
           }
         }
+        unfinished.removeAll(targets);
       }
-      for (PCollectionImpl<?> output : pcolToRdd.keySet()) {
-        if (toMaterialize.containsKey(output)) {
-          MaterializableIterable mi = toMaterialize.get(output);
-          if (mi.isSourceTarget()) {
-            output.materializeAt((SourceTarget) mi.getSource());
+      if (status.get() == Status.RUNNING) {
+        for (PCollectionImpl<?> output : pcolToRdd.keySet()) {
+          if (toMaterialize.containsKey(output)) {
+            MaterializableIterable mi = toMaterialize.get(output);
+            if (mi.isSourceTarget()) {
+              output.materializeAt((SourceTarget) mi.getSource());
+            }
           }
+          targetDeps.remove(output);
         }
-        targetDeps.remove(output);
       }
+      runCallables(unfinished);
     }
     if (status.get() != Status.FAILED || status.get() != Status.KILLED) {
       status.set(Status.SUCCEEDED);
       set(new PipelineResult(
-          ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(), start, System.currentTimeMillis())),
+          ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(),
+              start, System.currentTimeMillis())),
           Status.SUCCEEDED));
     } else {
       set(PipelineResult.EMPTY);


Mime
View raw message