incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [11/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
new file mode 100644
index 0000000..5780da8
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.run.NodeContext;
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Planner-side representation of one stage of a MapReduce task: a
+ * {@link DoFn} paired with the {@link PType} of its output and the child
+ * nodes that consume that output. Instances are created via the static
+ * {@code createXxxNode} factories and converted into runtime
+ * {@link RTNode} trees by {@link #toRTNode}.
+ */
+public class DoNode {
+
+  // Shared immutable sentinel for node kinds (grouping, output) that do
+  // not accept children; calling addChild on such a node would fail.
+  private static final List<DoNode> NO_CHILDREN = ImmutableList.of();
+
+  private final DoFn fn;
+  private final String name;
+  private final PType<?> ptype;
+  private final List<DoNode> children;
+  // Non-null only for output/grouping nodes; see isOutputNode().
+  private final Converter outputConverter;
+  // Non-null only for input nodes; see isInputNode().
+  private final Source<?> source;
+  // Multi-output name; assigned later through setOutputName().
+  private String outputName;
+
+  // Private on purpose: callers must use the factory methods below so that
+  // the converter/source/children invariants per node kind are maintained.
+  private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children,
+      Converter outputConverter, Source<?> source) {
+    this.fn = fn;
+    this.name = name;
+    this.ptype = ptype;
+    this.children = children;
+    this.outputConverter = outputConverter;
+    this.source = source;
+  }
+
+  // Returns a fresh mutable list for node kinds that may receive children.
+  private static List<DoNode> allowsChildren() {
+    return Lists.newArrayList();
+  }
+
+  /** Creates the childless node that feeds a shuffle via the grouping converter. */
+  public static <K, V> DoNode createGroupingNode(String name,
+      PGroupedTableType<K, V> ptype) {
+    DoFn<?,?> fn = ptype.getOutputMapFn();
+    return new DoNode(fn, name, ptype, NO_CHILDREN,
+        ptype.getGroupingConverter(), null);
+  }
+  
+  /** Creates a childless leaf node that writes records of the given type to an output. */
+  public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
+    Converter outputConverter = ptype.getConverter();
+    DoFn<?,?> fn = ptype.getOutputMapFn();
+    return new DoNode(fn, name, ptype, NO_CHILDREN,
+        outputConverter, null);
+  }
+
+  /** Creates an interior node that applies a user-supplied {@code DoFn}. */
+  public static DoNode createFnNode(String name, DoFn<?, ?> function,
+      PType<?> ptype) {
+    return new DoNode(function, name, ptype, allowsChildren(), null, null);
+  }
+
+  /** Creates a root node that reads records from the given {@code Source}. */
+  public static <S> DoNode createInputNode(Source<S> source) {
+    PType<?> ptype = source.getType();
+    DoFn<?,?> fn = ptype.getInputMapFn();
+    return new DoNode(fn, source.toString(), ptype, allowsChildren(), null,
+        source);
+  }
+
+  /** True iff this node was created by {@link #createInputNode}. */
+  public boolean isInputNode() {
+    return source != null;
+  }
+  
+  /** True iff this node has an output converter (output or grouping node). */
+  public boolean isOutputNode() {
+    return outputConverter != null;
+  }
+  
+  public String getName() {
+    return name;
+  }
+  
+  public List<DoNode> getChildren() {
+    return children;
+  }
+  
+  /** Returns the input source, or {@code null} for non-input nodes. */
+  public Source<?> getSource() {
+    return source;
+  }
+
+  public PType<?> getPType() {
+    return ptype;
+  }
+
+  /**
+   * Adds the given node as a child unless it is already present; returns
+   * {@code this} for chaining.
+   */
+  public DoNode addChild(DoNode node) {
+    if (!children.contains(node)) {
+      this.children.add(node);
+    }
+    return this;
+  }
+
+  /**
+   * Assigns the named-output identifier used for multi-output jobs.
+   *
+   * @throws IllegalStateException if this is not an output node
+   */
+  public void setOutputName(String outputName) {
+    if (outputConverter == null) {
+      throw new IllegalStateException(
+          "Cannot set output name w/o output converter: " + outputName);
+    }
+    this.outputName = outputName;
+  }
+
+  /**
+   * Recursively converts this node (and its children) into the serializable
+   * {@link RTNode} form, configuring each DoFn with the job configuration.
+   * Only the root of a tree ({@code inputNode == true}) receives an input
+   * converter: the plain converter on the map side, or the grouping
+   * converter when the tree runs in the reduce phase.
+   */
+  public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) {
+    List<RTNode> childRTNodes = Lists.newArrayList();
+    fn.configure(conf);
+    for (DoNode child : children) {
+      childRTNodes.add(child.toRTNode(false, conf, nodeContext));
+    }
+
+    Converter inputConverter = null;
+    if (inputNode) {
+      if (nodeContext == NodeContext.MAP) {
+        inputConverter = ptype.getConverter();
+      } else {
+        inputConverter = ((PGroupedTableType<?,?>) ptype).getGroupingConverter();
+      }
+    }          
+    return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter,
+        outputName);
+  }
+  
+  // NOTE(review): source and outputConverter are compared by reference
+  // identity here, not equals(); presumably intentional since node identity
+  // is tied to the exact source/converter instance -- confirm.
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof DoNode)) {
+      return false;
+    }
+    if (this == other) {
+      return true;
+    }
+    DoNode o = (DoNode) other;
+    return (name.equals(o.name) && fn.equals(o.fn) && source == o.source &&
+        outputConverter == o.outputConverter);
+  }
+  
+  // Consistent with equals(): hashes the same four fields.
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(name).append(fn).append(source)
+        .append(outputConverter).toHashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
new file mode 100644
index 0000000..55f690a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * Visitor that traverses the {@code DoNode} instances in a job and builds
+ * a String that identifies the stages of the pipeline that belong to
+ * this job.
+ */
+public class JobNameBuilder {
+  
+  // "+" joins sibling stage names at one level; "/" joins the bracketed
+  // name groups produced by parallel child branches.
+  private static final Joiner JOINER = Joiner.on("+");
+  private static final Joiner CHILD_JOINER = Joiner.on("/");
+  
+  private String pipelineName;
+  // Accumulates the stage names seen while visiting; consumed by build().
+  List<String> rootStack = Lists.newArrayList();
+  
+  public JobNameBuilder(final String pipelineName){
+    this.pipelineName = pipelineName;
+  }
+  
+  /** Visits a single node tree, appending its stage names to the root stack. */
+  public void visit(DoNode node) {
+    visit(node, rootStack);
+  }
+  
+  /** Visits a list of sibling node trees, appending names to the root stack. */
+  public void visit(List<DoNode> nodes) {
+    visit(nodes, rootStack);
+  }
+  
+  // A single child is flattened into the current stack; multiple children
+  // each get their own bracketed sub-group, and the groups are joined with
+  // "/" inside one enclosing bracket. Empty branches are dropped.
+  private void visit(List<DoNode> nodes, List<String> stack) {
+    if (nodes.size() == 1) {
+      visit(nodes.get(0), stack);
+    } else {
+      List<String> childStack = Lists.newArrayList();
+      for (int i = 0; i < nodes.size(); i++) {
+        DoNode node = nodes.get(i);
+        List<String> subStack = Lists.newArrayList();
+        visit(node, subStack);
+        if (!subStack.isEmpty()) {
+          childStack.add("[" + JOINER.join(subStack) + "]");
+        }
+      }
+      if (!childStack.isEmpty()) {
+        stack.add("[" + CHILD_JOINER.join(childStack) + "]");
+      }
+    }
+  }
+  
+  // Pushes this node's name (if non-empty) and then recurses into children.
+  private void visit(DoNode node, List<String> stack) {
+    String name = node.getName();
+    if (!name.isEmpty()) {
+      stack.add(node.getName());
+    }
+    visit(node.getChildren(), stack);
+  }
+  
+  /** Returns the final job name: {@code "<pipeline>: <stage>+<stage>+..."}. */
+  public String build() {
+    return String.format("%s: %s", pipelineName, JOINER.join(rootStack));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
new file mode 100644
index 0000000..165641a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.DoTableImpl;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.exec.CrunchJob;
+import org.apache.crunch.impl.mr.run.CrunchCombiner;
+import org.apache.crunch.impl.mr.run.CrunchInputFormat;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.impl.mr.run.CrunchReducer;
+import org.apache.crunch.impl.mr.run.NodeContext;
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.util.DistCache;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Blueprint for a single MapReduce job assembled by the planner: the set of
+ * map-side node paths, an optional shuffle (the {@code group}), and the
+ * targets each path writes to. The actual configured {@link CrunchJob} is
+ * built lazily by {@link #getCrunchJob}, which also wires up job
+ * dependencies.
+ */
+public class JobPrototype {
+
+  /** Creates a prototype for a job with a shuffle/reduce phase. */
+  public static JobPrototype createMapReduceJob(PGroupedTableImpl<?,?> group,
+      Set<NodePath> inputs, Path workingPath) {
+    return new JobPrototype(inputs, group, workingPath);
+  }
+
+  /** Creates a prototype for a map-only job (no shuffle, zero reducers). */
+  public static JobPrototype createMapOnlyJob(
+      HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
+    return new JobPrototype(mapNodePaths, workingPath);
+  }
+
+  // Null for map-only jobs; see the two private constructors.
+  private final Set<NodePath> mapNodePaths;
+  // The shuffle; null for map-only jobs.
+  private final PGroupedTableImpl<?,?> group;
+  private final Set<JobPrototype> dependencies = Sets.newHashSet();
+  // Cache of DoNodes already created for a collection, so shared
+  // collections are wired to a single node (see walkPath).
+  private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
+  private final Path workingPath;
+  
+  private HashMultimap<Target, NodePath> targetsToNodePaths;
+  // Set as a side effect of walkPath when a combine-capable table is seen.
+  private DoTableImpl<?,?> combineFnTable;
+
+  // Lazily built; guards getCrunchJob against rebuilding.
+  private CrunchJob job;
+
+  // Map-reduce variant: output paths are supplied later via addReducePaths.
+  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?,?> group,
+      Path workingPath) {
+    this.mapNodePaths = ImmutableSet.copyOf(inputs);
+    this.group = group;
+    this.workingPath = workingPath;
+    this.targetsToNodePaths = null;
+  }
+
+  // Map-only variant: output paths are known up front.
+  private JobPrototype(HashMultimap<Target, NodePath> outputPaths, Path workingPath) {
+    this.group = null;
+    this.mapNodePaths = null;
+    this.workingPath = workingPath;
+    this.targetsToNodePaths = outputPaths;
+  }
+
+  /**
+   * Supplies the reduce-side output paths for a map-reduce prototype.
+   *
+   * @throws IllegalStateException if this prototype was created map-only
+   */
+  public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
+    if (group == null) {
+      throw new IllegalStateException(
+          "Cannot add a reduce phase to a map-only job");
+    }
+    this.targetsToNodePaths = outputPaths;
+  }
+
+  public void addDependency(JobPrototype dependency) {
+    this.dependencies.add(dependency);
+  }
+
+  /**
+   * Builds (once) and returns the runnable job, recursively resolving and
+   * attaching all depending jobs.
+   */
+  public CrunchJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+    if (job == null) {
+      job = build(jarClass, conf, pipeline);
+      for (JobPrototype proto : dependencies) {
+        job.addDependingJob(proto.getCrunchJob(jarClass, conf, pipeline));
+      }
+    }
+    return job;
+  }
+
+  // Assembles the Hadoop Job: output nodes, map/combine/reduce node trees
+  // (serialized to the distributed cache), input sources, and the job name.
+  private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException {
+    Job job = new Job(conf);
+    conf = job.getConfiguration();
+    conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
+    job.setJarByClass(jarClass);
+    
+    Set<DoNode> outputNodes = Sets.newHashSet();
+    Set<Target> targets = targetsToNodePaths.keySet();
+    Path outputPath = new Path(workingPath, "output");
+    MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath,
+        group == null);
+    // Walk each node path backwards from its output, building (and sharing)
+    // DoNodes; the roots collected here are the heads of the trees.
+    for (Target target : targets) {
+      DoNode node = null;
+      for (NodePath nodePath : targetsToNodePaths.get(target)) {
+        if (node == null) {
+          PCollectionImpl<?> collect = nodePath.tail();
+          node = DoNode.createOutputNode(target.toString(), collect.getPType());
+          outputHandler.configureNode(node, target);
+        }
+        outputNodes.add(walkPath(nodePath.descendingIterator(), node));
+      }
+    }
+
+    job.setMapperClass(CrunchMapper.class);
+    List<DoNode> inputNodes;
+    DoNode reduceNode = null;
+    if (group != null) {
+      job.setReducerClass(CrunchReducer.class);
+      List<DoNode> reduceNodes = Lists.newArrayList(outputNodes);
+      serialize(reduceNodes, conf, workingPath, NodeContext.REDUCE);
+      reduceNode = reduceNodes.get(0);
+
+      // combineFnTable is set as a side effect of walkPath above.
+      if (combineFnTable != null) {
+        job.setCombinerClass(CrunchCombiner.class);
+        DoNode combinerInputNode = group.createDoNode();
+        DoNode combineNode = combineFnTable.createDoNode();
+        combineNode.addChild(group.getGroupingNode());
+        combinerInputNode.addChild(combineNode);
+        serialize(ImmutableList.of(combinerInputNode), conf, workingPath,
+            NodeContext.COMBINE);
+      }
+
+      group.configureShuffle(job);
+
+      DoNode mapOutputNode = group.getGroupingNode();      
+      Set<DoNode> mapNodes = Sets.newHashSet();
+      for (NodePath nodePath : mapNodePaths) {
+        // Advance these one step, since we've already configured
+        // the grouping node, and the PGroupedTableImpl is the tail
+        // of the NodePath.
+        Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
+        iter.next();
+        mapNodes.add(walkPath(iter, mapOutputNode));
+      }
+      inputNodes = Lists.newArrayList(mapNodes);
+    } else { // No grouping
+      job.setNumReduceTasks(0);
+      inputNodes = Lists.newArrayList(outputNodes);
+    }
+    serialize(inputNodes, conf, workingPath, NodeContext.MAP);
+
+    // Single-input jobs use the source's own input format (index -1);
+    // multi-input jobs delegate dispatch to CrunchInputFormat.
+    if (inputNodes.size() == 1) {
+      DoNode inputNode = inputNodes.get(0);
+      inputNode.getSource().configureSource(job, -1);
+    } else {
+      for (int i = 0; i < inputNodes.size(); i++) {
+        DoNode inputNode = inputNodes.get(i);
+        inputNode.getSource().configureSource(job, i);
+      }
+      job.setInputFormatClass(CrunchInputFormat.class);
+    }
+    job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
+    
+    return new CrunchJob(job, outputPath, outputHandler);
+  }
+
+  // Serializes the RTNode forms of the given trees to a file named after
+  // the phase (MAP/COMBINE/REDUCE) under the working path, via DistCache.
+  private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath,
+      NodeContext context) throws IOException {
+    List<RTNode> rtNodes = Lists.newArrayList();
+    for (DoNode node : nodes) {
+      rtNodes.add(node.toRTNode(true, conf, context));
+    }
+    Path path = new Path(workingPath, context.toString());
+    DistCache.write(conf, path, rtNodes);
+  }
+
+  // Builds a human-readable job name out of the map (and optional reduce)
+  // stage names using JobNameBuilder.
+  private String createJobName(String pipelineName, List<DoNode> mapNodes, DoNode reduceNode) {
+    JobNameBuilder builder = new JobNameBuilder(pipelineName);
+    builder.visit(mapNodes);
+    if (reduceNode != null) {
+      builder.visit(reduceNode);
+    }
+    return builder.build();
+  }
+  
+  // Walks collections from output toward input, creating/reusing a DoNode
+  // per collection and linking each as the parent of the previous node.
+  // Also tracks the most recent combine-capable DoTableImpl, clearing it
+  // when a non-grouped collection intervenes. Returns the path's root node.
+  private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
+    while (iter.hasNext()) {
+      PCollectionImpl<?> collect = iter.next();
+      if (combineFnTable != null &&
+          !(collect instanceof PGroupedTableImpl)) {
+        combineFnTable = null;
+      } else if (collect instanceof DoTableImpl &&
+          ((DoTableImpl<?,?>) collect).hasCombineFn()) {
+        combineFnTable = (DoTableImpl<?,?>) collect;
+      }
+      if (!nodes.containsKey(collect)) {
+        nodes.put(collect, collect.createDoNode());
+      }
+      DoNode parent = nodes.get(collect);
+      parent.addChild(working);
+      working = parent;
+    }
+    return working;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
new file mode 100644
index 0000000..af193f4
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.MapReduceTarget;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link OutputHandler} that configures job outputs for the planner.
+ * Targets that are both path-based and MapReduce-capable are wired up as
+ * named multi-outputs under a shared output path; other MapReduce targets
+ * configure themselves directly on the job.
+ */
+public class MSCROutputHandler implements OutputHandler {
+
+  private final Job job;
+  // Shared parent path under which multi-output targets write.
+  private final Path path;
+  private final boolean mapOnlyJob;
+  
+  // The node currently being configured; set by configureNode before the
+  // target calls back into configure().
+  private DoNode workingNode;
+  private List<Path> multiPaths;
+  
+  public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
+    this.job = job;
+    this.path = outputPath;
+    this.mapOnlyJob = mapOnlyJob;
+    this.multiPaths = Lists.newArrayList();
+  }
+  
+  /** Configures the given output node for a target via the visitor callback. */
+  public void configureNode(DoNode node, Target target) {
+    workingNode = node;
+    target.accept(this, node.getPType());
+  }
+  
+  /**
+   * Callback from the target: returns true when the target could be
+   * configured for MapReduce, false otherwise. Multi-output names are
+   * generated as prefix + running index.
+   */
+  public boolean configure(Target target, PType<?> ptype) {
+    if (target instanceof MapReduceTarget && target instanceof PathTarget) {
+      String name = PlanningParameters.MULTI_OUTPUT_PREFIX + multiPaths.size();
+      multiPaths.add(((PathTarget) target).getPath());
+      workingNode.setOutputName(name);
+      ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name);
+      return true;
+    }
+    if (target instanceof MapReduceTarget) {
+      ((MapReduceTarget) target).configureForMapReduce(job, ptype, null, null);
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isMapOnlyJob() {
+    return mapOnlyJob;
+  }
+  
+  /** Destination paths of the multi-outputs, in assignment (index) order. */
+  public List<Path> getMultiPaths() {
+    return multiPaths;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
new file mode 100644
index 0000000..29aeafd
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
+import org.apache.crunch.impl.mr.collect.DoTableImpl;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.collect.UnionCollection;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class MSCRPlanner {
+
+  // Used to ensure that we always build pipelines starting from the deepest outputs, which
+  // helps ensure that we handle intermediate outputs correctly.
+  // Orders deepest-first; the hashCode tiebreak only keeps distinct
+  // collections from colliding in the TreeMap, it carries no meaning.
+  // NOTE(review): `new Integer(...)` boxing is deprecated in later JDKs;
+  // in live code Integer.compare(right.hashCode(), left.hashCode()) is
+  // the equivalent form.
+  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
+    @Override
+    public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
+      int cmp = right.getDepth() - left.getDepth();   
+      if (cmp == 0){
+          // Ensure we don't throw away two output collections at the same depth.
+          // Using the collection name would be nicer here, but names aren't necessarily unique
+          cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
+      }
+      return cmp;
+    }
+  };
+  
+  private final MRPipeline pipeline;
+  // Output collections mapped to the targets they must be written to,
+  // kept deepest-first (see DEPTH_COMPARATOR).
+  private final Map<PCollectionImpl<?>, Set<Target>> outputs;
+
+  public MSCRPlanner(MRPipeline pipeline,
+      Map<PCollectionImpl<?>, Set<Target>> outputs) {
+    this.pipeline = pipeline;
+    // Copy into a TreeMap so iteration visits the deepest outputs first.
+    this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
+    this.outputs.putAll(outputs);
+  }
+
+  /**
+   * Converts the pipeline's output collections into a set of dependent
+   * MapReduce job prototypes and returns an executor holding the built
+   * jobs. Groupings (GBKs) are processed in waves: each wave handles the
+   * GBKs with no remaining GBK dependency, then remaining paths become
+   * map-only jobs.
+   */
+  public MRExecutor plan(Class<?> jarClass, Configuration conf)
+      throws IOException {
+    // Constructs all of the node paths, which either start w/an input
+    // or a GBK and terminate in an output collection of any type.
+    NodeVisitor visitor = new NodeVisitor();
+    for (PCollectionImpl<?> output : outputs.keySet()) {
+      visitor.visitOutput(output);
+    }
+
+    // Pull out the node paths.
+    Map<PCollectionImpl<?>, Set<NodePath>> nodePaths = visitor.getNodePaths();
+
+    // Keeps track of the dependencies from collections -> jobs and then
+    // between different jobs.
+    Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
+    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies =
+        new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();
+
+    // Find the set of GBKs that DO NOT depend on any other GBK.
+    Set<PGroupedTableImpl<?,?>> workingGroupings = null;
+    while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {
+
+      // Create one map-reduce job prototype per independent GBK, carrying
+      // over any job dependencies recorded in an earlier wave.
+      for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
+        Set<NodePath> mapInputPaths = nodePaths.get(grouping);
+        JobPrototype proto = JobPrototype.createMapReduceJob(grouping,
+            mapInputPaths, pipeline.createTempPath());
+        assignments.put(grouping, proto);
+        if (jobDependencies.containsKey(grouping)) {
+          for (JobPrototype dependency : jobDependencies.get(grouping)) {
+            proto.addDependency(dependency);
+          }
+        }
+      }
+
+      Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = getDependencyPaths(
+          workingGroupings, nodePaths);
+      for (Map.Entry<PGroupedTableImpl<?,?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
+        PGroupedTableImpl<?,?> grouping = entry.getKey();
+        Set<NodePath> currentNodePaths = entry.getValue();
+
+        // Paths that end in another GBK make the downstream job depend on
+        // this one; they also need to be split at an intermediate output.
+        JobPrototype proto = assignments.get(grouping);
+        Set<NodePath> gbkPaths = Sets.newHashSet();
+        for (NodePath nodePath : currentNodePaths) {
+          PCollectionImpl<?> tail = nodePath.tail();
+          if (tail instanceof PGroupedTableImpl) {
+            gbkPaths.add(nodePath);
+            if (!jobDependencies.containsKey(tail)) {
+              jobDependencies.put(tail, Sets.<JobPrototype>newHashSet());
+            }
+            jobDependencies.get(tail).add(proto);
+          }
+        }
+
+        if (!gbkPaths.isEmpty()) {
+          handleGroupingDependencies(gbkPaths, currentNodePaths);
+        }
+
+        // At this point, all of the dependencies for the working groups will be
+        // file outputs, and so we can add them all to the JobPrototype-- we now have
+        // a complete job.
+        HashMultimap<Target, NodePath> reduceOutputs = HashMultimap.create();
+        for (NodePath nodePath : currentNodePaths) {
+          assignments.put(nodePath.tail(), proto);
+          for (Target target : outputs.get(nodePath.tail())) {
+            reduceOutputs.put(target, nodePath);
+          }
+        }
+        proto.addReducePaths(reduceOutputs);
+
+        // We've processed this GBK-- remove it from the set of nodePaths we
+        // need to process in the next step.
+        nodePaths.remove(grouping);
+      }
+    }
+
+    // Process any map-only jobs that are remaining.
+    if (!nodePaths.isEmpty()) {
+      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths
+          .entrySet()) {
+        PCollectionImpl<?> collect = entry.getKey();
+        if (!assignments.containsKey(collect)) {
+          HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
+          for (NodePath nodePath : entry.getValue()) {
+            for (Target target : outputs.get(nodePath.tail())) {
+              mapOutputs.put(target, nodePath);
+            }
+          }
+          JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs,
+              pipeline.createTempPath());
+          
+          if (jobDependencies.containsKey(collect)) {
+            for (JobPrototype dependency : jobDependencies.get(collect)) {
+              proto.addDependency(dependency);
+            }
+          }
+          assignments.put(collect, proto);
+        }
+      }
+    }
+
+    // Deduplicate prototypes (several collections may share one job) and
+    // build the runnable jobs.
+    MRExecutor exec = new MRExecutor(jarClass);
+    for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
+      exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
+    }
+    return exec;
+  }
+
+  // For each working GBK, collects the node paths (belonging to other
+  // targets) whose head is that GBK -- i.e. the downstream work that
+  // depends on the GBK's output.
+  private Map<PGroupedTableImpl<?,?>, Set<NodePath>> getDependencyPaths(
+      Set<PGroupedTableImpl<?,?>> workingGroupings,
+      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
+    Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
+    for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
+      dependencyPaths.put(grouping, Sets.<NodePath> newHashSet());
+    }
+
+    // Find the targets that depend on one of the elements of the current
+    // working group.
+    for (PCollectionImpl<?> target : nodePaths.keySet()) {
+      if (!workingGroupings.contains(target)) {
+        for (NodePath nodePath : nodePaths.get(target)) {
+          if (workingGroupings.contains(nodePath.head())) {
+            dependencyPaths.get(nodePath.head()).add(nodePath);
+          }
+        }
+      }
+    }
+    return dependencyPaths;
+  }
+
+  // Walks the given node paths in lockstep and returns the index of the
+  // last position where all paths still share the same collection (and no
+  // GBK has been reached) -- the point at which an intermediate output can
+  // be materialized for all of them.
+  private int getSplitIndex(Set<NodePath> currentNodePaths) {
+    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
+    for (NodePath nodePath : currentNodePaths) {
+      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
+      iter.next(); // prime this past the initial NGroupedTableImpl
+      iters.add(iter);
+    }
+
+    // Find the lowest point w/the lowest cost to be the split point for
+    // all of the dependent paths.
+    boolean end = false;
+    int splitIndex = -1;
+    while (!end) {
+      splitIndex++;
+      PCollectionImpl<?> current = null;
+      for (Iterator<PCollectionImpl<?>> iter : iters) {
+        if (iter.hasNext()) {
+          PCollectionImpl<?> next = iter.next();
+          if (next instanceof PGroupedTableImpl) {
+            // Hit the next shuffle: cannot split past a GBK.
+            end = true;
+            break;
+          } else if (current == null) {
+            current = next;
+          } else if (current != next) {
+            // Paths diverge here (identity comparison): stop.
+            end = true;
+            break;
+          }
+        } else {
+          end = true;
+          break;
+        }
+      }
+    }
+    // TODO: Add costing calcs here.
+    return splitIndex;
+  }
+
+  // Materializes an intermediate output at the common split point of the
+  // given paths, reusing an existing SourceTarget output if one is already
+  // registered for the split collection, and rewrites the GBK-terminated
+  // paths to read from that intermediate source instead.
+  private void handleGroupingDependencies(Set<NodePath> gbkPaths,
+      Set<NodePath> currentNodePaths) throws IOException {
+    int splitIndex = getSplitIndex(currentNodePaths);
+    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next()
+        .get(splitIndex);
+    if (!outputs.containsKey(splitTarget)) {
+      outputs.put(splitTarget, Sets.<Target>newHashSet());
+    }
+    
+    // Prefer an existing SourceTarget; failing that, try to convert an
+    // existing Target into one (replacing it); otherwise create a fresh
+    // intermediate output.
+    SourceTarget srcTarget = null;
+    Target targetToReplace = null;
+    for (Target t : outputs.get(splitTarget)) {
+      if (t instanceof SourceTarget) {
+        srcTarget = (SourceTarget<?>) t;
+        break;
+      } else {
+        srcTarget = t.asSourceTarget(splitTarget.getPType());
+        if (srcTarget != null) {
+          targetToReplace = t;
+          break;
+        }
+      }
+    }
+    if (targetToReplace != null) {
+      outputs.get(splitTarget).remove(targetToReplace);
+    } else if (srcTarget == null) {
+      srcTarget = pipeline.createIntermediateOutput(splitTarget.getPType());
+    }
+    outputs.get(splitTarget).add(srcTarget);
+    splitTarget.materializeAt(srcTarget);
+
+    // Replace the prefix of each GBK path with a read of the intermediate
+    // output; non-GBK paths are kept as-is.
+    PCollectionImpl<?> inputNode = (PCollectionImpl<?>) pipeline.read(srcTarget);
+    Set<NodePath> nextNodePaths = Sets.newHashSet();
+    for (NodePath nodePath : currentNodePaths) {
+      if (gbkPaths.contains(nodePath)) {
+    	nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
+      } else {
+    	nextNodePaths.add(nodePath);
+      }
+    }
+    // Mutates the caller's set in place on purpose.
+    currentNodePaths.clear();
+    currentNodePaths.addAll(nextNodePaths);
+  }
+
+  private Set<PGroupedTableImpl<?,?>> getWorkingGroupings(
+      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
+    Set<PGroupedTableImpl<?,?>> gbks = Sets.newHashSet();
+    for (PCollectionImpl<?> target : nodePaths.keySet()) {
+      if (target instanceof PGroupedTableImpl) {
+        boolean hasGBKDependency = false;
+        for (NodePath nodePath : nodePaths.get(target)) {
+          if (nodePath.head() instanceof PGroupedTableImpl) {
+            hasGBKDependency = true;
+            break;
+          }
+        }
+        if (!hasGBKDependency) {
+          gbks.add((PGroupedTableImpl<?,?>) target);
+        }
+      }
+    }
+    return gbks;
+  }
+
  /**
   * Walks the PCollection DAG backwards from each output, recording every
   * traversed route as a {@link NodePath}. Paths are keyed by their terminal
   * collection: either an output passed to {@link #visitOutput} or a grouped
   * table encountered along the way.
   */
  private static class NodeVisitor implements PCollectionImpl.Visitor {

    // For each terminal collection (output or grouped table), the set of paths
    // leading back to an input or to another already-visited collection.
    private final Map<PCollectionImpl<?>, Set<NodePath>> nodePaths;
    // The Source backing every InputCollection encountered during traversal.
    private final Map<PCollectionImpl<?>, Source<?>> inputs;
    // The terminal collection whose paths are currently being accumulated.
    private PCollectionImpl<?> workingNode;
    // The partial path currently being extended toward an input.
    private NodePath workingPath;

    public NodeVisitor() {
      this.nodePaths = new HashMap<PCollectionImpl<?>, Set<NodePath>>();
      this.inputs = new HashMap<PCollectionImpl<?>, Source<?>>();
    }

    public Map<PCollectionImpl<?>, Set<NodePath>> getNodePaths() {
      return nodePaths;
    }

    /**
     * Entry point: starts a fresh path rooted at {@code output} and triggers
     * the backwards traversal via the visitor callbacks below.
     */
    public void visitOutput(PCollectionImpl<?> output) {
      nodePaths.put(output, Sets.<NodePath> newHashSet());
      workingNode = output;
      workingPath = new NodePath();
      output.accept(this);
    }

    @Override
    public void visitInputCollection(InputCollection<?> collection) {
      // An input terminates the current path; record it and its source.
      workingPath.close(collection);
      inputs.put(collection, collection.getSource());
      nodePaths.get(workingNode).add(workingPath);
    }

    @Override
    public void visitUnionCollection(UnionCollection<?> collection) {
      // Fan out: each parent continues from a fresh copy of the path so far,
      // all accumulating against the same terminal node.
      PCollectionImpl<?> baseNode = workingNode;
      NodePath basePath = workingPath;
      for (PCollectionImpl<?> parent : collection.getParents()) {
        workingPath = new NodePath(basePath);
        workingNode = baseNode;
        processParent(parent);
      }
    }

    @Override
    public void visitDoFnCollection(DoCollectionImpl<?> collection) {
      workingPath.push(collection);
      processParent(collection.getOnlyParent());
    }

    @Override
    public void visitDoTable(DoTableImpl<?, ?> collection) {
      workingPath.push(collection);
      processParent(collection.getOnlyParent());
    }

    @Override
    public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
      // A GBK is a path boundary: finish the current path here, then begin a
      // new set of paths with the grouped table as the terminal node.
      workingPath.close(collection);
      nodePaths.get(workingNode).add(workingPath);
      workingNode = collection;
      nodePaths.put(workingNode, Sets.<NodePath> newHashSet());
      workingPath = new NodePath(collection);
      processParent(collection.getOnlyParent());
    }

    private void processParent(PCollectionImpl<?> parent) {
      if (!nodePaths.containsKey(parent)) {
        // Unvisited parent: keep walking backwards.
        parent.accept(this);
      } else {
        // Parent is already a terminal node; the current path ends at it.
        workingPath.close(parent);
        nodePaths.get(workingNode).add(workingPath);
      }
    }
  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
