crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [33/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:35 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
new file mode 100644
index 0000000..6ea9c4c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.fn.ExtractKeyFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.materialize.pobject.CollectionPObject;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Base class for the {@link PCollection} implementations that run on an {@link MRPipeline}.
+ * It supplies the shared plumbing for chaining {@code parallelDo} calls, writing and
+ * materializing output, and dispatching to a {@link Visitor}; subclasses provide the parents,
+ * the {@link PType}, the size and the visitor callback.
+ */
+public abstract class PCollectionImpl<S> implements PCollection<S> {
+
+  private static final Log LOG = LogFactory.getLog(PCollectionImpl.class);
+
+  private final String name;
+  protected MRPipeline pipeline;
+  // When non-null, write() and accept() substitute an InputCollection built from this
+  // location, and getSize() asks it for the size first.
+  protected SourceTarget<S> materializedAt;
+  private final ParallelDoOptions options;
+
+  /** Creates a collection with the given name and options from an unconfigured builder. */
+  public PCollectionImpl(String name) {
+    this(name, ParallelDoOptions.builder().build());
+  }
+
+  /** Creates a collection with the given name and {@link ParallelDoOptions}. */
+  public PCollectionImpl(String name, ParallelDoOptions options) {
+    this.name = name;
+    this.options = options;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked") // generic array creation: the raw array only holds a PCollection<S>
+  public PCollection<S> union(PCollection<S> other) {
+    return union(new PCollection[] { other });
+  }
+
+  @Override
+  public PCollection<S> union(PCollection<S>... collections) {
+    List<PCollectionImpl<S>> internal = Lists.newArrayList();
+    internal.add(this);
+    for (PCollection<S> collection : collections) {
+      // Each other collection joins the union through an identity DoFn rather than directly.
+      internal.add((PCollectionImpl<S>) collection.parallelDo(IdentityFn.<S>getInstance(), collection.getPType()));
+    }
+    return new UnionCollection<S>(internal);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(DoFn<S, T> fn, PType<T> type) {
+    // Unnamed stages get "S" plus the pipeline's next anonymous stage id as their name.
+    MRPipeline mrPipeline = (MRPipeline) getPipeline();
+    return parallelDo("S" + mrPipeline.getNextAnonymousStageId(), fn, type);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
+    return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type,
+      ParallelDoOptions options) {
+    return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type, options);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    // Unnamed stages get "S" plus the pipeline's next anonymous stage id as their name.
+    MRPipeline mrPipeline = (MRPipeline) getPipeline();
+    return parallelDo("S" + mrPipeline.getNextAnonymousStageId(), fn, type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type,
+      ParallelDoOptions options) {
+    return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type, options);
+  }
+
+  @Override
+  public PCollection<S> write(Target target) {
+    if (materializedAt != null) {
+      // A materialized location is set: write from it instead of from this collection.
+      getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target);
+    } else {
+      getPipeline().write(this, target);
+    }
+    return this;
+  }
+
+  @Override
+  public PCollection<S> write(Target target, Target.WriteMode writeMode) {
+    if (materializedAt != null) {
+      // A materialized location is set: write from it instead of from this collection.
+      getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target,
+          writeMode);
+    } else {
+      getPipeline().write(this, target, writeMode);
+    }
+    return this;
+  }
+
+  @Override
+  public Iterable<S> materialize() {
+    if (getSize() == 0) {
+      LOG.warn("Materializing an empty PCollection: " + this.getName());
+      return Collections.emptyList();
+    }
+    return getPipeline().materialize(this);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public PObject<Collection<S>> asCollection() {
+    return new CollectionPObject<S>(this);
+  }
+
+  public SourceTarget<S> getMaterializedAt() {
+    return materializedAt;
+  }
+
+  public void materializeAt(SourceTarget<S> sourceTarget) {
+    this.materializedAt = sourceTarget;
+  }
+
+  @Override
+  public PCollection<S> filter(FilterFn<S> filterFn) {
+    return parallelDo(filterFn, getPType());
+  }
+
+  @Override
+  public PCollection<S> filter(String name, FilterFn<S> filterFn) {
+    return parallelDo(name, filterFn, getPType());
+  }
+
+  @Override
+  public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+
+  @Override
+  public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+
+  @Override
+  public PTable<S, Long> count() {
+    return Aggregate.count(this);
+  }
+
+  @Override
+  public PObject<Long> length() {
+    return Aggregate.length(this);
+  }
+
+  @Override
+  public PObject<S> max() {
+    return Aggregate.max(this);
+  }
+
+  @Override
+  public PObject<S> min() {
+    return Aggregate.min(this);
+  }
+
+  @Override
+  public PTypeFamily getTypeFamily() {
+    return getPType().getFamily();
+  }
+
+  public abstract DoNode createDoNode();
+
+  public abstract List<PCollectionImpl<?>> getParents();
+
+  /**
+   * Returns the sole parent of this collection.
+   *
+   * @throws IllegalArgumentException if the number of parents is not exactly one
+   */
+  public PCollectionImpl<?> getOnlyParent() {
+    List<PCollectionImpl<?>> parents = getParents();
+    if (parents.size() != 1) {
+      throw new IllegalArgumentException("Expected exactly one parent PCollection");
+    }
+    return parents.get(0);
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    if (pipeline == null) {
+      // Not set yet: take (and remember) the pipeline of the first parent.
+      pipeline = (MRPipeline) getParents().get(0).getPipeline();
+    }
+    return pipeline;
+  }
+
+  /**
+   * Returns the source targets of this collection's options combined with the target
+   * dependencies of every parent.
+   */
+  public Set<SourceTarget<?>> getTargetDependencies() {
+    Set<SourceTarget<?>> targetDeps = options.getSourceTargets();
+    for (PCollectionImpl<?> parent : getParents()) {
+      targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
+    }
+    return targetDeps;
+  }
+
+  /**
+   * Returns one more than the greatest depth among the parents, or 1 when there are none.
+   */
+  public int getDepth() {
+    int parentMax = 0;
+    for (PCollectionImpl<?> parent : getParents()) {
+      parentMax = Math.max(parent.getDepth(), parentMax);
+    }
+    return 1 + parentMax;
+  }
+
+  /** Double-dispatch callback with one method per kind of collection in this package. */
+  public interface Visitor {
+    void visitInputCollection(InputCollection<?> collection);
+
+    void visitUnionCollection(UnionCollection<?> collection);
+
+    void visitDoFnCollection(DoCollectionImpl<?> collection);
+
+    void visitDoTable(DoTableImpl<?, ?> collection);
+
+    void visitGroupedTable(PGroupedTableImpl<?, ?> collection);
+  }
+
+  /**
+   * Dispatches to the visitor: as an {@link InputCollection} over {@code materializedAt} when
+   * that is set, otherwise through {@link #acceptInternal(Visitor)}.
+   */
+  public void accept(Visitor visitor) {
+    if (materializedAt != null) {
+      visitor.visitInputCollection(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()));
+    } else {
+      acceptInternal(visitor);
+    }
+  }
+
+  protected abstract void acceptInternal(Visitor visitor);
+
+  @Override
+  public long getSize() {
+    if (materializedAt != null) {
+      // Trust the materialized location only when it reports a positive size.
+      long sz = materializedAt.getSize(getPipeline().getConfiguration());
+      if (sz > 0) {
+        return sz;
+      }
+    }
+    return getSizeInternal();
+  }
+
+  protected abstract long getSizeInternal();
+
+  /**
+   * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline.
+   * @return The PCollectionImpl instance to be chained
+   */
+  protected PCollectionImpl<S> getChainingCollection() {
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
new file mode 100644
index 0000000..ccac5d5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.util.PartitionUtils;
+import org.apache.hadoop.mapreduce.Job;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * A {@link PGroupedTable} created by grouping a {@link PTableBase}, optionally with
+ * {@link GroupingOptions}. The grouped table is this collection's only parent.
+ */
+public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);
+
+  private final PTableBase<K, V> parent;
+  private final GroupingOptions groupingOptions;
+  private final PGroupedTableType<K, V> ptype;
+
+  PGroupedTableImpl(PTableBase<K, V> parent) {
+    this(parent, null);
+  }
+
+  PGroupedTableImpl(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
+    super("GBK");
+    this.parent = parent;
+    this.groupingOptions = groupingOptions;
+    this.ptype = parent.getPTableType().getGroupedTableType();
+  }
+
+  /**
+   * Configures the shuffle of the given job from the grouped table type. Unless the grouping
+   * options ask for a positive number of reducers, the partition count recommended by
+   * {@link PartitionUtils} is applied, provided that it is positive.
+   */
+  public void configureShuffle(Job job) {
+    ptype.configureShuffle(job, groupingOptions);
+    if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) {
+      int numReduceTasks = PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration());
+      if (numReduceTasks > 0) {
+        job.setNumReduceTasks(numReduceTasks);
+        LOG.info(String.format("Setting num reduce tasks to %d", numReduceTasks));
+      } else {
+        LOG.warn("Attempted to set a non-positive number of reduce tasks: " + numReduceTasks);
+      }
+    }
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return parent.getSizeInternal();
+  }
+
+  @Override
+  public PType<Pair<K, Iterable<V>>> getPType() {
+    return ptype;
+  }
+
+  @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
+    return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> combineValues(Aggregator<V> agg) {
+    return combineValues(Aggregators.<K, V>toCombineFn(agg));
+  }
+
+  /** Emits one (key, value) pair for every value of a grouped (key, values) input. */
+  private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
+      for (V v : input.second()) {
+        emitter.emit(Pair.of(input.first(), v));
+      }
+    }
+  }
+
+  @Override
+  public PTable<K, V> ungroup() {
+    return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType());
+  }
+
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitGroupedTable(this);
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getTargetDependencies() {
+    Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies());
+    if (groupingOptions != null) {
+      td.addAll(groupingOptions.getSourceTargets());
+    }
+    return ImmutableSet.copyOf(td);
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> of(parent);
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype);
+  }
+
+  public DoNode getGroupingNode() {
+    return DoNode.createGroupingNode("", ptype);
+  }
+
+  @Override
+  protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() {
+    // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs
+    // TODO This should be implemented in a cleaner way in the planner
+    return new PGroupedTableImpl<K, V>(parent, groupingOptions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
new file mode 100644
index 0000000..3c2393d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Cogroup;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.materialize.MaterializableMap;
+import org.apache.crunch.materialize.pobject.MapPObject;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Shared base for the {@link PTable} implementations in this package; most table operations
+ * delegate to the helper classes in {@code org.apache.crunch.lib}.
+ */
+abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements PTable<K, V> {
+
+  public PTableBase(String name) {
+    super(name);
+  }
+
+  public PTableBase(String name, ParallelDoOptions options) {
+    super(name, options);
+  }
+
+  @Override
+  public PType<K> getKeyType() {
+    return getPTableType().getKeyType();
+  }
+
+  @Override
+  public PType<V> getValueType() {
+    return getPTableType().getValueType();
+  }
+
+  @Override
+  public PGroupedTableImpl<K, V> groupByKey() {
+    return new PGroupedTableImpl<K, V>(this);
+  }
+
+  @Override
+  public PGroupedTableImpl<K, V> groupByKey(int numReduceTasks) {
+    return new PGroupedTableImpl<K, V>(this, GroupingOptions.builder().numReducers(numReduceTasks).build());
+  }
+
+  @Override
+  public PGroupedTableImpl<K, V> groupByKey(GroupingOptions groupingOptions) {
+    return new PGroupedTableImpl<K, V>(this, groupingOptions);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked") // generic array creation: the raw array only holds a PTable<K, V>
+  public PTable<K, V> union(PTable<K, V> other) {
+    return union(new PTable[] { other });
+  }
+
+  @Override
+  public PTable<K, V> union(PTable<K, V>... others) {
+    List<PTableBase<K, V>> internal = Lists.newArrayList();
+    internal.add(this);
+    for (PTable<K, V> table : others) {
+      // Only tables from this package can be unioned: anything else fails the cast below.
+      internal.add((PTableBase<K, V>) table);
+    }
+    return new UnionTable<K, V>(internal);
+  }
+
+  @Override
+  public PTable<K, V> write(Target target) {
+    if (getMaterializedAt() != null) {
+      // A materialized location is set: write from it instead of from this table.
+      getPipeline().write(new InputTable<K, V>(
+          (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target);
+    } else {
+      getPipeline().write(this, target);
+    }
+    return this;
+  }
+
+  @Override
+  public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
+    if (getMaterializedAt() != null) {
+      // A materialized location is set: write from it instead of from this table.
+      getPipeline().write(new InputTable<K, V>(
+          (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target, writeMode);
+    } else {
+      getPipeline().write(this, target, writeMode);
+    }
+    return this;
+  }
+
+  @Override
+  public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(filterFn, getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(name, filterFn, getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> top(int count) {
+    return Aggregate.top(this, count, true);
+  }
+
+  @Override
+  public PTable<K, V> bottom(int count) {
+    return Aggregate.top(this, count, false);
+  }
+
+  @Override
+  public PTable<K, Collection<V>> collectValues() {
+    return Aggregate.collectValues(this);
+  }
+
+  @Override
+  public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
+    return Join.join(this, other);
+  }
+
+  @Override
+  public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
+    return Cogroup.cogroup(this, other);
+  }
+
+  @Override
+  public PCollection<K> keys() {
+    return PTables.keys(this);
+  }
+
+  @Override
+  public PCollection<V> values() {
+    return PTables.values(this);
+  }
+
+  /**
+   * Returns a Map<K, V> made up of the keys and values in this PTable.
+   */
+  @Override
+  public Map<K, V> materializeToMap() {
+    return new MaterializableMap<K, V>(this.materialize());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public PObject<Map<K, V>> asMap() {
+    return new MapPObject<K, V>(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
new file mode 100644
index 0000000..7b3dd7b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * The union of several {@link PCollectionImpl}s that belong to the same pipeline. Its size is
+ * the sum of the parents' sizes, taken once when the union is constructed.
+ */
+public class UnionCollection<S> extends PCollectionImpl<S> {
+
+  private List<PCollectionImpl<S>> parents;
+  private long size = 0;
+
+  /** Builds the name {@code union(a,b,...)} from the names of the given collections. */
+  private static <S> String flatName(List<PCollectionImpl<S>> collections) {
+    StringBuilder joined = new StringBuilder("union(");
+    String separator = "";
+    for (PCollectionImpl<S> collection : collections) {
+      joined.append(separator).append(collection.getName());
+      separator = ",";
+    }
+    joined.append(')');
+    return joined.toString();
+  }
+
+  /**
+   * Creates the union of the given collections.
+   *
+   * @throws IllegalStateException if the collections do not all share one pipeline instance
+   */
+  UnionCollection(List<PCollectionImpl<S>> collections) {
+    super(flatName(collections));
+    parents = ImmutableList.copyOf(collections);
+    pipeline = (MRPipeline) parents.get(0).getPipeline();
+    long total = 0;
+    for (PCollectionImpl<S> member : parents) {
+      if (member.getPipeline() != pipeline) {
+        throw new IllegalStateException("Cannot union PCollections from different Pipeline instances");
+      }
+      total += member.getSize();
+    }
+    size = total;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return size;
+  }
+
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitUnionCollection(this);
+  }
+
+  /** Returns the {@link PType} of the first parent. */
+  @Override
+  public PType<S> getPType() {
+    return parents.get(0).getPType();
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    throw new UnsupportedOperationException("Unioned collection does not support DoNodes");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
new file mode 100644
index 0000000..a369432
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * The union of several {@link PTableBase}s that belong to the same pipeline. The table type is
+ * taken from the first table, and the size is the sum of the tables' sizes at construction.
+ */
+public class UnionTable<K, V> extends PTableBase<K, V> {
+
+  private PTableType<K, V> ptype;
+  private List<PCollectionImpl<Pair<K, V>>> parents;
+  private long size;
+
+  /** Builds the name {@code union(a,b,...)} from the names of the given tables. */
+  private static <K, V> String flatName(List<PTableBase<K, V>> tables) {
+    StringBuilder joined = new StringBuilder("union(");
+    String separator = "";
+    for (PTableBase<K, V> table : tables) {
+      joined.append(separator).append(table.getName());
+      separator = ",";
+    }
+    joined.append(')');
+    return joined.toString();
+  }
+
+  /**
+   * Creates the union of the given tables.
+   *
+   * @throws IllegalStateException if the tables do not all share one pipeline instance
+   */
+  public UnionTable(List<PTableBase<K, V>> tables) {
+    super(flatName(tables));
+    PTableBase<K, V> first = tables.get(0);
+    ptype = first.getPTableType();
+    pipeline = (MRPipeline) first.getPipeline();
+    parents = Lists.newArrayList();
+    long total = 0;
+    for (PTableBase<K, V> table : tables) {
+      if (table.getPipeline() != pipeline) {
+        throw new IllegalStateException("Cannot union PTables from different Pipeline instances");
+      }
+      parents.add(table);
+      total += table.getSize();
+    }
+    size = total;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return size;
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return ptype;
+  }
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return ptype;
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
+  }
+
+  /** Visits this table as a {@link UnionCollection} over the same parents, built anew per call. */
+  @Override
+  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
+    visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(parents));
+  }
+
+  @Override
+  public DoNode createDoNode() {
+    throw new UnsupportedOperationException("Unioned table does not support do nodes");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
new file mode 100644
index 0000000..b6df98b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.emit;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An {@link Emitter} implementation that links the output of one {@link DoFn} to the input of
+ * another {@code DoFn}.
+ *
+ */
+public class IntermediateEmitter implements Emitter<Object> {
+
+  private final List<RTNode> children;
+  private final Configuration conf;
+  private final PType<Object> outputPType;
+  // True when there is more than one child; emit() then detaches the value for each child.
+  private final boolean needDetachedValues;
+
+  public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children, Configuration conf) {
+    this.outputPType = outputPType;
+    this.children = ImmutableList.copyOf(children);
+    this.conf = conf;
+
+    outputPType.initialize(conf);
+    needDetachedValues = this.children.size() > 1;
+  }
+
+  /**
+   * Passes the value to every child node. With more than one child, each child receives a
+   * separately detached value obtained from the output {@link PType}.
+   */
+  @Override
+  public void emit(Object emitted) {
+    for (RTNode child : children) {
+      Object value = emitted;
+      if (needDetachedValues) {
+        value = this.outputPType.getDetachedValue(emitted);
+      }
+      child.process(value);
+    }
+  }
+
+  @Override
+  public void flush() {
+    // No-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
new file mode 100644
index 0000000..2e58fed
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.emit;
+
+import java.io.IOException;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.types.Converter;
+
+public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
+
+  private final Converter converter;
+  private final CrunchOutputs<K, V> outputs;
+  private final String outputName;
+
+  public MultipleOutputEmitter(Converter converter, CrunchOutputs<K, V> outputs,
+      String outputName) {
+    this.converter = converter;
+    this.outputs = outputs;
+    this.outputName = outputName;
+  }
+
+  @Override
+  public void emit(T emitted) {
+    try {
+      this.outputs.write(outputName,
+          (K) converter.outputKey(emitted),
+          (V) converter.outputValue(emitted));
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void flush() {
+    // No-op
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
new file mode 100644
index 0000000..bc3ae0d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.emit;
+
+import java.io.IOException;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.types.Converter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * An {@link Emitter} that converts each emitted value into a key/value pair with a
+ * {@link Converter} and writes the pair to the task's {@link TaskInputOutputContext}.
+ */
+public class OutputEmitter<T, K, V> implements Emitter<T> {
+
+  private final Converter<K, V, Object, Object> converter;
+  private final TaskInputOutputContext<?, ?, K, V> context;
+
+  /**
+   * @param converter produces the output key and value for an emitted object
+   * @param context task context that receives the converted pairs
+   */
+  public OutputEmitter(Converter<K, V, Object, Object> converter, TaskInputOutputContext<?, ?, K, V> context) {
+    this.converter = converter;
+    this.context = context;
+  }
+
+  /**
+   * Converts {@code emitted} and writes the resulting pair to the task context.
+   *
+   * @throws CrunchRuntimeException if the write fails with an {@link IOException} or the thread
+   *           is interrupted while writing; in the latter case the thread's interrupt status is
+   *           restored before the exception is thrown
+   */
+  @Override
+  public void emit(T emitted) {
+    try {
+      K key = converter.outputKey(emitted);
+      V value = converter.outputValue(emitted);
+      this.context.write(key, value);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    } catch (InterruptedException e) {
+      // Catching InterruptedException clears the interrupt flag; set it again so callers
+      // further up the stack can still observe that the thread was interrupted.
+      Thread.currentThread().interrupt();
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+  /** Does nothing; nothing is buffered by this emitter. */
+  @Override
+  public void flush() {
+    // No-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
new file mode 100644
index 0000000..d90f2e8
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CappedExponentialCounter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+/**
+ * Generates a series of numbers that doubles on every call until it reaches a cap.
+ *
+ * It is used for creating retry intervals. It is NOT thread-safe.
+ */
+public class CappedExponentialCounter {
+
+  private long current;
+  private final long limit;
+
+  /**
+   * @param start the value returned by the first call to {@link #get()}
+   * @param limit the largest value the series may grow to after the first call
+   */
+  public CappedExponentialCounter(long start, long limit) {
+    this.current = start;
+    this.limit = limit;
+  }
+
+  /**
+   * Returns the current value and advances the series to {@code min(2 * current, limit)}.
+   *
+   * @return the value of the series before this call advanced it
+   */
+  public long get() {
+    long result = current;
+    // Saturate instead of multiplying when doubling would exceed Long.MAX_VALUE; a plain
+    // "current * 2" would wrap to a negative number and slip under the cap.
+    long doubled = current > Long.MAX_VALUE / 2 ? Long.MAX_VALUE : current * 2;
+    current = Math.min(doubled, limit);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
new file mode 100644
index 0000000..74bc9ac
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Holder for the {@link CrunchControlledJob.Hook} implementations defined in this file:
+ * {@link PrepareHook} and {@link CompletionHook}. Not instantiable.
+ */
+public final class CrunchJobHooks {
+
+  /** Matches raw reducer output file names; group 1 captures the five-digit partition number. */
+  private static final Pattern REDUCE_OUTPUT_PATTERN = Pattern.compile(".*-r-(\\d{5})");
+
+  private CrunchJobHooks() {}
+
+  /** Creates missing input directories before job is submitted. */
+  public static final class PrepareHook implements CrunchControlledJob.Hook {
+    private final Job job;
+
+    /**
+     * @param job the job whose input paths are inspected
+     */
+    public PrepareHook(Job job) {
+      this.job = job;
+    }
+
+    /**
+     * Creates every input path of the job that does not exist yet, but only when
+     * {@link RuntimeParameters#CREATE_DIR} is enabled in the job configuration.
+     *
+     * @throws IOException if a file system cannot be obtained or queried
+     */
+    @Override
+    public void run() throws IOException {
+      Configuration conf = job.getConfiguration();
+      if (!conf.getBoolean(RuntimeParameters.CREATE_DIR, false)) {
+        return;
+      }
+      for (Path inputPath : FileInputFormat.getInputPaths(job)) {
+        FileSystem fs = inputPath.getFileSystem(conf);
+        if (!fs.exists(inputPath)) {
+          try {
+            fs.mkdirs(inputPath);
+          } catch (IOException ignored) {
+            // Deliberately best effort: a directory that cannot be created is left missing
+            // and this hook does not fail because of it.
+          }
+        }
+      }
+    }
+  }
+
+  /** Moves output files produced by the MapReduce job to the specified directories. */
+  public static final class CompletionHook implements CrunchControlledJob.Hook {
+    private final Job job;
+    private final Path workingPath;
+    private final Map<Integer, PathTarget> multiPaths;
+    private final boolean mapOnlyJob;
+
+    /**
+     * @param job the job whose output is relocated
+     * @param workingPath directory the job wrote its raw output files into
+     * @param multiPaths output index mapped to the target that should receive that output
+     * @param mapOnlyJob whether destination names use the map (rather than reduce) naming
+     */
+    public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, boolean mapOnlyJob) {
+      this.job = job;
+      this.workingPath = workingPath;
+      this.multiPaths = multiPaths;
+      this.mapOnlyJob = mapOnlyJob;
+    }
+
+    @Override
+    public void run() throws IOException {
+      handleMultiPaths();
+    }
+
+    /**
+     * Moves the data from the output directory of the job to the output locations specified in
+     * {@code multiPaths}.
+     *
+     * @throws IOException if a file system operation fails, including a rename that reports
+     *           failure by returning {@code false}
+     */
+    private synchronized void handleMultiPaths() throws IOException {
+      if (multiPaths.isEmpty()) {
+        return;
+      }
+      Configuration conf = job.getConfiguration();
+      FileSystem srcFs = workingPath.getFileSystem(conf);
+      for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
+        final int i = entry.getKey();
+        final Path dst = entry.getValue().getPath();
+        FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
+
+        // Raw files of output i are named "<MULTI_OUTPUT_PREFIX><i>-*" in the working directory.
+        Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
+        Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
+        FileSystem dstFs = dst.getFileSystem(conf);
+        if (!dstFs.exists(dst)) {
+          dstFs.mkdirs(dst);
+        }
+        boolean sameFs = isCompatible(srcFs, dst);
+        for (Path s : srcs) {
+          Path d = getDestFile(conf, s, dst, fileNamingScheme);
+          if (sameFs) {
+            // rename() signals failure through its return value; ignoring it would silently
+            // leave the output behind in the working directory.
+            if (!srcFs.rename(s, d)) {
+              throw new IOException("Failed to move job output " + s + " to " + d);
+            }
+          } else {
+            // deleteSource = true, overwrite = true
+            FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+          }
+        }
+      }
+    }
+
+    /**
+     * Reports whether {@code fs} accepts {@code path}, judged by whether
+     * {@code fs.makeQualified(path)} completes without an {@link IllegalArgumentException}.
+     */
+    private boolean isCompatible(FileSystem fs, Path path) {
+      try {
+        fs.makeQualified(path);
+        return true;
+      } catch (IllegalArgumentException e) {
+        return false;
+      }
+    }
+
+    /**
+     * Builds the destination path for one raw output file.
+     *
+     * The file name comes from the naming scheme (map or reduce variant, depending on
+     * {@code mapOnlyJob}); the Avro extension is appended when the source file carries it.
+     *
+     * @throws IOException if the naming scheme fails to produce a name
+     */
+    private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
+        throws IOException {
+      String outputFilename = null;
+      if (mapOnlyJob) {
+        outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
+      } else {
+        outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName()));
+      }
+      if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+        outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
+      }
+      return new Path(dir, outputFilename);
+    }
+  }
+
+  /**
+   * Extract the partition number from a raw reducer output filename.
+   *
+   * @param reduceOutputFileName The raw reducer output file name
+   * @return The partition number encoded in the filename
+   * @throws IllegalArgumentException if the name contains no {@code -r-} marker followed by
+   *           five digits
+   */
+  static int extractPartitionNumber(String reduceOutputFileName) {
+    Matcher matcher = REDUCE_OUTPUT_PATTERN.matcher(reduceOutputFileName);
+    if (matcher.find()) {
+      return Integer.parseInt(matcher.group(1), 10);
+    }
+    throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
new file mode 100644
index 0000000..4c7b7ea
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.exec;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PipelineExecution;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
+import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Provides APIs for job control at runtime to clients.
+ *
+ * This class has a thread that submits jobs when they become ready, monitors
+ * the states of the running jobs, and updates the states of jobs based on the
+ * state changes of the jobs they depend on.
+ *
+ * It is thread-safe.
+ */
+public class MRExecutor implements PipelineExecution {
+
+  private static final Log LOG = LogFactory.getLog(MRExecutor.class);
+
+  private final CrunchJobControl control;
+  private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  // Counted down exactly once, when the monitor loop exits (normally or not).
+  private final CountDownLatch doneSignal = new CountDownLatch(1);
+  // Counted down by kill(); the monitor loop checks and waits on it between polls.
+  private final CountDownLatch killSignal = new CountDownLatch(1);
+  // Supplies the wait, in milliseconds, between two consecutive polls of the job states.
+  private final CappedExponentialCounter pollInterval;
+  private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
+  // Assigned by the monitor loop once it has collected the stage results.
+  private PipelineResult result;
+  private Thread monitorThread;
+
+  private String planDotFile;
+  
+  /**
+   * Creates an executor whose monitor thread is prepared but not started; call
+   * {@link #execute()} to start it.
+   *
+   * @param jarClass class whose {@code toString()} is passed to the {@link CrunchJobControl}
+   * @param outputTargets the targets registered for each output collection
+   * @param toMaterialize collections that have a {@link MaterializableIterable} attached
+   */
+  public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
+      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
+    this.control = new CrunchJobControl(jarClass.toString());
+    this.outputTargets = outputTargets;
+    this.toMaterialize = toMaterialize;
+    this.monitorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        monitorLoop();
+      }
+    });
+    // Poll faster in local mode (50 ms growing to 1 s) than otherwise (500 ms growing to 10 s).
+    this.pollInterval = isLocalMode()
+      ? new CappedExponentialCounter(50, 1000)
+      : new CappedExponentialCounter(500, 10000);
+  }
+
+  /** Registers a job with the underlying job control. */
+  public void addJob(CrunchControlledJob job) {
+    this.control.addJob(job);
+  }
+
+  /** Stores the plan dot file contents returned by {@link #getPlanDotFile()}. */
+  public void setPlanDotFile(String planDotFile) {
+    this.planDotFile = planDotFile;
+  }
+  
+  /**
+   * Starts the monitor thread and returns immediately.
+   *
+   * @return this executor
+   */
+  public PipelineExecution execute() {
+    monitorThread.start();
+    return this;
+  }
+
+  /** Monitors running status. It is called in {@code monitorThread}. */
+  private void monitorLoop() {
+    try {
+      // Poll until every job has finished or kill() has been requested. The timed await
+      // doubles as the sleep between polls and returns early when killSignal is counted down.
+      while (killSignal.getCount() > 0 && !control.allFinished()) {
+        control.pollJobStatusAndStartNewOnes();
+        killSignal.await(pollInterval.get(), TimeUnit.MILLISECONDS);
+      }
+      // Called on every exit from the loop, not only after a kill request.
+      control.killAllRunningJobs();
+
+      List<CrunchControlledJob> failures = control.getFailedJobList();
+      if (!failures.isEmpty()) {
+        System.err.println(failures.size() + " job failure(s) occurred:");
+        for (CrunchControlledJob job : failures) {
+          System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
+        }
+      }
+      // Only successful jobs contribute a stage result (name plus counters).
+      List<PipelineResult.StageResult> stages = Lists.newArrayList();
+      for (CrunchControlledJob job : control.getSuccessfulJobList()) {
+        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
+      }
+
+      // Record on each output collection where its data can be read back from.
+      for (PCollectionImpl<?> c : outputTargets.keySet()) {
+        if (toMaterialize.containsKey(c)) {
+          MaterializableIterable iter = toMaterialize.get(c);
+          if (iter.isSourceTarget()) {
+            iter.materialize();
+            c.materializeAt((SourceTarget) iter.getSource());
+          }
+        } else {
+          // Use the first target that is, or can be turned into, a SourceTarget.
+          boolean materialized = false;
+          for (Target t : outputTargets.get(c)) {
+            if (!materialized) {
+              if (t instanceof SourceTarget) {
+                c.materializeAt((SourceTarget) t);
+                materialized = true;
+              } else {
+                SourceTarget st = t.asSourceTarget(c.getPType());
+                if (st != null) {
+                  c.materializeAt(st);
+                  materialized = true;
+                }
+              }
+            }
+          }
+        }
+      }
+
+      // Publish result and final status together under the lock that the synchronized
+      // getters take.
+      synchronized (this) {
+        result = new PipelineResult(stages);
+        if (killSignal.getCount() == 0) {
+          status.set(Status.KILLED);
+        } else {
+          status.set(result.succeeded() ? Status.SUCCEEDED : Status.FAILED);
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new AssertionError(e); // Nobody should interrupt us.
+    } catch (IOException e) {
+      // On this path the status becomes FAILED but no result is assigned.
+      LOG.error("Pipeline failed due to exception", e);
+      status.set(Status.FAILED);
+    } finally {
+      // Release waiters no matter how the loop ended.
+      doneSignal.countDown();
+    }
+  }
+
+  @Override
+  public String getPlanDotFile() {
+    return planDotFile;
+  }
+
+  /**
+   * Blocks until the monitor loop has exited or the timeout elapses. The outcome of the wait
+   * is not reported; use {@link #getStatus()} to find out what happened.
+   */
+  @Override
+  public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+    doneSignal.await(timeout, timeUnit);
+  }
+
+  /** Blocks until the monitor loop has exited. */
+  @Override
+  public void waitUntilDone() throws InterruptedException {
+    doneSignal.await();
+  }
+
+  /** Returns the current status of the execution. */
+  @Override
+  public synchronized Status getStatus() {
+    return status.get();
+  }
+
+  /** Returns the result assigned by the monitor loop, or {@code null} if none has been assigned. */
+  @Override
+  public synchronized PipelineResult getResult() {
+    return result;
+  }
+
+  /**
+   * Requests termination by counting down the kill signal. This method does not wait for the
+   * monitor loop to react; use {@link #waitUntilDone()} for that.
+   */
+  @Override
+  public void kill() throws InterruptedException {
+    killSignal.countDown();
+  }
+
+  /**
+   * Reports whether a freshly created {@link Configuration} names {@code "local"} as the job
+   * tracker, which is also the fallback when neither key is set.
+   */
+  private static boolean isLocalMode() {
+    Configuration conf = new Configuration();
+    // Try to handle MapReduce version 0.20 or 0.22
+    String jobTrackerAddress = conf.get("mapreduce.jobtracker.address",
+        conf.get("mapred.job.tracker", "local"));
+    return "local".equals(jobTrackerAddress);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/package-info.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/package-info.java
new file mode 100644
index 0000000..7e403c3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A Pipeline implementation that runs on Hadoop MapReduce.
+ */
+package org.apache.crunch.impl.mr;

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
new file mode 100644
index 0000000..865369c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.run.NodeContext;
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * A node in the planned tree of {@link DoFn}s. Each node holds a function, a name, a
+ * {@link PType} and its child nodes; input nodes additionally carry a {@link Source} and output
+ * nodes an output {@link Converter}.
+ */
+public class DoNode {
+
+  /** Shared, immutable child list for nodes created without the ability to take children. */
+  private static final List<DoNode> NO_CHILDREN = ImmutableList.of();
+
+  private final DoFn fn;
+  private final String name;
+  private final PType<?> ptype;
+  private final List<DoNode> children;
+  private final Converter outputConverter;
+  private final Source<?> source;
+  private String outputName;
+
+  private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children, Converter outputConverter,
+      Source<?> source) {
+    this.name = name;
+    this.fn = fn;
+    this.ptype = ptype;
+    this.source = source;
+    this.outputConverter = outputConverter;
+    this.children = children;
+  }
+
+  /** Returns a fresh mutable list for nodes that may receive children. */
+  private static List<DoNode> newChildList() {
+    return Lists.newArrayList();
+  }
+
+  /** Creates a childless output node that uses the grouping converter of {@code ptype}. */
+  public static <K, V> DoNode createGroupingNode(String name, PGroupedTableType<K, V> ptype) {
+    DoFn<?, ?> outputFn = ptype.getOutputMapFn();
+    Converter groupingConverter = ptype.getGroupingConverter();
+    return new DoNode(outputFn, name, ptype, NO_CHILDREN, groupingConverter, null);
+  }
+
+  /** Creates a childless output node that uses the plain converter of {@code ptype}. */
+  public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
+    Converter converter = ptype.getConverter();
+    DoFn<?, ?> outputFn = ptype.getOutputMapFn();
+    return new DoNode(outputFn, name, ptype, NO_CHILDREN, converter, null);
+  }
+
+  /** Creates an intermediate node around {@code function}; it has no source and no converter. */
+  public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype) {
+    return new DoNode(function, name, ptype, newChildList(), null, null);
+  }
+
+  /** Creates an input node for {@code source}, named after {@code source.toString()}. */
+  public static <S> DoNode createInputNode(Source<S> source) {
+    PType<?> sourceType = source.getType();
+    DoFn<?, ?> inputFn = sourceType.getInputMapFn();
+    return new DoNode(inputFn, source.toString(), sourceType, newChildList(), null, source);
+  }
+
+  /** An input node is one that was created with a source. */
+  public boolean isInputNode() {
+    return null != source;
+  }
+
+  /** An output node is one that was created with an output converter. */
+  public boolean isOutputNode() {
+    return null != outputConverter;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public List<DoNode> getChildren() {
+    return this.children;
+  }
+
+  public Source<?> getSource() {
+    return this.source;
+  }
+
+  public PType<?> getPType() {
+    return this.ptype;
+  }
+
+  /**
+   * Appends {@code node} to the children unless that very instance is already present.
+   *
+   * @return this node, to allow chaining
+   */
+  public DoNode addChild(DoNode node) {
+    // TODO: Refactor; the identity scan below is a stop-gap.
+    for (DoNode existing : children) {
+      if (existing == node) {
+        return this;
+      }
+    }
+    children.add(node);
+    return this;
+  }
+
+  /**
+   * Sets the output name of this node.
+   *
+   * @throws IllegalStateException if this node has no output converter
+   */
+  public void setOutputName(String outputName) {
+    if (null == outputConverter) {
+      throw new IllegalStateException("Cannot set output name w/o output converter: " + outputName);
+    }
+    this.outputName = outputName;
+  }
+
+  /**
+   * Configures this node's function and converts the node, together with all of its
+   * descendants, into {@link RTNode}s.
+   *
+   * @param inputNode whether the resulting node needs an input converter
+   * @param conf configuration handed to the function
+   * @param nodeContext selects the input converter: the plain converter of the type for
+   *          {@link NodeContext#MAP}, the grouping converter otherwise
+   */
+  public RTNode toRTNode(boolean inputNode, Configuration conf, NodeContext nodeContext) {
+    fn.configure(conf);
+    List<RTNode> runtimeChildren = Lists.newArrayList();
+    for (DoNode child : children) {
+      RTNode runtimeChild = child.toRTNode(false, conf, nodeContext);
+      runtimeChildren.add(runtimeChild);
+    }
+
+    Converter inputConverter;
+    if (!inputNode) {
+      inputConverter = null;
+    } else if (nodeContext == NodeContext.MAP) {
+      inputConverter = ptype.getConverter();
+    } else {
+      inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
+    }
+    PType<Object> objectType = (PType<Object>) getPType();
+    return new RTNode(fn, objectType, name, runtimeChildren, inputConverter, outputConverter, outputName);
+  }
+
+  /**
+   * Two nodes are equal when their names and functions are equal and they share the same
+   * source and output converter instances.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof DoNode)) {
+      return false;
+    }
+    if (other == this) {
+      return true;
+    }
+    DoNode that = (DoNode) other;
+    if (!name.equals(that.name)) {
+      return false;
+    }
+    if (!fn.equals(that.fn)) {
+      return false;
+    }
+    return source == that.source && outputConverter == that.outputConverter;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(name)
+        .append(fn)
+        .append(source)
+        .append(outputConverter)
+        .toHashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
new file mode 100644
index 0000000..46d8c53
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Renders the topology of a Crunch MapReduce plan as a
+ * <a href="http://www.graphviz.org">Graphviz</a> dot file.
+ *
+ * <p>Jobs are registered one at a time through {@link #addJobPrototype}; the
+ * accumulated declarations and edges are emitted by {@link #buildDotfile}.
+ */
+public class DotfileWriter {
+
+  /** The types of tasks within a MapReduce job. */
+  enum MRTaskType { MAP, REDUCE }
+
+  private Set<JobPrototype> jobPrototypes = Sets.newHashSet();
+  private HashMultimap<Pair<JobPrototype, MRTaskType>, String> jobNodeDeclarations = HashMultimap.create();
+  private Set<String> globalNodeDeclarations = Sets.newHashSet();
+  private Set<String> nodePathChains = Sets.newHashSet();
+
+  /**
+   * Builds the dot declaration line for the node that represents a PCollection.
+   * Input collections are drawn as folders, everything else as a box.
+   *
+   * @param pcollectionImpl PCollection for which a node will be declared
+   * @param jobPrototype The job containing the PCollection
+   * @return The node declaration
+   */
+  String formatPCollectionNodeDeclaration(PCollectionImpl<?> pcollectionImpl, JobPrototype jobPrototype) {
+    String nodeId = formatPCollection(pcollectionImpl, jobPrototype);
+    String label = pcollectionImpl.getName();
+    String shape = (pcollectionImpl instanceof InputCollection) ? "folder" : "box";
+    return String.format("%s [label=\"%s\" shape=%s];", nodeId, label, shape);
+  }
+
+  /**
+   * Builds the dot declaration line for a Target; its string form serves as both
+   * node identifier and label.
+   *
+   * @param target A Target used within a MapReduce pipeline
+   * @return The global node declaration for the Target
+   */
+  String formatTargetNodeDeclaration(Target target) {
+    String targetText = target.toString();
+    return String.format("\"%s\" [label=\"%s\" shape=folder];", targetText, targetText);
+  }
+
+  /**
+   * Produces the quoted dot node identifier for a PCollectionImpl.
+   *
+   * <p>An input collection is identified by its source alone; any other
+   * collection is identified by its name combined with its own hash code and
+   * the hash code of the containing job.
+   *
+   * @param pcollectionImpl The PCollectionImpl to be formatted
+   * @param jobPrototype The job containing the PCollection
+   * @return The dot file formatted representation of the PCollectionImpl
+   */
+  String formatPCollection(PCollectionImpl<?> pcollectionImpl, JobPrototype jobPrototype) {
+    if (!(pcollectionImpl instanceof InputCollection)) {
+      return String.format("\"%s@%d@%d\"", pcollectionImpl.getName(), pcollectionImpl.hashCode(),
+          jobPrototype.hashCode());
+    }
+    InputCollection<?> input = (InputCollection<?>) pcollectionImpl;
+    return String.format("\"%s\"", input.getSource());
+  }
+
+  /**
+   * Joins already-formatted node identifiers into one dot edge statement.
+   *
+   * @param nodeCollection Collection of chained node strings
+   * @return The dot-formatted chain of nodes
+   */
+  String formatNodeCollection(List<String> nodeCollection) {
+    String chain = Joiner.on(" -> ").join(nodeCollection);
+    return chain + ";";
+  }
+
+  /**
+   * Turns a NodePath into one dot edge statement per pair of consecutive
+   * collections on the path.
+   *
+   * @param nodePath The node path to be formatted
+   * @param jobPrototype The job containing the NodePath
+   * @return The dot file representation of the node path
+   */
+  List<String> formatNodePath(NodePath nodePath, JobPrototype jobPrototype) {
+    List<PCollectionImpl<?>> ordered = Lists.newArrayList(nodePath);
+    List<String> edgeStatements = Lists.newArrayList();
+
+    for (int i = 0; i + 1 < ordered.size(); i++) {
+      String source = formatPCollection(ordered.get(i), jobPrototype);
+      String destination = formatPCollection(ordered.get(i + 1), jobPrototype);
+      edgeStatements.add(formatNodeCollection(Lists.newArrayList(source, destination)));
+    }
+    return edgeStatements;
+  }
+
+  /**
+   * Registers a node declaration for every collection on a NodePath.
+   *
+   * <p>Input collections are declared globally. Every other collection is
+   * declared under the job's MAP task until the first grouped table is met;
+   * that grouped table and everything after it are declared under REDUCE.
+   *
+   * @param jobPrototype The job containing the node path
+   * @param nodePath The node path to be formatted
+   */
+  void addNodePathDeclarations(JobPrototype jobPrototype, NodePath nodePath) {
+    boolean pastGrouping = false;
+    for (PCollectionImpl<?> collection : nodePath) {
+      String declaration = formatPCollectionNodeDeclaration(collection, jobPrototype);
+      if (collection instanceof InputCollection) {
+        globalNodeDeclarations.add(declaration);
+        continue;
+      }
+
+      pastGrouping = pastGrouping || (collection instanceof PGroupedTableImpl);
+      MRTaskType taskType = pastGrouping ? MRTaskType.REDUCE : MRTaskType.MAP;
+      jobNodeDeclarations.put(Pair.of(jobPrototype, taskType), declaration);
+    }
+  }
+
+  /**
+   * Registers the edge statements of a NodePath with the graph.
+   *
+   * @param nodePath The path to be formatted as a node chain in the dot file
+   * @param jobPrototype The job containing the NodePath
+   */
+  void addNodePathChain(NodePath nodePath, JobPrototype jobPrototype) {
+    this.nodePathChains.addAll(formatNodePath(nodePath, jobPrototype));
+  }
+
+  /**
+   * Get the graph attributes for a task-specific subgraph.
+   *
+   * @param taskType The type of task in the subgraph
+   * @return Graph attributes
+   */
+  String getTaskGraphAttributes(MRTaskType taskType) {
+    return (taskType == MRTaskType.MAP) ? "label = Map; color = blue;" : "label = Reduce; color = red;";
+  }
+
+  /**
+   * Add the contents of a {@link JobPrototype} to the graph describing a
+   * pipeline.
+   *
+   * <p>The map-side node paths are only consulted for jobs that are not
+   * map-only; the target node paths are always added, each followed by an edge
+   * from the collection returned first by the path's descending iterator to the
+   * target node.
+   *
+   * @param jobPrototype A JobPrototype representing a portion of a MapReduce
+   *          pipeline
+   */
+  public void addJobPrototype(JobPrototype jobPrototype) {
+    jobPrototypes.add(jobPrototype);
+
+    boolean hasReducePhase = !jobPrototype.isMapOnly();
+    if (hasReducePhase) {
+      for (NodePath mapPath : jobPrototype.getMapNodePaths()) {
+        addNodePathDeclarations(jobPrototype, mapPath);
+        addNodePathChain(mapPath, jobPrototype);
+      }
+    }
+
+    HashMultimap<Target, NodePath> pathsByTarget = jobPrototype.getTargetsToNodePaths();
+    for (Target target : pathsByTarget.keySet()) {
+      globalNodeDeclarations.add(formatTargetNodeDeclaration(target));
+      for (NodePath targetPath : pathsByTarget.get(target)) {
+        addNodePathDeclarations(jobPrototype, targetPath);
+        addNodePathChain(targetPath, jobPrototype);
+
+        String finalCollectionNode = formatPCollection(targetPath.descendingIterator().next(), jobPrototype);
+        String targetNode = String.format("\"%s\"", target.toString());
+        nodePathChains.add(formatNodeCollection(Lists.newArrayList(finalCollectionNode, targetNode)));
+      }
+    }
+  }
+
+  /**
+   * Build up the full dot file containing the description of a MapReduce
+   * pipeline: global declarations first, then one cluster per job, then all
+   * edge statements.
+   *
+   * @return Graphviz dot file contents
+   */
+  public String buildDotfile() {
+    StringBuilder dot = new StringBuilder();
+    dot.append("digraph G {\n");
+
+    for (String declaration : globalNodeDeclarations) {
+      dot.append(String.format("  %s\n", declaration));
+    }
+
+    // Cluster numbers are shared between job clusters and their nested task
+    // clusters, so the running counter is threaded through the helper.
+    int nextClusterIndex = 0;
+    for (JobPrototype job : jobPrototypes) {
+      nextClusterIndex = appendJobCluster(dot, job, nextClusterIndex);
+    }
+
+    for (String chain : nodePathChains) {
+      dot.append(String.format("  %s\n", chain));
+    }
+
+    dot.append("}\n");
+    return dot.toString();
+  }
+
+  /**
+   * Appends the cluster for one job, with a nested cluster for each task type
+   * that has declarations, and returns the next unused cluster index.
+   */
+  private int appendJobCluster(StringBuilder dot, JobPrototype job, int firstClusterIndex) {
+    int clusterIndex = firstClusterIndex;
+    dot.append(String.format("  subgraph cluster%d {\n", clusterIndex++));
+    for (MRTaskType taskType : MRTaskType.values()) {
+      Pair<JobPrototype, MRTaskType> key = Pair.of(job, taskType);
+      if (!jobNodeDeclarations.containsKey(key)) {
+        continue;
+      }
+      dot.append(String.format("    subgraph cluster%d {\n", clusterIndex++));
+      dot.append(String.format("      %s\n", getTaskGraphAttributes(taskType)));
+      for (String declaration : jobNodeDeclarations.get(key)) {
+        dot.append(String.format("      %s\n", declaration));
+      }
+      dot.append("    }\n");
+    }
+    dot.append("  }\n");
+    return clusterIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
new file mode 100644
index 0000000..1e59df0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A connection between two {@link Vertex} instances in the planner graph,
+ * carrying the set of {@link NodePath}s that have been attached to it.
+ */
+class Edge {
+  private final Vertex head;
+  private final Vertex tail;
+  private final Set<NodePath> paths;
+
+  /**
+   * Creates an edge between the given vertices with no node paths attached.
+   *
+   * @param head the vertex stored as this edge's head
+   * @param tail the vertex stored as this edge's tail
+   */
+  public Edge(Vertex head, Vertex tail) {
+    this.head = head;
+    this.tail = tail;
+    this.paths = Sets.newHashSet();
+  }
+
+  /** @return the head vertex given at construction */
+  public Vertex getHead() {
+    return head;
+  }
+
+  /** @return the tail vertex given at construction */
+  public Vertex getTail() {
+    return tail;
+  }
+
+  /** Attaches a single node path to this edge. */
+  public void addNodePath(NodePath path) {
+    this.paths.add(path);
+  }
+
+  /** Attaches every node path in the given collection to this edge. */
+  public void addAllNodePaths(Collection<NodePath> paths) {
+    this.paths.addAll(paths);
+  }
+
+  /** @return the live (not copied) set of node paths attached to this edge */
+  public Set<NodePath> getNodePaths() {
+    return paths;
+  }
+
+  /**
+   * Picks the collection at which this edge's node paths should be split.
+   *
+   * <p>All paths are walked in lock-step, starting from their second element.
+   * The walk stops at the first position where some path is exhausted, yields
+   * a {@link PGroupedTableImpl}, or yields a different collection instance
+   * than the other paths. The element just before that position is returned,
+   * taken from the first path in the set's iteration order.
+   *
+   * @return the collection chosen as the split point
+   * @throws IllegalStateException if no node path has been added to this edge
+   */
+  public PCollectionImpl getSplit() {
+    if (paths.isEmpty()) {
+      // Without any path the scan below would never see an iterator, never
+      // set 'end', and therefore never terminate.
+      throw new IllegalStateException("Cannot compute a split point for an edge without node paths");
+    }
+
+    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
+    for (NodePath nodePath : paths) {
+      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
+      iter.next(); // skip the first element; the comparison starts at index 1
+      iters.add(iter);
+    }
+
+    // Find the lowest point w/the lowest cost to be the split point for
+    // all of the dependent paths.
+    boolean end = false;
+    int splitIndex = -1;
+    while (!end) {
+      // splitIndex trails the position being examined by one, so when the
+      // scan stops it points at the last position that was accepted.
+      splitIndex++;
+      PCollectionImpl<?> current = null;
+      for (Iterator<PCollectionImpl<?>> iter : iters) {
+        if (iter.hasNext()) {
+          PCollectionImpl<?> next = iter.next();
+          if (next instanceof PGroupedTableImpl) {
+            end = true;
+            break;
+          } else if (current == null) {
+            current = next;
+          } else if (current != next) {
+            // Identity comparison: the paths diverge at this position.
+            end = true;
+            break;
+          }
+        } else {
+          end = true;
+          break;
+        }
+      }
+    }
+    // TODO: Add costing calcs here.
+
+    return Iterables.getFirst(paths, null).get(splitIndex);
+  }
+
+  /** Edges are equal when head, tail and the attached path sets are all equal. */
+  @Override
+  public boolean equals(Object other) {
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(other instanceof Edge)) {
+      return false;
+    }
+    Edge e = (Edge) other;
+    return head.equals(e.head) && tail.equals(e.tail) && paths.equals(e.paths);
+  }
+
+  /**
+   * Hashes head and tail only. The path set is left out, so the hash does not
+   * change when paths are added; equal edges still hash alike.
+   */
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(head).append(tail).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
new file mode 100644
index 0000000..ce0a847
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * The planner's working graph: one {@link Vertex} per registered collection,
+ * one {@link Edge} per (head, tail) vertex pair, and a separate record of
+ * explicitly marked child-to-parent dependencies.
+ */
+class Graph implements Iterable<Vertex> {
+
+  private final Map<PCollectionImpl<?>, Vertex> vertices;
+  private final Map<Pair<Vertex, Vertex>, Edge> edges;
+  private final Map<Vertex, List<Vertex>> dependencies;
+
+  /** Creates an empty graph. */
+  public Graph() {
+    this.vertices = Maps.newHashMap();
+    this.edges = Maps.newHashMap();
+    this.dependencies = Maps.newHashMap();
+  }
+
+  /**
+   * Looks up the vertex registered for a collection.
+   *
+   * @param impl the collection to look up
+   * @return the vertex, or {@code null} if the collection has not been added
+   */
+  public Vertex getVertexAt(PCollectionImpl<?> impl) {
+    return vertices.get(impl);
+  }
+
+  /**
+   * Returns the vertex for a collection, creating and registering it first if
+   * necessary.
+   *
+   * @param impl the collection the vertex stands for
+   * @param output when {@code true} the vertex is flagged as an output; an
+   *          existing flag is never cleared by passing {@code false}
+   * @return the new or previously registered vertex
+   */
+  public Vertex addVertex(PCollectionImpl<?> impl, boolean output) {
+    Vertex v = vertices.get(impl);
+    if (v == null) {
+      v = new Vertex(impl);
+      vertices.put(impl, v);
+    }
+    if (output) {
+      v.setOutput();
+    }
+    return v;
+  }
+
+  /**
+   * Returns the edge for the given vertex pair. A missing edge is created,
+   * stored, and registered as incoming on {@code tail} and outgoing on
+   * {@code head}.
+   *
+   * @param head the vertex the edge leaves
+   * @param tail the vertex the edge enters
+   * @return the new or previously created edge
+   */
+  public Edge getEdge(Vertex head, Vertex tail) {
+    Pair<Vertex, Vertex> p = Pair.of(head, tail);
+    Edge e = edges.get(p);
+    if (e == null) {
+      e = new Edge(head, tail);
+      edges.put(p, e);
+      tail.addIncoming(e);
+      head.addOutgoing(e);
+    }
+    return e;
+  }
+
+  /** Iterates over a snapshot of the vertices taken when this method is called. */
+  @Override
+  public Iterator<Vertex> iterator() {
+    return Sets.newHashSet(vertices.values()).iterator();
+  }
+
+  /** @return a new set holding every edge currently in the graph */
+  public Set<Edge> getAllEdges() {
+    return Sets.newHashSet(edges.values());
+  }
+
+  /**
+   * Records that {@code child} depends on {@code parent}. Repeated calls with
+   * the same pair add repeated entries.
+   */
+  public void markDependency(Vertex child, Vertex parent) {
+    List<Vertex> parents = dependencies.get(child);
+    if (parents == null) {
+      parents = Lists.newArrayList();
+      dependencies.put(child, parents);
+    }
+    parents.add(parent);
+  }
+
+  /**
+   * Returns the parents recorded through {@link #markDependency}.
+   *
+   * @return the internal, live list when the child has marked dependencies,
+   *         otherwise an immutable empty list
+   */
+  public List<Vertex> getParents(Vertex child) {
+    List<Vertex> parents = dependencies.get(child);
+    if (parents != null) {
+      return parents;
+    }
+    return ImmutableList.of();
+  }
+
+  /**
+   * Partitions the vertices into groups that are reachable from one another
+   * through {@code Vertex.getAllNeighbors()}.
+   *
+   * @return one list of vertices per component
+   */
+  public List<List<Vertex>> connectedComponents() {
+    List<List<Vertex>> components = Lists.newArrayList();
+    Set<Vertex> unassigned = Sets.newHashSet(vertices.values());
+    while (!unassigned.isEmpty()) {
+      // Seed a new component with any vertex that has not been placed yet.
+      Vertex base = unassigned.iterator().next();
+      List<Vertex> component = Lists.newArrayList();
+      component.add(base);
+      unassigned.remove(base);
+
+      // 'working' is the frontier of vertices still to be examined.
+      Set<Vertex> working = Sets.newHashSet(base.getAllNeighbors());
+      while (!working.isEmpty()) {
+        Vertex n = working.iterator().next();
+        working.remove(n);
+        if (unassigned.contains(n)) {
+          component.add(n);
+          unassigned.remove(n);
+          for (Vertex n2 : n.getAllNeighbors()) {
+            if (unassigned.contains(n2)) {
+              working.add(n2);
+            }
+          }
+        }
+      }
+      components.add(component);
+    }
+
+    return components;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
new file mode 100644
index 0000000..925c39a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
+import org.apache.crunch.impl.mr.collect.DoTableImpl;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.collect.UnionCollection;
+
+/**
+ *
+ */
+class GraphBuilder implements PCollectionImpl.Visitor {
+
+  private Graph graph = new Graph();
+  private Vertex workingVertex;
+  private NodePath workingPath;
+  
+  public Graph getGraph() {
+    return graph;
+  }
+  
+  public void visitOutput(PCollectionImpl<?> output) {
+    workingVertex = graph.addVertex(output, true);
+    workingPath = new NodePath();
+    output.accept(this);
+  }
+  
+  @Override
+  public void visitInputCollection(InputCollection<?> collection) {
+    Vertex v = graph.addVertex(collection, false);
+    graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
+  }
+
+  @Override
+  public void visitUnionCollection(UnionCollection<?> collection) {
+    Vertex baseVertex = workingVertex;
+    NodePath basePath = workingPath;
+    for (PCollectionImpl<?> parent : collection.getParents()) {
+      workingPath = new NodePath(basePath);
+      workingVertex = baseVertex;
+      processParent(parent);
+    }
+  }
+
+  @Override
+  public void visitDoFnCollection(DoCollectionImpl<?> collection) {
+    workingPath.push(collection);
+    processParent(collection.getOnlyParent());
+  }
+
+  @Override
+  public void visitDoTable(DoTableImpl<?, ?> collection) {
+    workingPath.push(collection);
+    processParent(collection.getOnlyParent());
+  }
+
+  @Override
+  public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
+    Vertex v = graph.addVertex(collection, false);
+    graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
+    workingVertex = v;
+    workingPath = new NodePath(collection);
+    processParent(collection.getOnlyParent());
+  }
+  
+  private void processParent(PCollectionImpl<?> parent) {
+    Vertex v = graph.getVertexAt(parent);
+    if (v == null) {
+      parent.accept(this);
+    } else {
+      graph.getEdge(v, workingVertex).addNodePath(workingPath.close(parent));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
new file mode 100644
index 0000000..9ad7300
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * Walks the {@code DoNode} instances of a job and assembles a String naming the
+ * pipeline stages that belong to that job.
+ *
+ * <p>Stages along a single chain are joined with {@code +}. Where a node list
+ * holds anything other than exactly one node, each non-empty branch is wrapped
+ * in brackets and the branches are joined with {@code /} inside another pair
+ * of brackets.
+ */
+class JobNameBuilder {
+
+  private static final Joiner JOINER = Joiner.on("+");
+  private static final Joiner CHILD_JOINER = Joiner.on("/");
+
+  private String pipelineName;
+  List<String> rootStack = Lists.newArrayList();
+
+  /** @param pipelineName prefix placed before the stage names in {@link #build()} */
+  public JobNameBuilder(final String pipelineName) {
+    this.pipelineName = pipelineName;
+  }
+
+  /** Adds the given node and its descendants to the top-level stage list. */
+  public void visit(DoNode node) {
+    visit(node, rootStack);
+  }
+
+  /** Adds the given nodes and their descendants to the top-level stage list. */
+  public void visit(List<DoNode> nodes) {
+    visit(nodes, rootStack);
+  }
+
+  /**
+   * Appends the names for a list of sibling nodes to {@code stack}. A lone node
+   * continues the current chain; otherwise every sibling is collected into its
+   * own bracketed group, and siblings that contribute no names are left out.
+   */
+  private void visit(List<DoNode> nodes, List<String> stack) {
+    if (nodes.size() == 1) {
+      visit(nodes.get(0), stack);
+      return;
+    }
+
+    List<String> branchLabels = Lists.newArrayList();
+    for (DoNode branch : nodes) {
+      List<String> branchStages = Lists.newArrayList();
+      visit(branch, branchStages);
+      if (branchStages.isEmpty()) {
+        continue;
+      }
+      branchLabels.add("[" + JOINER.join(branchStages) + "]");
+    }
+
+    if (!branchLabels.isEmpty()) {
+      stack.add("[" + CHILD_JOINER.join(branchLabels) + "]");
+    }
+  }
+
+  /** Appends the node's name, unless it is empty, then descends into its children. */
+  private void visit(DoNode node, List<String> stack) {
+    String stageName = node.getName();
+    if (!stageName.isEmpty()) {
+      stack.add(stageName);
+    }
+    visit(node.getChildren(), stack);
+  }
+
+  /** @return the pipeline name, a colon and a space, then the collected stage names */
+  public String build() {
+    String stages = JOINER.join(rootStack);
+    return pipelineName + ": " + stages;
+  }
+}


Mime
View raw message