crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [32/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:34 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
new file mode 100644
index 0000000..f22b5a1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.mr.collect.DoTableImpl;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
+import org.apache.crunch.impl.mr.run.CrunchCombiner;
+import org.apache.crunch.impl.mr.run.CrunchInputFormat;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchReducer;
+import org.apache.crunch.impl.mr.run.NodeContext;
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+class JobPrototype {
+
+  public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group,
+      Set<NodePath> inputs, Path workingPath) {
+    return new JobPrototype(jobID, inputs, group, workingPath);
+  }
+
+  public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
+    return new JobPrototype(jobID, mapNodePaths, workingPath);
+  }
+
+  private final int jobID; // TODO: maybe stageID sounds better
+  private final Set<NodePath> mapNodePaths;
+  private final PGroupedTableImpl<?, ?> group;
+  private final Set<JobPrototype> dependencies = Sets.newHashSet();
+  private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
+  private final Path workingPath;
+
+  private HashMultimap<Target, NodePath> targetsToNodePaths;
+  private DoTableImpl<?, ?> combineFnTable;
+
+  private CrunchControlledJob job;
+
+  private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
+    this.jobID = jobID;
+    this.mapNodePaths = ImmutableSet.copyOf(inputs);
+    this.group = group;
+    this.workingPath = workingPath;
+    this.targetsToNodePaths = null;
+  }
+
+  private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
+    this.jobID = jobID;
+    this.group = null;
+    this.mapNodePaths = null;
+    this.workingPath = workingPath;
+    this.targetsToNodePaths = outputPaths;
+  }
+
+  public int getJobID() {
+    return jobID;
+  }
+
+  public boolean isMapOnly() {
+    return this.group == null;
+  }
+
+  Set<NodePath> getMapNodePaths() {
+    return mapNodePaths;
+  }
+
+  PGroupedTableImpl<?, ?> getGroupingTable() {
+    return group;
+  }
+
+  HashMultimap<Target, NodePath> getTargetsToNodePaths() {
+    return targetsToNodePaths;
+  }
+
+  public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
+    if (group == null) {
+      throw new IllegalStateException("Cannot add a reduce phase to a map-only job");
+    }
+    this.targetsToNodePaths = outputPaths;
+  }
+
+  public void addDependency(JobPrototype dependency) {
+    this.dependencies.add(dependency);
+  }
+
+  public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+    if (job == null) {
+      job = build(jarClass, conf, pipeline);
+      for (JobPrototype proto : dependencies) {
+        job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline));
+      }
+    }
+    return job;
+  }
+
+  private CrunchControlledJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+    Job job = new Job(conf);
+    conf = job.getConfiguration();
+    conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
+    job.setJarByClass(jarClass);
+
+    Set<DoNode> outputNodes = Sets.newHashSet();
+    Set<Target> targets = targetsToNodePaths.keySet();
+    Path outputPath = new Path(workingPath, "output");
+    MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
+    for (Target target : targets) {
+      DoNode node = null;
+      for (NodePath nodePath : targetsToNodePaths.get(target)) {
+        if (node == null) {
+          PCollectionImpl<?> collect = nodePath.tail();
+          node = DoNode.createOutputNode(target.toString(), collect.getPType());
+          outputHandler.configureNode(node, target);
+        }
+        outputNodes.add(walkPath(nodePath.descendingIterator(), node));
+      }
+    }
+
+    job.setMapperClass(CrunchMapper.class);
+    List<DoNode> inputNodes;
+    DoNode reduceNode = null;
+    if (group != null) {
+      job.setReducerClass(CrunchReducer.class);
+      List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
+      serialize(reduceNodes, conf, workingPath, NodeContext.REDUCE);
+      reduceNode = reduceNodes.get(0);
+
+      if (combineFnTable != null) {
+        job.setCombinerClass(CrunchCombiner.class);
+        DoNode combinerInputNode = group.createDoNode();
+        DoNode combineNode = combineFnTable.createDoNode();
+        combineNode.addChild(group.getGroupingNode());
+        combinerInputNode.addChild(combineNode);
+        serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);
+      }
+
+      group.configureShuffle(job);
+
+      DoNode mapOutputNode = group.getGroupingNode();
+      Set<DoNode> mapNodes = Sets.newHashSet();
+      for (NodePath nodePath : mapNodePaths) {
+        // Advance these one step, since we've already configured
+        // the grouping node, and the PGroupedTableImpl is the tail
+        // of the NodePath.
+        Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
+        iter.next();
+        mapNodes.add(walkPath(iter, mapOutputNode));
+      }
+      inputNodes = Lists.newArrayList(mapNodes);
+    } else { // No grouping
+      job.setNumReduceTasks(0);
+      inputNodes = Lists.newArrayList(outputNodes);
+    }
+    serialize(inputNodes, conf, workingPath, NodeContext.MAP);
+
+    if (inputNodes.size() == 1) {
+      DoNode inputNode = inputNodes.get(0);
+      inputNode.getSource().configureSource(job, -1);
+    } else {
+      for (int i = 0; i < inputNodes.size(); i++) {
+        DoNode inputNode = inputNodes.get(i);
+        inputNode.getSource().configureSource(job, i);
+      }
+      job.setInputFormatClass(CrunchInputFormat.class);
+    }
+    job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
+
+    return new CrunchControlledJob(
+        jobID,
+        job,
+        new CrunchJobHooks.PrepareHook(job),
+        new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null));
+  }
+
+  private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)
+      throws IOException {
+    List<RTNode> rtNodes = Lists.newArrayList();
+    for (DoNode node : nodes) {
+      rtNodes.add(node.toRTNode(true, conf, context));
+    }
+    Path path = new Path(workingPath, context.toString());
+    DistCache.write(conf, path, rtNodes);
+  }
+
+  private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode reduceNode) {
+    JobNameBuilder builder = new JobNameBuilder(pipelineName);
+    builder.visit(mapNodes);
+    if (reduceNode != null) {
+      builder.visit(reduceNode);
+    }
+    return builder.build();
+  }
+
+  private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
+    while (iter.hasNext()) {
+      PCollectionImpl<?> collect = iter.next();
+      if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
+        combineFnTable = null;
+      } else if (collect instanceof DoTableImpl && ((DoTableImpl<?, ?>) collect).hasCombineFn()) {
+        combineFnTable = (DoTableImpl<?, ?>) collect;
+      }
+      if (!nodes.containsKey(collect)) {
+        nodes.put(collect, collect.createDoNode());
+      }
+      DoNode parent = nodes.get(collect);
+      parent.addChild(working);
+      working = parent;
+    }
+    return working;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
new file mode 100644
index 0000000..36c565e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Map;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.Maps;
+
+public class MSCROutputHandler implements OutputHandler {
+
+  private final Job job;
+  private final Path path;
+  private final boolean mapOnlyJob;
+
+  private DoNode workingNode;
+  private Map<Integer, PathTarget> multiPaths;
+  private int jobCount;
+
+  public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
+    this.job = job;
+    this.path = outputPath;
+    this.mapOnlyJob = mapOnlyJob;
+    this.multiPaths = Maps.newHashMap();
+  }
+
+  public void configureNode(DoNode node, Target target) {
+    workingNode = node;
+    target.accept(this, node.getPType());
+  }
+
+  public boolean configure(Target target, PType<?> ptype) {
+    if (target instanceof MapReduceTarget) {
+      if (target instanceof PathTarget) {
+        multiPaths.put(jobCount, (PathTarget) target);
+      }
+
+      String name = PlanningParameters.MULTI_OUTPUT_PREFIX + jobCount;
+      jobCount++;
+      workingNode.setOutputName(name);
+      ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name);
+      return true;
+    }
+
+    return false;
+  }
+
+  public boolean isMapOnlyJob() {
+    return mapOnlyJob;
+  }
+
+  public Map<Integer, PathTarget> getMultiPaths() {
+    return multiPaths;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
new file mode 100644
index 0000000..3e1de38
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+public class MSCRPlanner {
+
+  private final MRPipeline pipeline;
+  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
+  private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  private int lastJobID = 0;
+
+  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
+      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
+    this.pipeline = pipeline;
+    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
+    this.outputs.putAll(outputs);
+    this.toMaterialize = toMaterialize;
+  }
+
+  // Used to ensure that we always build pipelines starting from the deepest
+  // outputs, which helps ensure that we handle intermediate outputs correctly.
+  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
+    @Override
+    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
+      int cmp = right.getDepth() - left.getDepth();
+      if (cmp == 0) {
+        // Ensure we don't throw away two output collections at the same depth.
+        // Using the collection name would be nicer here, but names aren't
+        // necessarily unique.
+        cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
+      }
+      return cmp;
+    }
+  };  
+
+  public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
+    Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
+    for (PCollectionImpl<?> pcollect : outputs.keySet()) {
+      targetDeps.put(pcollect, pcollect.getTargetDependencies());
+    }
+    
+    Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
+    Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create();
+    while (!targetDeps.isEmpty()) {
+      Set<Target> allTargets = Sets.newHashSet();
+      for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
+        allTargets.addAll(outputs.get(pcollect));
+      }
+      GraphBuilder graphBuilder = new GraphBuilder();
+      
+      // Walk the current plan tree and build a graph in which the vertices are
+      // sources, targets, and GBK operations.
+      Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
+      Set<PCollectionImpl<?>> laterStage = Sets.newHashSet();
+      for (PCollectionImpl<?> output : targetDeps.keySet()) {
+        if (Sets.intersection(allTargets, targetDeps.get(output)).isEmpty()) {
+          graphBuilder.visitOutput(output);
+          currentStage.add(output);
+        } else {
+          laterStage.add(output);
+        }
+      }
+      
+      Graph baseGraph = graphBuilder.getGraph();
+      
+      // Create a new graph that splits up up dependent GBK nodes.
+      Graph graph = prepareFinalGraph(baseGraph);
+      
+      // Break the graph up into connected components.
+      List<List<Vertex>> components = graph.connectedComponents();
+      
+      // For each component, we will create one or more job prototypes,
+      // depending on its profile.
+      // For dependency handling, we only need to care about which
+      // job prototype a particular GBK is assigned to.
+      for (List<Vertex> component : components) {
+        assignments.putAll(constructJobPrototypes(component));
+      }
+
+      // Add in the job dependency information here.
+      for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
+        JobPrototype current = e.getValue();
+        List<Vertex> parents = graph.getParents(e.getKey());
+        for (Vertex parent : parents) {
+          for (JobPrototype parentJobProto : assignments.get(parent)) {
+            current.addDependency(parentJobProto);
+          }
+        }
+      }
+      
+      // Add cross-stage dependencies.
+      for (PCollectionImpl<?> output : currentStage) {
+        Set<Target> targets = outputs.get(output);
+        Vertex vertex = graph.getVertexAt(output);
+        for (PCollectionImpl<?> later : laterStage) {
+          if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) {
+            protoDependency.put(later, vertex);
+          }
+        }
+        targetDeps.remove(output);
+      }
+    }
+    
+    // Cross-job dependencies.
+    for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) {
+      Vertex d = new Vertex(pd.getKey());
+      Vertex dj = pd.getValue();
+      for (JobPrototype parent : assignments.get(dj)) {
+        for (JobPrototype child : assignments.get(d)) {
+          child.addDependency(parent);
+        }
+      }
+    }
+    
+    // Finally, construct the jobs from the prototypes and return.
+    DotfileWriter dotfileWriter = new DotfileWriter();
+    MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
+    for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
+      dotfileWriter.addJobPrototype(proto);
+      exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
+    }
+
+    String planDotFile = dotfileWriter.buildDotfile();
+    exec.setPlanDotFile(planDotFile);
+    conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
+
+    return exec;
+  }
+  
+  private Graph prepareFinalGraph(Graph baseGraph) {
+    Graph graph = new Graph();
+    
+    for (Vertex baseVertex : baseGraph) {
+      // Add all of the vertices in the base graph, but no edges (yet).
+      graph.addVertex(baseVertex.getPCollection(), baseVertex.isOutput());
+    }
+    
+    for (Edge e : baseGraph.getAllEdges()) {
+      // Add back all of the edges where neither vertex is a GBK and we do not
+      // have an output feeding into a GBK.
+      if (!(e.getHead().isGBK() && e.getTail().isGBK()) &&
+          !(e.getHead().isOutput() && e.getTail().isGBK())) {
+        Vertex head = graph.getVertexAt(e.getHead().getPCollection());
+        Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
+        graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
+      }
+    }
+    
+    for (Vertex baseVertex : baseGraph) {
+      if (baseVertex.isGBK()) {
+        Vertex vertex = graph.getVertexAt(baseVertex.getPCollection());
+        for (Edge e : baseVertex.getIncomingEdges()) {
+          if (e.getHead().isOutput()) {
+            // Execute an edge split.
+            Vertex splitTail = e.getHead();
+            PCollectionImpl<?> split = splitTail.getPCollection();
+            InputCollection<?> inputNode = handleSplitTarget(split);
+            Vertex splitHead = graph.addVertex(inputNode, false);
+            
+            // Divide up the node paths in the edge between the two GBK nodes so
+            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
+            for (NodePath path : e.getNodePaths()) {
+              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+              graph.getEdge(vertex, splitTail).addNodePath(headPath);
+              graph.getEdge(splitHead, vertex).addNodePath(path);
+            }
+            
+            // Note the dependency between the vertices in the graph.
+            graph.markDependency(splitHead, splitTail);
+          } else if (!e.getHead().isGBK()) {
+            Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
+            graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+          }
+        }
+        for (Edge e : baseVertex.getOutgoingEdges()) {
+          if (!e.getTail().isGBK()) {
+            Vertex newTail = graph.getVertexAt(e.getTail().getPCollection());
+            graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths());
+          } else {
+            // Execute an Edge split
+            Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
+            PCollectionImpl split = e.getSplit();
+            InputCollection<?> inputNode = handleSplitTarget(split);
+            Vertex splitTail = graph.addVertex(split, true);
+            Vertex splitHead = graph.addVertex(inputNode, false);
+            
+            // Divide up the node paths in the edge between the two GBK nodes so
+            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
+            for (NodePath path : e.getNodePaths()) {
+              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+              graph.getEdge(vertex, splitTail).addNodePath(headPath);
+              graph.getEdge(splitHead, newGraphTail).addNodePath(path);
+            }
+            
+            // Note the dependency between the vertices in the graph.
+            graph.markDependency(splitHead, splitTail);
+          }
+        }
+      }
+    }
+    
+    return graph;
+  }
+  
+  private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
+    Multimap<Vertex, JobPrototype> assignment = HashMultimap.create();
+    List<Vertex> gbks = Lists.newArrayList();
+    for (Vertex v : component) {
+      if (v.isGBK()) {
+        gbks.add(v);
+      }
+    }
+
+    if (gbks.isEmpty()) {
+      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+      for (Vertex v : component) {
+        if (v.isInput()) {
+          for (Edge e : v.getOutgoingEdges()) {
+            for (NodePath nodePath : e.getNodePaths()) {
+              PCollectionImpl target = nodePath.tail();
+              for (Target t : outputs.get(target)) {
+                outputPaths.put(t, nodePath);
+              }
+            }
+          }
+        }
+      }
+      if (outputPaths.isEmpty()) {
+        throw new IllegalStateException("No outputs?");
+      }
+      JobPrototype prototype = JobPrototype.createMapOnlyJob(
+          ++lastJobID, outputPaths, pipeline.createTempPath());
+      for (Vertex v : component) {
+        assignment.put(v, prototype);
+      }
+    } else {
+      Set<Edge> usedEdges = Sets.newHashSet();
+      for (Vertex g : gbks) {
+        Set<NodePath> inputs = Sets.newHashSet();
+        for (Edge e : g.getIncomingEdges()) {
+          inputs.addAll(e.getNodePaths());
+          usedEdges.add(e);
+        }
+        JobPrototype prototype = JobPrototype.createMapReduceJob(
+            ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
+        assignment.put(g, prototype);
+        for (Edge e : g.getIncomingEdges()) {
+          assignment.put(e.getHead(), prototype);
+          usedEdges.add(e);
+        }
+        HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+        for (Edge e : g.getOutgoingEdges()) {
+          Vertex output = e.getTail();
+          for (Target t : outputs.get(output.getPCollection())) {
+            outputPaths.putAll(t, e.getNodePaths());
+          }
+          assignment.put(output, prototype);
+          usedEdges.add(e);
+        }
+        prototype.addReducePaths(outputPaths);
+      }
+      
+      // Check for any un-assigned vertices, which should be map-side outputs
+      // that we will need to run in a map-only job.
+      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+      Set<Vertex> orphans = Sets.newHashSet();
+      for (Vertex v : component) {
+
+        // Check if this vertex has multiple inputs but only a subset of
+        // them have already been assigned
+        boolean vertexHasUnassignedIncomingEdges = false;
+        if (v.isOutput()) {
+          for (Edge e : v.getIncomingEdges()) {
+            if (!usedEdges.contains(e)) {
+              vertexHasUnassignedIncomingEdges = true;
+            }
+          }
+        }
+
+        if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) {
+          orphans.add(v);
+          for (Edge e : v.getIncomingEdges()) {
+            if (vertexHasUnassignedIncomingEdges && usedEdges.contains(e)) {
+              // We've already dealt with this incoming edge
+              continue;
+            }
+            orphans.add(e.getHead());
+            for (NodePath nodePath : e.getNodePaths()) {
+              PCollectionImpl target = nodePath.tail();
+              for (Target t : outputs.get(target)) {
+                outputPaths.put(t, nodePath);
+              }
+            }
+          }
+        }
+
+      }
+      if (!outputPaths.isEmpty()) {
+        JobPrototype prototype = JobPrototype.createMapOnlyJob(
+            ++lastJobID, outputPaths, pipeline.createTempPath());
+        for (Vertex orphan : orphans) {
+          assignment.put(orphan, prototype);
+        }
+      }
+    }
+    
+    return assignment;
+  }
+  
+  private InputCollection<?> handleSplitTarget(PCollectionImpl<?> splitTarget) {
+    if (!outputs.containsKey(splitTarget)) {
+      outputs.put(splitTarget, Sets.<Target> newHashSet());
+    }
+
+    SourceTarget srcTarget = null;
+    Target targetToReplace = null;
+    for (Target t : outputs.get(splitTarget)) {
+      if (t instanceof SourceTarget) {
+        srcTarget = (SourceTarget<?>) t;
+        break;
+      } else {
+        srcTarget = t.asSourceTarget(splitTarget.getPType());
+        if (srcTarget != null) {
+          targetToReplace = t;
+          break;
+        }
+      }
+    }
+    if (targetToReplace != null) {
+      outputs.get(splitTarget).remove(targetToReplace);
+    } else if (srcTarget == null) {
+      srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
+    }
+    outputs.get(splitTarget).add(srcTarget);
+    splitTarget.materializeAt(srcTarget);
+
+    return (InputCollection<?>) pipeline.read(srcTarget);
+  }  
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
new file mode 100644
index 0000000..a090d93
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+
+import com.google.common.collect.Lists;
+
+class NodePath implements Iterable<PCollectionImpl<?>> {
+  private LinkedList<PCollectionImpl<?>> path;
+
+  public NodePath() {
+    this.path = Lists.newLinkedList();
+  }
+
+  public NodePath(PCollectionImpl<?> tail) {
+    this.path = Lists.newLinkedList();
+    this.path.add(tail);
+  }
+
+  public NodePath(NodePath other) {
+    this.path = Lists.newLinkedList(other.path);
+  }
+
+  public void push(PCollectionImpl<?> stage) {
+    this.path.push((PCollectionImpl<?>) stage);
+  }
+
+  public NodePath close(PCollectionImpl<?> head) {
+    this.path.push(head);
+    return this;
+  }
+
+  public Iterator<PCollectionImpl<?>> iterator() {
+    return path.iterator();
+  }
+
+  public Iterator<PCollectionImpl<?>> descendingIterator() {
+    return path.descendingIterator();
+  }
+
+  public PCollectionImpl<?> get(int index) {
+    return path.get(index);
+  }
+
+  public PCollectionImpl<?> head() {
+    return path.peekFirst();
+  }
+
+  public PCollectionImpl<?> tail() {
+    return path.peekLast();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof NodePath)) {
+      return false;
+    }
+    NodePath nodePath = (NodePath) other;
+    return path.equals(nodePath.path);
+  }
+
+  @Override
+  public int hashCode() {
+    return 17 + 37 * path.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (PCollectionImpl<?> collect : path) {
+      sb.append(collect.getName() + "|");
+    }
+    sb.deleteCharAt(sb.length() - 1);
+    return sb.toString();
+  }
+
+  public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
+    NodePath top = new NodePath();
+    for (int i = 0; i <= splitIndex; i++) {
+      top.path.add(path.get(i));
+    }
+    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
+    nextPath.add(newHead);
+    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
+    path = nextPath;
+    return top;
+  }
+  
+  public NodePath splitAt(PCollectionImpl split, PCollectionImpl<?> newHead) {
+    NodePath top = new NodePath();
+    int splitIndex = 0;
+    for (PCollectionImpl p : path) {
+      top.path.add(p);
+      if (p == split) {
+        break;
+      }
+      splitIndex++;
+    }
+    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
+    nextPath.add(newHead);
+    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
+    path = nextPath;
+    return top;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
new file mode 100644
index 0000000..b90a911
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+/**
+ * Collection of Configuration keys and various constants used when planning MapReduce jobs for a
+ * pipeline.
+ */
+public class PlanningParameters {
+
+  public static final String MULTI_OUTPUT_PREFIX = "out";
+
+  public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";
+
+  /**
+   * Configuration key under which a <a href="http://www.graphviz.org">DOT</a> file containing the
+   * pipeline job graph is stored by the planner.
+   */
+  public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
+
+  private PlanningParameters() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
new file mode 100644
index 0000000..f4aa668
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+class Vertex {
+  private final PCollectionImpl impl;
+  
+  private boolean output;
+  private Set<Edge> incoming;
+  private Set<Edge> outgoing;
+  
+  public Vertex(PCollectionImpl impl) {
+    this.impl = impl;
+    this.incoming = Sets.newHashSet();
+    this.outgoing = Sets.newHashSet();
+  }
+  
+  public PCollectionImpl getPCollection() {
+    return impl;
+  }
+  
+  public boolean isInput() {
+    return impl instanceof InputCollection;
+  }
+  
+  public boolean isGBK() {
+    return impl instanceof PGroupedTableImpl;
+  }
+  
+  public void setOutput() {
+    this.output = true;
+  }
+  
+  public boolean isOutput() {
+    return output;
+  }
+  
+  public Source getSource() {
+    if (isInput()) {
+      return ((InputCollection) impl).getSource();
+    }
+    return null;
+  }
+  
+  public void addIncoming(Edge edge) {
+    this.incoming.add(edge);
+  }
+  
+  public void addOutgoing(Edge edge) {
+    this.outgoing.add(edge);
+  }
+  
+  public List<Vertex> getAllNeighbors() {
+    List<Vertex> n = Lists.newArrayList();
+    for (Edge e : incoming) {
+      n.add(e.getHead());
+    }
+    for (Edge e : outgoing) {
+      n.add(e.getTail());
+    }
+    return n;
+  }
+  
+  public Set<Edge> getAllEdges() {
+    return Sets.union(incoming, outgoing);
+  }
+  
+  public Set<Edge> getIncomingEdges() {
+    return incoming;
+  }
+  
+  public Set<Edge> getOutgoingEdges() {
+    return outgoing;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof Vertex)) {
+      return false;
+    }
+    Vertex other = (Vertex) obj;
+    return impl.equals(other.impl);
+  }
+  
+  @Override
+  public int hashCode() {
+    return 17 + 37 * impl.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).setExcludeFieldNames(
+        new String[] { "outgoing", "incoming" }).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
new file mode 100644
index 0000000..47a3ded
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+public class CrunchCombiner extends CrunchReducer {
+
+  @Override
+  protected NodeContext getNodeContext() {
+    return NodeContext.COMBINE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
new file mode 100644
index 0000000..eb5dd8a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.collect.Lists;
+
+public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
+    List<InputSplit> splits = Lists.newArrayList();
+    Configuration base = job.getConfiguration();
+    Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
+
+    // First, build a map of InputFormats to Paths
+    for (Map.Entry<FormatBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
+      FormatBundle inputBundle = entry.getKey();
+      Configuration conf = new Configuration(base);
+      inputBundle.configure(conf);
+      Job jobCopy = new Job(conf);
+      InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
+          jobCopy.getConfiguration());
+      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
+        Integer nodeIndex = nodeEntry.getKey();
+        List<Path> paths = nodeEntry.getValue();
+        FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
+
+        // Get splits for each input path and tag with InputFormat
+        // and Mapper types by wrapping in a TaggedInputSplit.
+        List<InputSplit> pathSplits = format.getSplits(jobCopy);
+        for (InputSplit pathSplit : pathSplits) {
+          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getFormatClass(),
+              nodeIndex, jobCopy.getConfiguration()));
+        }
+      }
+    }
+    return splits;
+  }
+
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    return new CrunchRecordReader<K, V>(inputSplit, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
new file mode 100644
index 0000000..b41062b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class CrunchInputSplit extends InputSplit implements Writable {
+
+  private InputSplit inputSplit;
+  private Class<? extends InputFormat<?, ?>> inputFormatClass;
+  private int nodeIndex;
+  private Configuration conf;
+
+  public CrunchInputSplit() {
+    // default constructor
+  }
+
+  public CrunchInputSplit(
+      InputSplit inputSplit,
+      Class<? extends InputFormat<?, ?>> inputFormatClass,
+      int nodeIndex,
+      Configuration conf) {
+    this.inputSplit = inputSplit;
+    this.inputFormatClass = inputFormatClass;
+    this.nodeIndex = nodeIndex;
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  public int getNodeIndex() {
+    return nodeIndex;
+  }
+
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    nodeIndex = in.readInt();
+    conf = new Configuration();
+    conf.readFields(in);
+    inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
+    Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf);
+    SerializationFactory factory = new SerializationFactory(conf);
+    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
+    deserializer.open((DataInputStream) in);
+    inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
+  }
+
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(nodeIndex);
+    conf.write(out);
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, inputSplit.getClass().getName());
+    SerializationFactory factory = new SerializationFactory(conf);
+    Serializer serializer = factory.getSerializer(inputSplit.getClass());
+    serializer.open((DataOutputStream) out);
+    serializer.serialize(inputSplit);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
new file mode 100644
index 0000000..70f0b01
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
+
+  private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
+
+  private RTNode node;
+  private CrunchTaskContext ctxt;
+  private boolean debug;
+
+  @Override
+  protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
+    List<RTNode> nodes;
+    this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
+    try {
+      nodes = ctxt.getNodes();
+    } catch (IOException e) {
+      LOG.info("Crunch deserialization error", e);
+      throw new CrunchRuntimeException(e);
+    }
+    if (nodes.size() == 1) {
+      this.node = nodes.get(0);
+    } else {
+      CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
+      this.node = nodes.get(split.getNodeIndex());
+    }
+    this.debug = ctxt.isDebugRun();
+  }
+
+  @Override
+  protected void map(Object k, Object v, Mapper<Object, Object, Object, Object>.Context context) {
+    if (debug) {
+      try {
+        node.process(k, v);
+      } catch (Exception e) {
+        LOG.error("Mapper exception", e);
+      }
+    } else {
+      node.process(k, v);
+    }
+  }
+
+  @Override
+  protected void cleanup(Mapper<Object, Object, Object, Object>.Context context) {
+    node.cleanup();
+    ctxt.cleanup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
new file mode 100644
index 0000000..fc8fb32
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+
+import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class CrunchRecordReader<K, V> extends RecordReader<K, V> {
+
+  private final RecordReader<K, V> delegate;
+
+  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(crunchSplit.getInputFormatClass(),
+        crunchSplit.getConf());
+    this.delegate = inputFormat.createRecordReader(crunchSplit.getInputSplit(),
+        TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return delegate.getCurrentKey();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return delegate.getCurrentValue();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return delegate.getProgress();
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
+    InputSplit delegateSplit = crunchSplit.getInputSplit();
+    delegate.initialize(delegateSplit,
+        TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return delegate.nextKeyValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
new file mode 100644
index 0000000..e5ddbd2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.SingleUseIterable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
+
+  private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
+
+  private RTNode node;
+  private CrunchTaskContext ctxt;
+  private boolean debug;
+
+  protected NodeContext getNodeContext() {
+    return NodeContext.REDUCE;
+  }
+
+  @Override
+  protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
+    this.ctxt = new CrunchTaskContext(context, getNodeContext());
+    try {
+      List<RTNode> nodes = ctxt.getNodes();
+      this.node = nodes.get(0);
+    } catch (IOException e) {
+      LOG.info("Crunch deserialization error", e);
+      throw new CrunchRuntimeException(e);
+    }
+    this.debug = ctxt.isDebugRun();
+  }
+
+  @Override
+  protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) {
+    values = new SingleUseIterable<Object>(values);
+    if (debug) {
+      try {
+        node.processIterable(key, values);
+      } catch (Exception e) {
+        LOG.error("Reducer exception", e);
+      }
+    } else {
+      node.processIterable(key, values);
+    }
+  }
+
+  @Override
+  protected void cleanup(Reducer<Object, Object, Object, Object>.Context context) {
+    node.cleanup();
+    ctxt.cleanup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
new file mode 100644
index 0000000..c4f2873
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+class CrunchTaskContext {
+
+  private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+  private final NodeContext nodeContext;
+  private CrunchOutputs<Object, Object> multipleOutputs;
+
+  public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, NodeContext nodeContext) {
+    this.taskContext = taskContext;
+    this.nodeContext = nodeContext;
+  }
+
+  public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
+    return taskContext;
+  }
+
+  public NodeContext getNodeContext() {
+    return nodeContext;
+  }
+
+  public List<RTNode> getNodes() throws IOException {
+    Configuration conf = taskContext.getConfiguration();
+    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
+    @SuppressWarnings("unchecked")
+    List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
+    if (nodes != null) {
+      for (RTNode node : nodes) {
+        node.initialize(this);
+      }
+    }
+    return nodes;
+  }
+
+  public boolean isDebugRun() {
+    Configuration conf = taskContext.getConfiguration();
+    return conf.getBoolean(RuntimeParameters.DEBUG, false);
+  }
+
+  public void cleanup() {
+    if (multipleOutputs != null) {
+      try {
+        multipleOutputs.close();
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  public CrunchOutputs<Object, Object> getMultipleOutputs() {
+    if (multipleOutputs == null) {
+      multipleOutputs = new CrunchOutputs<Object, Object>(taskContext);
+    }
+    return multipleOutputs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
new file mode 100644
index 0000000..ffc9e7c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.impl.mr.plan.DoNode;
+
+/**
+ * Enum that is associated with a serialized {@link DoNode} instance, so we know
+ * how to use it within the context of a particular MR job.
+ * 
+ */
+public enum NodeContext {
+  MAP,
+  REDUCE,
+  COMBINE;
+
+  public String getConfigurationKey() {
+    return "crunch.donode." + toString().toLowerCase();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
new file mode 100644
index 0000000..ce7b795
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
+import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
+import org.apache.crunch.impl.mr.emit.OutputEmitter;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+
+public class RTNode implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(RTNode.class);
+
+  private final String nodeName;
+  private DoFn<Object, Object> fn;
+  private PType<Object> outputPType;
+  private final List<RTNode> children;
+  private final Converter inputConverter;
+  private final Converter outputConverter;
+  private final String outputName;
+
+  private transient Emitter<Object> emitter;
+
+  public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name, List<RTNode> children,
+      Converter inputConverter,
+      Converter outputConverter, String outputName) {
+    this.fn = fn;
+    this.outputPType = outputPType;
+    this.nodeName = name;
+    this.children = children;
+    this.inputConverter = inputConverter;
+    this.outputConverter = outputConverter;
+    this.outputName = outputName;
+  }
+
+  public void initialize(CrunchTaskContext ctxt) {
+    if (emitter != null) {
+      // Already initialized
+      return;
+    }
+
+    fn.setContext(ctxt.getContext());
+    fn.initialize();
+    for (RTNode child : children) {
+      child.initialize(ctxt);
+    }
+
+    if (outputConverter != null) {
+      if (outputName != null) {
+        this.emitter = new MultipleOutputEmitter(outputConverter, ctxt.getMultipleOutputs(),
+            outputName);
+      } else {
+        this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
+      }
+    } else if (!children.isEmpty()) {
+      this.emitter = new IntermediateEmitter(outputPType, children,
+          ctxt.getContext().getConfiguration());
+    } else {
+      throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
+    }
+  }
+
+  public boolean isLeafNode() {
+    return outputConverter != null && children.isEmpty();
+  }
+
+  public void process(Object input) {
+    try {
+      fn.process(input, emitter);
+    } catch (CrunchRuntimeException e) {
+      if (!e.wasLogged()) {
+        LOG.info(String.format("Crunch exception in '%s' for input: %s", nodeName, input.toString()), e);
+        e.markLogged();
+      }
+      throw e;
+    }
+  }
+
+  public void process(Object key, Object value) {
+    process(inputConverter.convertInput(key, value));
+  }
+
+  public void processIterable(Object key, Iterable values) {
+    process(inputConverter.convertIterableInput(key, values));
+  }
+
+  public void cleanup() {
+    fn.cleanup(emitter);
+    emitter.flush();
+    for (RTNode child : children) {
+      child.cleanup();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + children + ", inputConverter="
+        + inputConverter + ", outputConverter=" + outputConverter + ", outputName=" + outputName + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
new file mode 100644
index 0000000..604c49c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+/**
+ * Parameters used during the runtime execution.
+ */
+public class RuntimeParameters {
+
+  public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
+
+  public static final String DEBUG = "crunch.debug";
+
+  public static final String TMP_DIR = "crunch.tmp.dir";
+
+  public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress";
+
+  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
+
+  // Not instantiated
+  private RuntimeParameters() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/At.java b/crunch-core/src/main/java/org/apache/crunch/io/At.java
new file mode 100644
index 0000000..a6f0782
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/At.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSourceTarget;
+import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.io.text.TextFileSourceTarget;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>Static factory methods for creating common {@link SourceTarget} types, which may be treated as both a {@code Source}
+ * and a {@code Target}.</p>
+ * 
+ * <p>The {@code At} methods is analogous to the {@link From} and {@link To} factory methods, but is used for
+ * storing intermediate outputs that need to be passed from one run of a MapReduce pipeline to another run. The
+ * {@code SourceTarget} object acts as both a {@code Source} and a {@Target}, which enables it to provide this
+ * functionality.
+ * 
+ * <code>
+ *   Pipeline pipeline = new MRPipeline(this.getClass());
+ *   // Create our intermediate storage location
+ *   SourceTarget<String> intermediate = At.textFile("/temptext");
+ *   ...
+ *   // Write out the output of the first phase of a pipeline.
+ *   pipeline.write(phase1, intermediate);
+ *   
+ *   // Explicitly call run to kick off the pipeline.
+ *   pipeline.run();
+ *   
+ *   // And then kick off a second phase by consuming the output
+ *   // from the first phase.
+ *   PCollection<String> phase2Input = pipeline.read(intermediate);
+ *   ...
+ * </code>
+ * </p>
+ * 
+ * <p>The {@code SourceTarget} abstraction is useful when we care about reading the intermediate
+ * outputs of a pipeline as well as the final results.</p>
+ */
+public class At {
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T extends SpecificRecord> SourceTarget<T> avroFile(String pathName, Class<T> avroClass) {
+    return avroFile(new Path(pathName), avroClass);  
+  }
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T extends SpecificRecord> SourceTarget<T> avroFile(Path path, Class<T> avroClass) {
+    return avroFile(path, Avros.specifics(avroClass));  
+  }
+  
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param avroType The {@code AvroType} for the Avro records
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T> SourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
+    return avroFile(new Path(pathName), avroType);
+  }
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the Avro file(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @param avroType The {@code AvroType} for the Avro records
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T> SourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
+    return new AvroFileSourceTarget<T>(path, avroType);
+  }
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given path name
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T extends Writable> SourceTarget<T> sequenceFile(String pathName, Class<T> valueClass) {
+    return sequenceFile(new Path(pathName), valueClass);
+  }
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given {@code Path}
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param path The {@code Path} to the data
+   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T extends Writable> SourceTarget<T> sequenceFile(Path path, Class<T> valueClass) {
+    return sequenceFile(path, Writables.writables(valueClass));
+  }
+  
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given path name
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param ptype The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T> SourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
+    return sequenceFile(new Path(pathName), ptype);
+  }
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance from the SequenceFile(s) at the given {@code Path}
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param path The {@code Path} to the data
+   * @param ptype The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T> SourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
+    return new SeqFileSourceTarget<T>(path, ptype);
+  }
+
+  /**
+   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given path name
+   * from the key-value pairs in the SequenceFile(s).
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
+   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+   * @return A new {@code TableSourceTarget<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
+      String pathName, Class<K> keyClass, Class<V> valueClass) {
+    return sequenceFile(new Path(pathName), keyClass, valueClass);
+  }
+
+  /**
+   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given {@code Path}
+   * from the key-value pairs in the SequenceFile(s).
+   * 
+   * @param path The {@code Path} to the data
+   * @param keyClass The {@code Writable} type for the key of the SequenceFile entry
+   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+   * @return A new {@code TableSourceTarget<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSourceTarget<K, V> sequenceFile(
+      Path path, Class<K> keyClass, Class<V> valueClass) {
+    return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
+  }
+  
+  /**
+   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given path name
+   * from the key-value pairs in the SequenceFile(s).
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param keyType The {@code PType} for the key of the SequenceFile entry
+   * @param valueType The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code TableSourceTarget<K, V>} instance
+   */
+  public static <K, V> TableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
+    return sequenceFile(new Path(pathName), keyType, valueType);
+  }
+
+  /**
+   * Creates a {@code TableSourceTarget<K, V>} instance from the SequenceFile(s) at the given {@code Path}
+   * from the key-value pairs in the SequenceFile(s).
+   * 
+   * @param path The {@code Path} to the data
+   * @param keyType The {@code PType} for the key of the SequenceFile entry
+   * @param valueType The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code TableSourceTarget<K, V>} instance
+   */
+  public static <K, V> TableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
+    PTypeFamily ptf = keyType.getFamily();
+    return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
+  }
+
+  /**
+   * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @return A new {@code SourceTarget<String>} instance
+   */
+  public static SourceTarget<String> textFile(String pathName) {
+    return textFile(new Path(pathName));
+  }
+
+  /**
+   * Creates a {@code SourceTarget<String>} instance for the text file(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @return A new {@code SourceTarget<String>} instance
+   */
+  public static SourceTarget<String> textFile(Path path) {
+    return textFile(path, Writables.strings());
+  }
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given path name using
+   * the provided {@code PType<T>} to convert the input text.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param ptype The {@code PType<T>} to use to process the input text
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T> SourceTarget<T> textFile(String pathName, PType<T> ptype) {
+    return textFile(new Path(pathName), ptype);
+  }
+
+  /**
+   * Creates a {@code SourceTarget<T>} instance for the text file(s) at the given {@code Path} using
+   * the provided {@code PType<T>} to convert the input text.
+   * 
+   * @param path The {@code Path} to the data
+   * @param ptype The {@code PType<T>} to use to process the input text
+   * @return A new {@code SourceTarget<T>} instance
+   */
+  public static <T> SourceTarget<T> textFile(Path path, PType<T> ptype) {
+    return new TextFileSourceTarget<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java
new file mode 100644
index 0000000..a4723e9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CompositePathIterable.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+public class CompositePathIterable<T> implements Iterable<T> {
+
+  private final FileStatus[] stati;
+  private final FileSystem fs;
+  private final FileReaderFactory<T> readerFactory;
+
+  private static final PathFilter FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return !path.getName().startsWith("_");
+    }
+  };
+
+  public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
+
+    if (!fs.exists(path)) {
+      throw new IOException("No files found to materialize at: " + path);
+    }
+
+    FileStatus[] stati = null;
+    try {
+      stati = fs.listStatus(path, FILTER);
+    } catch (FileNotFoundException e) {
+      stati = null;
+    }
+    if (stati == null) {
+      throw new IOException("No files found to materialize at: " + path);
+    }
+
+    if (stati.length == 0) {
+      return Collections.emptyList();
+    } else {
+      return new CompositePathIterable<S>(stati, fs, readerFactory);
+    }
+
+  }
+
+  private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
+    this.stati = stati;
+    this.fs = fs;
+    this.readerFactory = readerFactory;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+
+    return new UnmodifiableIterator<T>() {
+      private int index = 0;
+      private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
+
+      @Override
+      public boolean hasNext() {
+        if (!iter.hasNext()) {
+          while (index < stati.length) {
+            iter = readerFactory.read(fs, stati[index++].getPath());
+            if (iter.hasNext()) {
+              return true;
+            }
+          }
+          return false;
+        }
+        return true;
+      }
+
+      @Override
+      public T next() {
+        return iter.next();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
new file mode 100644
index 0000000..d154db2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper functions for configuring multiple {@code InputFormat} instances within a single
+ * Crunch MapReduce job.
+ */
+public class CrunchInputs {
+  public static final String CRUNCH_INPUTS = "crunch.inputs.dir";
+
+  private static final char RECORD_SEP = ',';
+  private static final char FIELD_SEP = ';';
+  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+
+  public static void addInputPath(Job job, Path path, FormatBundle inputBundle, int nodeIndex) {
+    Configuration conf = job.getConfiguration();
+    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
+    String existing = conf.get(CRUNCH_INPUTS);
+    conf.set(CRUNCH_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
+  }
+
+  public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
+    Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
+    Configuration conf = job.getConfiguration();
+    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
+      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
+      FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), InputFormat.class);
+      if (!formatNodeMap.containsKey(inputBundle)) {
+        formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
+      }
+      Integer nodeIndex = Integer.valueOf(fields.get(1));
+      if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
+        formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
+      }
+      formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
+    }
+    return formatNodeMap;
+  }
+
+}


Mime
View raw message