new file mode 100644
index 0000000..4ab30ca
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import com.google.common.collect.Lists;
+
+class NodePath implements Iterable<PCollectionImpl<?>> {
+  private LinkedList<PCollectionImpl<?>> path;
+
+  public NodePath() {
+    this.path = Lists.newLinkedList();
+  }
+
+  public NodePath(PCollectionImpl<?> tail) {
+    this.path = Lists.newLinkedList();
+    this.path.add(tail);
+  }
+
+  public NodePath(NodePath other) {
+    this.path = Lists.newLinkedList(other.path);
+  }
+
+  public void push(PCollectionImpl<?> stage) {
+    this.path.push((PCollectionImpl<?>) stage);
+  }
+
+  public void close(PCollectionImpl<?> head) {
+    this.path.push(head);
+  }
+
+  public Iterator<PCollectionImpl<?>> iterator() {
+    return path.iterator();
+  }
+
+  public Iterator<PCollectionImpl<?>> descendingIterator() {
+    return path.descendingIterator();
+  }
+
+  public PCollectionImpl<?> get(int index) {
+    return path.get(index);
+  }
+
+  public PCollectionImpl<?> head() {
+    return path.peekFirst();
+  }
+
+  public PCollectionImpl<?> tail() {
+    return path.peekLast();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof NodePath)) {
+      return false;
+    }
+    NodePath nodePath = (NodePath) other;
+    return path.equals(nodePath.path);
+  }
+  
+  @Override
+  public int hashCode() {
+    return 17 + 37 * path.hashCode();
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (PCollectionImpl<?> collect : path) {
+      sb.append(collect.getName() + "|");
+    }
+    sb.deleteCharAt(sb.length() - 1);
+    return sb.toString();
+  }
+  
+  public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
+    NodePath top = new NodePath();
+    for (int i = 0; i <= splitIndex; i++) {
+      top.path.add(path.get(i));
+    }
+    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
+    nextPath.add(newHead);
+    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
+    path = nextPath;
+    return top;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
new file mode 100644
index 0000000..b8e59a3
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
/**
 * Shared constants used by the MapReduce job planner.
 *
 * <p>Pure constant holder; never instantiated.
 */
public class PlanningParameters {

  /** Configuration key naming the pipeline's working directory. */
  public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";

  /** Name prefix applied to multiple-output sinks. */
  public static final String MULTI_OUTPUT_PREFIX = "out";

  private PlanningParameters() {
    // Utility class: suppress instantiation.
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
new file mode 100644
index 0000000..fcfbc36
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
/**
 * Combine-phase variant of {@link CrunchReducer}: reuses the parent's entire
 * setup/reduce/cleanup flow, differing only in the {@link NodeContext} used to
 * locate the processing nodes for this phase.
 */
public class CrunchCombiner extends CrunchReducer {

  @Override
  protected NodeContext getNodeContext() {
    // The sole difference from the parent reducer.
    return NodeContext.COMBINE;
  }
  
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
new file mode 100644
index 0000000..6b49735
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.crunch.io.impl.InputBundle;
+import com.google.common.collect.Lists;
+
+public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException,
+      InterruptedException {
+    List<InputSplit> splits = Lists.newArrayList();
+    Configuration conf = job.getConfiguration();
+    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
+
+    // First, build a map of InputFormats to Paths
+    for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
+      InputBundle inputBundle = entry.getKey();
+      Job jobCopy = new Job(conf);
+      InputFormat<?,?> format = (InputFormat<?,?>) ReflectionUtils.newInstance(
+          inputBundle.getInputFormatClass(), jobCopy.getConfiguration());
+      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue()
+          .entrySet()) {
+        Integer nodeIndex = nodeEntry.getKey();
+        List<Path> paths = nodeEntry.getValue();
+        FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
+
+        // Get splits for each input path and tag with InputFormat
+        // and Mapper types by wrapping in a TaggedInputSplit.
+        List<InputSplit> pathSplits = format.getSplits(jobCopy);
+        for (InputSplit pathSplit : pathSplits) {
+          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(),
+              inputBundle.getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration()));
+        }
+      }
+    }
+    return splits;
+  }
+
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CrunchRecordReader<K,V>(inputSplit, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
new file mode 100644
index 0000000..b57ca58
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
/**
 * Wraps a delegate {@link InputSplit} together with the {@link InputFormat}
 * that produced it, any per-bundle extra configuration, and the index of the
 * processing node that should consume its records.
 *
 * <p>Wire format (see {@link #write}/{@link #readFields}): node index, extra
 * conf entry count and entries, format class name, split class name, then the
 * delegate split serialized via Hadoop's {@link SerializationFactory}.
 * {@code readFields} relies on {@link #setConf} having been called first by
 * the framework (this class is {@link Configurable}).
 */
public class CrunchInputSplit extends InputSplit implements Configurable, Writable {

  private InputSplit inputSplit;
  private Class<? extends InputFormat> inputFormatClass;
  // NOTE(review): extraConf is only populated on the writing side; readFields
  // applies the entries directly to conf and leaves this field null.
  private Map<String, String> extraConf;
  private int nodeIndex;
  private Configuration conf;

  public CrunchInputSplit() {
    // default constructor, required for Writable deserialization
  }

  public CrunchInputSplit(InputSplit inputSplit,
      Class<? extends InputFormat> inputFormatClass, Map<String, String> extraConf,
      int nodeIndex, Configuration conf) {
    this.inputSplit = inputSplit;
    this.inputFormatClass = inputFormatClass;
    this.extraConf = extraConf;
    this.nodeIndex = nodeIndex;
    this.conf = conf;
  }

  public int getNodeIndex() {
    return nodeIndex;
  }

  public InputSplit getInputSplit() {
    return inputSplit;
  }

  public Class<? extends InputFormat> getInputFormatClass() {
    return inputFormatClass;
  }

  @Override
  public long getLength() throws IOException, InterruptedException {
    return inputSplit.getLength();
  }

  @Override
  public String[] getLocations() throws IOException, InterruptedException {
    return inputSplit.getLocations();
  }

  public void readFields(DataInput in) throws IOException {
    nodeIndex = in.readInt();
    int extraConfSize = in.readInt();
    if (extraConfSize > 0) {
      // Extra bundle configuration is merged straight into this split's conf.
      for (int i = 0; i < extraConfSize; i++) {
        conf.set(in.readUTF(), in.readUTF());
      }
    }
    inputFormatClass = (Class<? extends InputFormat<?,?>>) readClass(in);
    Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
    // Instantiate the delegate split, then overwrite it with the deserialized
    // instance produced by the framework's Deserializer.
    inputSplit = (InputSplit) ReflectionUtils
        .newInstance(inputSplitClass, conf);
    SerializationFactory factory = new SerializationFactory(conf);
    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
    deserializer.open((DataInputStream) in);
    inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
  }

  // Resolves a class name written by write(); conf must already be set.
  private Class<?> readClass(DataInput in) throws IOException {
    String className = Text.readString(in);
    try {
      return conf.getClassByName(className);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("readObject can't find class", e);
    }
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(nodeIndex);
    out.writeInt(extraConf.size());
    for (Map.Entry<String, String> e : extraConf.entrySet()) {
      out.writeUTF(e.getKey());
      out.writeUTF(e.getValue());
    }
    Text.writeString(out, inputFormatClass.getName());
    Text.writeString(out, inputSplit.getClass().getName());
    // Serialize the delegate split with whatever serialization the conf
    // provides for its concrete class (mirrors readFields).
    SerializationFactory factory = new SerializationFactory(conf);
    Serializer serializer = factory.getSerializer(inputSplit.getClass());
    serializer.open((DataOutputStream) out);
    serializer.serialize(inputSplit);
  }

  public Configuration getConf() {
    return conf;
  }

  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
new file mode 100644
index 0000000..8fa1d56
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import org.apache.crunch.io.impl.InputBundle;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class CrunchInputs {
+
+  private static final char RECORD_SEP = ',';
+  private static final char FIELD_SEP = ';';
+  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+  
+  public static void addInputPath(Job job, Path path,
+      InputBundle inputBundle, int nodeIndex) {
+    Configuration conf = job.getConfiguration();
+    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
+    String existing = conf.get(RuntimeParameters.MULTI_INPUTS);
+    conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? inputs : existing + RECORD_SEP
+        + inputs);
+  }
+
+  public static Map<InputBundle, Map<Integer, List<Path>>> getFormatNodeMap(
+      JobContext job) {
+    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
+    Configuration conf = job.getConfiguration();
+    for (String input : Splitter.on(RECORD_SEP).split(conf.get(RuntimeParameters.MULTI_INPUTS))) {
+      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
+      InputBundle inputBundle = InputBundle.fromSerialized(fields.get(0));
+      if (!formatNodeMap.containsKey(inputBundle)) {
+        formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
+      }
+      Integer nodeIndex = Integer.valueOf(fields.get(1));
+      if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
+        formatNodeMap.get(inputBundle).put(nodeIndex,
+            Lists.<Path> newLinkedList());
+      }
+      formatNodeMap.get(inputBundle).get(nodeIndex)
+          .add(new Path(fields.get(2)));
+    }
+    return formatNodeMap;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
new file mode 100644
index 0000000..814c8c3
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
+
+  private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
+  
+  private RTNode node;
+  private CrunchTaskContext ctxt;
+  private boolean debug;
+  
+  @Override
+  protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
+    List<RTNode> nodes;
+    this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
+    try {
+      nodes = ctxt.getNodes();
+    } catch (IOException e) {
+      LOG.info("Crunch deserialization error", e);
+      throw new CrunchRuntimeException(e);
+    }
+    if (nodes.size() == 1) {
+      this.node = nodes.get(0);
+    } else {
+      CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
+      this.node = nodes.get(split.getNodeIndex());
+    }
+    this.debug = ctxt.isDebugRun();
+  }
+
+  @Override
+  protected void map(Object k, Object v,
+      Mapper<Object, Object, Object, Object>.Context context) {
+    if (debug) {
+      try {
+        node.process(k, v);
+      } catch (Exception e) {
+        LOG.error("Mapper exception", e);
+      }
+    } else {
+      node.process(k, v);
+    }
+  }
+
+  @Override
+  protected void cleanup(Mapper<Object, Object, Object, Object>.Context context) {
+    node.cleanup();
+    ctxt.cleanup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
new file mode 100644
index 0000000..5967aa9
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
/**
 * RecordReader that unwraps a {@link CrunchInputSplit} and delegates all
 * reading to a reader created by the split's original InputFormat, using the
 * split's own Configuration.
 */
class CrunchRecordReader<K, V> extends RecordReader<K, V> {

  private final RecordReader<K, V> delegate;

  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context)
      throws IOException, InterruptedException {
    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
    // Unchecked cast: the format class was recorded when the split was built,
    // so its key/value types are assumed to match K and V.
    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
        .newInstance(crunchSplit.getInputFormatClass(), crunchSplit.getConf());
    // Build the delegate against a context that carries the split's conf
    // (which may include per-bundle extra configuration).
    this.delegate = inputFormat.createRecordReader(
        crunchSplit.getInputSplit(), TaskAttemptContextFactory.create(
            crunchSplit.getConf(), context.getTaskAttemptID()));
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return delegate.getCurrentKey();
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return delegate.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return delegate.getProgress();
  }

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Initialize the delegate with the unwrapped split and the same
    // conf-bearing context used in the constructor.
    CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
    InputSplit delegateSplit = crunchSplit.getInputSplit();
    delegate.initialize(delegateSplit, TaskAttemptContextFactory.create(
        crunchSplit.getConf(), context.getTaskAttemptID()));
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return delegate.nextKeyValue();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
new file mode 100644
index 0000000..aa5fc95
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
+
+  private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
+  
+  private RTNode node;
+  private CrunchTaskContext ctxt;
+  private boolean debug;
+  
+  protected NodeContext getNodeContext() {
+    return NodeContext.REDUCE;
+  }
+  
+  @Override
+  protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
+    this.ctxt = new CrunchTaskContext(context, getNodeContext());
+    try {
+      List<RTNode> nodes = ctxt.getNodes();
+      this.node = nodes.get(0);
+    } catch (IOException e) {
+      LOG.info("Crunch deserialization error", e);
+      throw new CrunchRuntimeException(e);
+    }
+    this.debug = ctxt.isDebugRun();
+  }
+
+  @Override
+  protected void reduce(Object key, Iterable<Object> values,
+      Reducer<Object, Object, Object, Object>.Context context) {
+    if (debug) {
+      try {
+        node.processIterable(key, values);
+      } catch (Exception e) {
+        LOG.error("Reducer exception", e);
+      }
+    } else {
+      node.processIterable(key, values);
+    }
+  }
+
+  @Override
+  protected void cleanup(Reducer<Object, Object, Object, Object>.Context context) {
+    node.cleanup();
+    ctxt.cleanup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
new file mode 100644
index 0000000..9eac51c
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
/**
 * Runtime exception for errors raised while executing a Crunch pipeline.
 * Carries a flag recording whether the exception has already been written to
 * the logs, so handlers up the stack can avoid duplicate log entries.
 */
public class CrunchRuntimeException extends RuntimeException {

  // Serializable subclass: pin the UID so serialized instances stay compatible.
  private static final long serialVersionUID = 1L;

  /** True once some handler has logged this exception. */
  private boolean logged = false;

  public CrunchRuntimeException(String msg) {
    super(msg);
  }

  public CrunchRuntimeException(Exception e) {
    super(e);
  }

  public CrunchRuntimeException(String msg, Exception e) {
    super(msg, e);
  }

  /** Returns true if {@link #markLogged()} has been called on this instance. */
  public boolean wasLogged() {
    return logged;
  }

  /** Records that this exception has been logged, suppressing duplicate logging. */
  public void markLogged() {
    this.logged = true;
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
new file mode 100644
index 0000000..dd04813
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.util.DistCache;
+
+public class CrunchTaskContext {
+
+  private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+  private final NodeContext nodeContext;
+  private CrunchMultipleOutputs<Object, Object> multipleOutputs;
+
+  public CrunchTaskContext(
+      TaskInputOutputContext<Object, Object, Object, Object> taskContext,
+      NodeContext nodeContext) {
+    this.taskContext = taskContext;
+    this.nodeContext = nodeContext;
+  }
+
+  public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
+    return taskContext;
+  }
+
+  public NodeContext getNodeContext() {
+    return nodeContext;
+  }
+
+  public List<RTNode> getNodes() throws IOException {
+    Configuration conf = taskContext.getConfiguration();
+    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
+    @SuppressWarnings("unchecked")
+    List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
+    if (nodes != null) {
+      for (RTNode node : nodes) {
+        node.initialize(this);
+      }
+    }
+    return nodes;
+  }
+  
+  public boolean isDebugRun() {
+    Configuration conf = taskContext.getConfiguration();
+    return conf.getBoolean(RuntimeParameters.DEBUG, false);
+  }
+  
+  public void cleanup() {
+    if (multipleOutputs != null) {
+      try {
+        multipleOutputs.close();
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+
+  public CrunchMultipleOutputs<Object, Object> getMultipleOutputs() {
+    if (multipleOutputs == null) {
+      multipleOutputs = new CrunchMultipleOutputs<Object, Object>(taskContext);
+    }
+    return multipleOutputs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
new file mode 100644
index 0000000..648e87c
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.impl.mr.plan.DoNode;
+
/**
 * Enum that is associated with a serialized {@link DoNode} instance, so we know
 * how to use it within the context of a particular MR job. The lowercased enum
 * name is used to build the configuration key under which that plan section is
 * stored.
 */
public enum NodeContext {
  MAP, REDUCE, COMBINE;

  public String getConfigurationKey() {
    // Use name() (final, unlike toString()) and a fixed locale so the key is
    // stable regardless of the JVM's default locale -- e.g. the Turkish locale
    // would lowercase the 'I' in COMBINE to a dotless 'ı'.
    return "crunch.donode." + name().toLowerCase(java.util.Locale.ROOT);
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
new file mode 100644
index 0000000..4debfc1
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
+import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
+import org.apache.crunch.impl.mr.emit.OutputEmitter;
+import org.apache.crunch.types.Converter;
+
+public class RTNode implements Serializable {
+  
+  private static final Log LOG = LogFactory.getLog(RTNode.class);
+  
+  private final String nodeName;
+  private DoFn<Object, Object> fn;
+  private final List<RTNode> children;
+  private final Converter inputConverter;
+  private final Converter outputConverter;
+  private final String outputName;
+
+  private transient Emitter<Object> emitter;
+
+  public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children,
+      Converter inputConverter, Converter outputConverter, String outputName) {
+    this.fn = fn;
+    this.nodeName = name;
+    this.children = children;
+    this.inputConverter = inputConverter;
+    this.outputConverter = outputConverter;
+    this.outputName = outputName;
+  }
+
+  public void initialize(CrunchTaskContext ctxt) {
+    if (emitter != null) {
+      // Already initialized
+      return;
+    }
+    
+    fn.setContext(ctxt.getContext());
+    for (RTNode child : children) {
+      child.initialize(ctxt);
+    }
+
+    if (outputConverter != null) {
+      if (outputName != null) {
+        this.emitter = new MultipleOutputEmitter(
+            outputConverter, ctxt.getMultipleOutputs(), outputName);
+      } else {
+        this.emitter = new OutputEmitter(
+            outputConverter, ctxt.getContext());
+      }
+    } else if (!children.isEmpty()) {
+      this.emitter = new IntermediateEmitter(children);
+    } else {
+      throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
+    }
+  }
+
+  public boolean isLeafNode() {
+    return outputConverter != null && children.isEmpty();
+  }
+
+  public void process(Object input) {
+    try {
+      fn.process(input, emitter);
+    } catch (CrunchRuntimeException e) {
+      if (!e.wasLogged()) {
+        LOG.info(String.format("Crunch exception in '%s' for input: %s",
+            nodeName, input.toString()), e);
+        e.markLogged();
+      }
+      throw e;
+    }
+  }
+
+  public void process(Object key, Object value) {
+    process(inputConverter.convertInput(key, value));
+  }
+
+  public void processIterable(Object key, Iterable values) {
+    process(inputConverter.convertIterableInput(key, values));
+  }
+  
+  public void cleanup() {
+    fn.cleanup(emitter);
+    emitter.flush();
+    for (RTNode child : children) {
+      child.cleanup();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children="
+        + children + ", inputConverter=" + inputConverter
+        + ", outputConverter=" + outputConverter + ", outputName=" + outputName
+        + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
new file mode 100644
index 0000000..5e534eb
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
/**
 * Configuration keys consulted during the runtime execution of a Crunch
 * pipeline.
 */
public final class RuntimeParameters {

  // Presumably the number of buckets used by aggregators -- TODO confirm
  // against the aggregation code that reads this key.
  public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";

  // Key under which the multi-input directory mapping is stored; note the
  // property name is "crunch.inputs.dir".
  public static final String MULTI_INPUTS = "crunch.inputs.dir";

  // Boolean flag: when true, tasks log record-processing exceptions instead of
  // failing (see CrunchTaskContext.isDebugRun()).
  public static final String DEBUG = "crunch.debug";

  // Not instantiated
  private RuntimeParameters() {}
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java b/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
new file mode 100644
index 0000000..2cfa615
--- /dev/null
+++ b/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.run;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
/**
 * Reflection-based factory for TaskAttemptContext instances. Depending on the
 * Hadoop version on the classpath, TaskAttemptContext is either a concrete
 * class or an interface whose standard implementation is
 * org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; this factory
 * resolves the right constructor once, at class-load time.
 */
@SuppressWarnings("unchecked")
public class TaskAttemptContextFactory {

  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);

  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();

  /** Creates a TaskAttemptContext for the given configuration and attempt id. */
  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
    return INSTANCE.createInternal(conf, taskAttemptId);
  }

  // Constructor of whichever concrete context class was resolved below.
  private Constructor taskAttemptConstructor;

  private TaskAttemptContextFactory() {
    Class implClass = TaskAttemptContext.class;
    if (implClass.isInterface()) {
      // TaskAttemptContext is an interface on this classpath; load its impl.
      try {
        implClass = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
      } catch (ClassNotFoundException e) {
        // NOTE(review): despite the "exiting" message, execution continues and
        // taskAttemptConstructor stays null, which later NPEs in createInternal.
        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
      }
    }
    try {
      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
    } catch (Exception e) {
      // NOTE(review): same caveat as above -- logged but not fatal in practice.
      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
    }
  }

  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
    try {
      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
    } catch (Exception e) {
      // Reflection failure is logged and surfaced to the caller as null.
      LOG.error("Could not construct a TaskAttemptContext instance", e);
      return null;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/At.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/At.java b/src/main/java/org/apache/crunch/io/At.java
new file mode 100644
index 0000000..2f5fe8b
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/At.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.avro.AvroFileSourceTarget;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.io.text.TextFileSourceTarget;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.writable.Writables;
+
+/**
+ * Static factory methods for creating various {@link SourceTarget} types.
+ *
+ */
+public class At {
+  public static <T> AvroFileSourceTarget<T> avroFile(String pathName, AvroType<T> avroType) {
+	return avroFile(new Path(pathName), avroType);
+  }
+  
+  public static <T> AvroFileSourceTarget<T> avroFile(Path path, AvroType<T> avroType) {
+	return new AvroFileSourceTarget<T>(path, avroType);
+  }
+  
+  public static HBaseSourceTarget hbaseTable(String table) {
+	return hbaseTable(table, new Scan());
+  }
+  
+  public static HBaseSourceTarget hbaseTable(String table, Scan scan) {
+	return new HBaseSourceTarget(table, scan);
+  }
+  
+  public static <T> SeqFileSourceTarget<T> sequenceFile(String pathName, PType<T> ptype) {
+	return sequenceFile(new Path(pathName), ptype);
+  }
+  
+  public static <T> SeqFileSourceTarget<T> sequenceFile(Path path, PType<T> ptype) {
+	return new SeqFileSourceTarget<T>(path, ptype);
+  }
+  
+  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(String pathName, PType<K> keyType,
+      PType<V> valueType) {
+	return sequenceFile(new Path(pathName), keyType, valueType);
+  }
+  
+  public static <K, V> SeqFileTableSourceTarget<K, V> sequenceFile(Path path, PType<K> keyType,
+      PType<V> valueType) {
+	PTypeFamily ptf = keyType.getFamily();
+	return new SeqFileTableSourceTarget<K, V>(path, ptf.tableOf(keyType, valueType));
+  }
+  
+  public static TextFileSourceTarget<String> textFile(String pathName) {
+	return textFile(new Path(pathName));
+  }
+  
+  public static TextFileSourceTarget<String> textFile(Path path) {
+	return textFile(path, Writables.strings());
+  }
+  
+  public static <T> TextFileSourceTarget<T> textFile(String pathName, PType<T> ptype) {
+    return textFile(new Path(pathName), ptype);
+  }
+  
+  public static <T> TextFileSourceTarget<T> textFile(Path path, PType<T> ptype) {
+    return new TextFileSourceTarget<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/CompositePathIterable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/CompositePathIterable.java b/src/main/java/org/apache/crunch/io/CompositePathIterable.java
new file mode 100644
index 0000000..0e4014a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/CompositePathIterable.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+public class CompositePathIterable<T> implements Iterable<T> {
+
+  private final FileStatus[] stati;
+  private final FileSystem fs;
+  private final FileReaderFactory<T> readerFactory;
+
+  private static final PathFilter FILTER = new PathFilter() {
+	@Override
+	public boolean accept(Path path) {
+	  return !path.getName().startsWith("_");
+	}
+  };
+  
+  public static <S> Iterable<S> create(FileSystem fs, Path path, FileReaderFactory<S> readerFactory) throws IOException {
+    
+    if (!fs.exists(path)){
+      throw new IOException("No files found to materialize at: " + path);
+    }
+    
+    FileStatus[] stati = null;
+    try {
+      stati = fs.listStatus(path, FILTER);
+    } catch (FileNotFoundException e) {
+      stati = null;
+    }
+    if (stati == null) {
+      throw new IOException("No files found to materialize at: " + path);
+    }
+
+    if (stati.length == 0) {
+      return Collections.emptyList();
+    } else {
+      return new CompositePathIterable<S>(stati, fs, readerFactory);
+    }
+
+  }
+  
+  private CompositePathIterable(FileStatus[] stati, FileSystem fs, FileReaderFactory<T> readerFactory) {
+	this.stati = stati;
+	this.fs = fs;
+	this.readerFactory = readerFactory;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+
+	return new UnmodifiableIterator<T>() {
+	  private int index = 0;
+	  private Iterator<T> iter = readerFactory.read(fs, stati[index++].getPath());
+	  
+	  @Override
+	  public boolean hasNext() {
+		if (!iter.hasNext()) {
+		  while (index < stati.length) {
+       	    iter = readerFactory.read(fs, stati[index++].getPath());
+			if (iter.hasNext()) {
+			  return true;
+			}
+		  }
+		  return false;
+		}
+		return true;
+	  }
+
+	  @Override
+	  public T next() {
+		return iter.next();
+	  }
+	};
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/FileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/FileReaderFactory.java b/src/main/java/org/apache/crunch/io/FileReaderFactory.java
new file mode 100644
index 0000000..5cccb7b
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/FileReaderFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
/**
 * Factory for iterators that read records of type {@code T} out of a file on
 * the given file system.
 */
public interface FileReaderFactory<T> {
  /** Opens {@code path} on {@code fs} and returns an iterator over its records. */
  Iterator<T> read(FileSystem fs, Path path);
}


Mime
View raw message