crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [8/9] Generalizing Crunch's Collection APIs to support more execution frameworks
Date Wed, 11 Dec 2013 20:47:54 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
new file mode 100644
index 0000000..32f9991
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.CachingOptions;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Cogroup;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.materialize.MaterializableMap;
+import org.apache.crunch.materialize.pobject.MapPObject;
+import org.apache.crunch.types.PType;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base {@link PTable} implementation shared by the distributed pipeline implementations.
+ *
+ * <p>Grouping, union and input-table creation are delegated to the owning pipeline's collection
+ * factory; the remaining table operations are implemented with {@link PTables},
+ * {@link Aggregate}, {@link Join} and {@link Cogroup}.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements PTable<K, V> {
+
+  /**
+   * Creates a table with the given name in the given pipeline.
+   *
+   * @param name the name of the table
+   * @param pipeline the pipeline that owns the table
+   */
+  public PTableBase(String name, DistributedPipeline pipeline) {
+    super(name, pipeline);
+  }
+
+  /**
+   * Creates a table with the given name in the given pipeline, using the given options.
+   *
+   * @param name the name of the table
+   * @param pipeline the pipeline that owns the table
+   * @param options options passed through to {@link PCollectionImpl}
+   */
+  public PTableBase(String name, DistributedPipeline pipeline, ParallelDoOptions options) {
+    super(name, pipeline, options);
+  }
+
+  /** Returns the key type of {@link #getPTableType()}. */
+  @Override
+  public PType<K> getKeyType() {
+    return getPTableType().getKeyType();
+  }
+
+  /** Returns the value type of {@link #getPTableType()}. */
+  @Override
+  public PType<V> getValueType() {
+    return getPTableType().getValueType();
+  }
+
+  /** Groups this table by key using default {@link GroupingOptions}. */
+  @Override
+  public BaseGroupedTable<K, V> groupByKey() {
+    return pipeline.getFactory().createGroupedTable(this, GroupingOptions.builder().build());
+  }
+
+  /**
+   * Groups this table by key, requesting the given number of reducers.
+   *
+   * @param numReduceTasks the value passed to {@code GroupingOptions.Builder.numReducers}
+   */
+  @Override
+  public BaseGroupedTable<K, V> groupByKey(int numReduceTasks) {
+    return pipeline.getFactory().createGroupedTable(
+        this, GroupingOptions.builder().numReducers(numReduceTasks).build());
+  }
+
+  /**
+   * Groups this table by key using the given options.
+   *
+   * @param groupingOptions the options handed to the pipeline's collection factory
+   */
+  @Override
+  public BaseGroupedTable<K, V> groupByKey(GroupingOptions groupingOptions) {
+    return pipeline.getFactory().createGroupedTable(this, groupingOptions);
+  }
+
+  /**
+   * Returns the union of this table and {@code other}.
+   *
+   * @see #union(PTable...)
+   */
+  @Override
+  @SuppressWarnings("unchecked") // a raw PTable[] is needed because generic arrays cannot be created
+  public PTable<K, V> union(PTable<K, V> other) {
+    return union(new PTable[] { other });
+  }
+
+  /**
+   * Returns the union of this table and all of {@code others}, created by the pipeline's
+   * collection factory.
+   *
+   * <p>Each table in {@code others} must be a {@code PTableBase}; any other implementation
+   * fails with a {@link ClassCastException}.
+   */
+  @Override
+  public PTable<K, V> union(PTable<K, V>... others) {
+    List<PTableBase<K, V>> internal = Lists.newArrayListWithCapacity(others.length + 1);
+    internal.add(this);
+    for (PTable<K, V> table : others) {
+      internal.add((PTableBase<K, V>) table);
+    }
+    return pipeline.getFactory().createUnionTable(internal);
+  }
+
+  /**
+   * Writes this table to {@code target}. If the table has already been materialized, an input
+   * table over the materialized source (cast to {@link TableSource}) is written instead of this
+   * table.
+   */
+  @Override
+  public PTable<K, V> write(Target target) {
+    if (getMaterializedAt() != null) {
+      getPipeline().write(pipeline.getFactory().createInputTable(
+          (TableSource<K, V>) getMaterializedAt(), pipeline), target);
+    } else {
+      getPipeline().write(this, target);
+    }
+    return this;
+  }
+
+  /**
+   * Writes this table to {@code target} using {@code writeMode}. If the table has already been
+   * materialized, an input table over the materialized source (cast to {@link TableSource}) is
+   * written instead of this table.
+   */
+  @Override
+  public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
+    if (getMaterializedAt() != null) {
+      getPipeline().write(pipeline.getFactory().createInputTable(
+          (TableSource<K, V>) getMaterializedAt(), pipeline), target, writeMode);
+    } else {
+      getPipeline().write(this, target, writeMode);
+    }
+    return this;
+  }
+
+  /** Caches this table with {@link CachingOptions#DEFAULT}. */
+  @Override
+  public PTable<K, V> cache() {
+    return cache(CachingOptions.DEFAULT);
+  }
+
+  /**
+   * Asks the owning pipeline to cache this table with the given options.
+   *
+   * @return this table
+   */
+  @Override
+  public PTable<K, V> cache(CachingOptions options) {
+    pipeline.cache(this, options);
+    return this;
+  }
+
+  /** Filters this table; the result keeps this table's {@link #getPTableType() table type}. */
+  @Override
+  public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(filterFn, getPTableType());
+  }
+
+  /** Filters this table under the given name; the result keeps this table's table type. */
+  @Override
+  public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(name, filterFn, getPTableType());
+  }
+
+  /** Delegates to {@link PTables#mapValues}. */
+  @Override
+  public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, mapFn, ptype);
+  }
+
+  /** Delegates to {@link PTables#mapValues}, passing the given stage name. */
+  @Override
+  public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(name, this, mapFn, ptype);
+  }
+
+  /** Delegates to {@link PTables#mapKeys}. */
+  @Override
+  public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(this, mapFn, ptype);
+  }
+
+  /** Delegates to {@link PTables#mapKeys}, passing the given stage name. */
+  @Override
+  public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(name, this, mapFn, ptype);
+  }
+
+  /** Delegates to {@link Aggregate#top} with its boolean argument set to {@code true}. */
+  @Override
+  public PTable<K, V> top(int count) {
+    return Aggregate.top(this, count, true);
+  }
+
+  /** Delegates to {@link Aggregate#top} with its boolean argument set to {@code false}. */
+  @Override
+  public PTable<K, V> bottom(int count) {
+    return Aggregate.top(this, count, false);
+  }
+
+  /** Delegates to {@link Aggregate#collectValues}. */
+  @Override
+  public PTable<K, Collection<V>> collectValues() {
+    return Aggregate.collectValues(this);
+  }
+
+  /** Delegates to {@link Join#join}. */
+  @Override
+  public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
+    return Join.join(this, other);
+  }
+
+  /** Delegates to {@link Cogroup#cogroup}. */
+  @Override
+  public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
+    return Cogroup.cogroup(this, other);
+  }
+
+  /** Delegates to {@link PTables#keys}. */
+  @Override
+  public PCollection<K> keys() {
+    return PTables.keys(this);
+  }
+
+  /** Delegates to {@link PTables#values}. */
+  @Override
+  public PCollection<V> values() {
+    return PTables.values(this);
+  }
+
+  /**
+   * Returns a {@code Map<K, V>} made up of the keys and values in this PTable, built by wrapping
+   * the result of {@link #materialize()} in a {@link MaterializableMap}.
+   */
+  @Override
+  public Map<K, V> materializeToMap() {
+    return new MaterializableMap<K, V>(this.materialize());
+  }
+
+  /** {@inheritDoc} Backed by a {@link MapPObject} over this table. */
+  @Override
+  public PObject<Map<K, V>> asMap() {
+    return new MapPObject<K, V>(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 0a4dbea..ced1700 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -28,6 +28,7 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CachingOptions;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
@@ -306,6 +307,11 @@ public class MemPipeline implements Pipeline {
   }
 
   @Override
+  public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
+    // Intentionally a no-op: this pipeline neither materializes nor caches the collection, and
+    // the CachingOptions are ignored.
+  }
+
+  @Override
   public PipelineExecution runAsync() {
     activeTargets.clear();
     return new MemExecution();

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index e8e34f2..bbcdc0b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -25,6 +25,7 @@ import javassist.util.proxy.MethodFilter;
 import javassist.util.proxy.MethodHandler;
 import javassist.util.proxy.ProxyFactory;
 
+import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.MapFn;
@@ -160,6 +161,18 @@ public class MemCollection<S> implements PCollection<S> {
     return collect;
   }
 
+  @Override
+  public PCollection<S> cache() {
+    // Delegate so that both cache() overloads share one implementation.
+    return cache(CachingOptions.DEFAULT);
+  }
+
+  @Override
+  public PCollection<S> cache(CachingOptions options) {
+    // No-op: an in-memory collection needs no caching, so the options are ignored.
+    return this;
+  }
+
   /** {@inheritDoc} */
   @Override
   public PObject<Collection<S>> asCollection() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index a212ed1..60279a9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
@@ -97,7 +98,19 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
     getPipeline().write(this, target, writeMode);
     return this;
   }
-  
+
+  @Override
+  public PTable<K, V> cache() {
+    // Delegate so that both cache() overloads share one implementation.
+    return cache(CachingOptions.DEFAULT);
+  }
+
+  @Override
+  public PTable<K, V> cache(CachingOptions options) {
+    // No-op: an in-memory table needs no caching, so the options are ignored.
+    return this;
+  }
+
   @Override
   public PTableType<K, V> getPTableType() {
     return ptype;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
index 6976615..93988e6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/emit/InMemoryEmitter.java
@@ -48,7 +48,7 @@ public class InMemoryEmitter<T> implements Emitter<T> {
 
   @Override
   public void flush() {
-
+    // Empties the buffer of emitted values. NOTE(review): if getOutput() hands out this same
+    // list (its body is not shown here), anyone holding it sees it emptied too; confirm no
+    // caller reads the output after flush().
+    output.clear();
   }
 
   public List<T> getOutput() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 3c2ab77..01a3ead 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -19,65 +19,33 @@ package org.apache.crunch.impl.mr;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CachingOptions;
 import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
-import org.apache.crunch.Source;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.Target;
-import org.apache.crunch.Target.WriteMode;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.InputTable;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.collect.UnionCollection;
-import org.apache.crunch.impl.mr.collect.UnionTable;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.MRCollectionFactory;
 import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.crunch.impl.mr.plan.MSCRPlanner;
-import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.io.From;
 import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.To;
 import org.apache.crunch.materialize.MaterializableIterable;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Pipeline implementation that is executed within Hadoop MapReduce.
  */
-public class MRPipeline implements Pipeline {
+public class MRPipeline extends DistributedPipeline {
 
   private static final Log LOG = LogFactory.getLog(MRPipeline.class);
 
-  private static final Random RANDOM = new Random();
-
   private final Class<?> jarClass;
-  private final String name;
-  private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
-  private final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
-  private Path tempDirectory;
-  private int tempFileIndex;
-  private int nextAnonymousStageId;
-
-  private Configuration conf;
 
   /**
    * Instantiate with a default Configuration and name.
@@ -117,25 +85,8 @@ public class MRPipeline implements Pipeline {
    * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
    */
   public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
+    super(name, conf, new MRCollectionFactory());
+    // The name, Configuration and the bookkeeping formerly initialized here (output targets,
+    // temp directory) are presumably owned by DistributedPipeline now; MRCollectionFactory
+    // presumably supplies the MapReduce-specific collection implementations.
     this.jarClass = jarClass;
-    this.name = name;
-    this.outputTargets = Maps.newHashMap();
-    this.outputTargetsToMaterialize = Maps.newHashMap();
-    this.conf = conf;
-    this.tempDirectory = createTempDirectory(conf);
-    this.tempFileIndex = 0;
-    this.nextAnonymousStageId = 0;
-  }
-
-  @Override
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  @Override
-  public void setConfiguration(Configuration conf) {
-    this.conf = conf;
-    this.tempDirectory = createTempDirectory(conf);
   }
 
   public MRExecutor plan() {
@@ -148,7 +99,7 @@ public class MRPipeline implements Pipeline {
     }
     MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize);
     try {
-      return planner.plan(jarClass, conf);
+      return planner.plan(jarClass, getConfiguration());
     } catch (IOException e) {
       throw new CrunchRuntimeException(e);
     }
@@ -175,236 +126,19 @@ public class MRPipeline implements Pipeline {
   }
 
   @Override
-  public PipelineResult done() {
-    PipelineResult res = null;
-    if (!outputTargets.isEmpty()) {
-      res = run();
-    }
-    cleanup();
-    return res;
-  }
-
-  public <S> PCollection<S> read(Source<S> source) {
-    return new InputCollection<S>(source, this);
-  }
-
-  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
-    return new InputTable<K, V>(source, this);
-  }
-
-  public PCollection<String> readTextFile(String pathName) {
-    return read(From.textFile(pathName));
-  }
-
-  public void write(PCollection<?> pcollection, Target target) {
-    write(pcollection, target, Target.WriteMode.DEFAULT);
-  }
-  
-  @SuppressWarnings("unchecked")
-  public void write(PCollection<?> pcollection, Target target,
-      Target.WriteMode writeMode) {
-    if (pcollection instanceof PGroupedTableImpl) {
-      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
-    } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
-      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
-          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
-    }
-    boolean exists = target.handleExisting(writeMode, ((PCollectionImpl) pcollection).getLastModifiedAt(),
-        getConfiguration());
-    if (exists && writeMode == WriteMode.CHECKPOINT) {
-      SourceTarget<?> st = target.asSourceTarget(pcollection.getPType());
-      if (st == null) {
-        throw new CrunchRuntimeException("Target " + target + " does not support checkpointing");
-      } else {
-        ((PCollectionImpl) pcollection).materializeAt(st);
-      }
-      return;
-    } else if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) {
-      throw new CrunchRuntimeException("Target " + target + " is already written in current run." +
-          " Use WriteMode.APPEND in order to write additional data to it.");
-    }
-    addOutput((PCollectionImpl<?>) pcollection, target);
-  }
-
-  private boolean targetInCurrentRun(Target target) {
-    for (Set<Target> targets : outputTargets.values()) {
-      if (targets.contains(target)) {
-        return true;
-      }
-    }
-    return false;
-  }
-  
-  private void addOutput(PCollectionImpl<?> impl, Target target) {
-    if (!outputTargets.containsKey(impl)) {
-      outputTargets.put(impl, Sets.<Target> newHashSet());
-    }
-    outputTargets.get(impl).add(target);
-  }
-
-  @Override
   public <T> Iterable<T> materialize(PCollection<T> pcollection) {
     ((PCollectionImpl) pcollection).setBreakpoint();
-    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
-    ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
-
+    ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollection);
+    // NOTE(review): the removed toPcollectionImpl() wrapped UnionCollection/UnionTable inputs in
+    // an identity parallelDo first; confirm getMaterializeSourceTarget() still handles unions,
+    // since setBreakpoint() above and the map key below now use the unwrapped collection.
     MaterializableIterable<T> c = new MaterializableIterable<T>(this, readableSrc);
-    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
-      outputTargetsToMaterialize.put(pcollectionImpl, c);
+    // Only the first iterable per collection is registered; a repeated call returns a new
+    // MaterializableIterable but leaves the registered one in place.
+    if (!outputTargetsToMaterialize.containsKey(pcollection)) {
+      outputTargetsToMaterialize.put((PCollectionImpl) pcollection, c);
     }
     return c;
   }
 
-  /**
-   * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.
-   * This is primarily intended as a helper method to {@link #materialize(PCollection)}. The
-   * underlying data of the ReadableSourceTarget may not be actually present until the pipeline is
-   * run.
-   * 
-   * @param pcollection The collection for which the ReadableSourceTarget is to be retrieved
-   * @return The ReadableSourceTarget
-   * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given
-   *           PCollection
-   */
-  public <T> ReadableSource<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
-    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
-
-    // First, check to see if this is a readable input collection.
-    if (impl instanceof InputCollection) {
-      InputCollection<T> ic = (InputCollection<T>) impl;
-      if (ic.getSource() instanceof ReadableSource) {
-        return (ReadableSource) ic.getSource();
-      } else {
-        throw new IllegalArgumentException(
-            "Cannot materialize non-readable input collection: " + ic);
-      }
-    } else if (impl instanceof InputTable) {
-      InputTable it = (InputTable) impl;
-      if (it.getSource() instanceof ReadableSource) {
-        return (ReadableSource) it.getSource();
-      } else {
-        throw new IllegalArgumentException(
-            "Cannot materialize non-readable input table: " + it);
-      }
-    }
-
-    // Next, check to see if this pcollection has already been materialized.
-    SourceTarget<T> matTarget = impl.getMaterializedAt();
-    if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
-      return (ReadableSourceTarget<T>) matTarget;
-    }
-    
-    // Check to see if we plan on materializing this collection on the
-    // next run.
-    ReadableSourceTarget<T> srcTarget = null;
-    if (outputTargets.containsKey(pcollection)) {
-      for (Target target : outputTargets.get(impl)) {
-        if (target instanceof ReadableSourceTarget) {
-          return (ReadableSourceTarget<T>) target;
-        }
-      }
-    }
-
-    // If we're not planning on materializing it already, create a temporary
-    // output to hold the materialized records and return that.
-    SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
-    if (!(st instanceof ReadableSourceTarget)) {
-      throw new IllegalArgumentException("The PType for the given PCollection is not readable"
-          + " and cannot be materialized");
-    } else {
-      srcTarget = (ReadableSourceTarget<T>) st;
-      addOutput(impl, srcTarget);
-      return srcTarget;
-    }
-  }
-
-  /**
-   * Safely cast a PCollection into a PCollectionImpl, including handling the case of
-   * UnionCollections.
-   * 
-   * @param pcollection The PCollection to be cast/transformed
-   * @return The PCollectionImpl representation
-   */
-  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
-    PCollectionImpl<T> pcollectionImpl = null;
-    if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
-      pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
-          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
-    } else {
-      pcollectionImpl = (PCollectionImpl<T>) pcollection;
-    }
-    return pcollectionImpl;
-  }
-
-  public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
-    return ptype.getDefaultFileSource(createTempPath());
-  }
-
-  public Path createTempPath() {
-    tempFileIndex++;
-    return new Path(tempDirectory, "p" + tempFileIndex);
-  }
-
-  private static Path createTempDirectory(Configuration conf) {
-    Path dir = createTemporaryPath(conf);
-    try {
-      dir.getFileSystem(conf).mkdirs(dir);
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot create job output directory " + dir, e);
-    }
-    return dir;
-  }
-
-  private static Path createTemporaryPath(Configuration conf) {
-    String baseDir = conf.get(RuntimeParameters.TMP_DIR, "/tmp");
-    return new Path(baseDir, "crunch-" + (RANDOM.nextInt() & Integer.MAX_VALUE));
-  }
-
-  @Override
-  public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
-    pcollection.parallelDo("asText", new StringifyFn<T>(), Writables.strings())
-        .write(To.textFile(pathName));
-  }
-
-  private static class StringifyFn<T> extends MapFn<T, String> {
-    @Override
-    public String map(T input) {
-      return input.toString();
-    }
-  }
-
-  @Override
-  public void cleanup(boolean force) {
-    if (force || outputTargets.isEmpty()) {
-      try {
-        FileSystem fs = tempDirectory.getFileSystem(conf);
-        if (fs.exists(tempDirectory)) {
-          fs.delete(tempDirectory, true);
-        }
-      } catch (IOException e) {
-        LOG.info("Exception during cleanup", e);
-      }
-    } else {
-      LOG.warn("Not running cleanup while output targets remain.");
-    }
-  }
-
-  private void cleanup() {
-    cleanup(false);
-  }
-
-  public int getNextAnonymousStageId() {
-    return nextAnonymousStageId++;
-  }
-
-  @Override
-  public void enableDebug() {
-    // Turn on Crunch runtime error catching.
-    getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
-  }
-
   @Override
-  public String getName() {
-    return name;
+  public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
+    // In a MapReduce context caching is identical to materialization. The CachingOptions are
+    // not consulted, and the Iterable returned by materialize() is not used.
+    materialize(pcollection);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java
deleted file mode 100644
index ce5399b..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DelegatingReadableData.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * Implements the {@code ReadableData<T>} interface by delegating to an {@code ReadableData<S>} instance
- * and passing its contents through a {@code DoFn<S, T>}.
- */
-class DelegatingReadableData<S, T> implements ReadableData<T> {
-
-  private final ReadableData<S> delegate;
-  private final DoFn<S, T> fn;
-
-  public DelegatingReadableData(ReadableData<S> delegate, DoFn<S, T> fn) {
-    this.delegate = delegate;
-    this.fn = fn;
-  }
-
-  @Override
-  public Set<SourceTarget<?>> getSourceTargets() {
-    return delegate.getSourceTargets();
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    delegate.configure(conf);
-    fn.configure(conf);
-  }
-
-  @Override
-  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
-    fn.setContext(context);
-    fn.initialize();
-    final Iterable<S> delegateIterable = delegate.read(context);
-    return new Iterable<T>() {
-      @Override
-      public Iterator<T> iterator() {
-        return new DoFnIterator<S, T>(delegateIterable.iterator(), fn);
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollection.java
new file mode 100644
index 0000000..f791fb2
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollection.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import org.apache.crunch.CachingOptions;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.impl.dist.collect.BaseDoCollection;
+import org.apache.crunch.impl.dist.collect.MRCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+
+/**
+ * MapReduce-specific {@link BaseDoCollection}: the result of applying a {@link DoFn} to a parent
+ * collection, which the planner turns into a {@link DoNode}.
+ *
+ * @param <S> the element type produced by the function
+ */
+public class DoCollection<S> extends BaseDoCollection<S> implements MRCollection {
+
+  /**
+   * Package-private constructor; instances are created from within this package
+   * (presumably by {@code MRCollectionFactory}).
+   *
+   * @param name the collection name
+   * @param parent the collection the function is applied to
+   * @param fn the function producing this collection's elements
+   * @param ptype the type of the produced elements
+   * @param options the parallel-do options for this step
+   * @param <T> the element type of the parent collection
+   */
+  <T> DoCollection(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ptype,
+      ParallelDoOptions options) {
+    super(name, parent, fn, ptype, options);
+  }
+
+  /** Builds the function node for this collection from its name, function, type and options. */
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(this.getName(), this.fn, this.ptype, this.doOptions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
deleted file mode 100644
index c0a761a..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
-
-public class DoCollectionImpl<S> extends PCollectionImpl<S> {
-
-  private final PCollectionImpl<Object> parent;
-  private final DoFn<Object, S> fn;
-  private final PType<S> ntype;
-
-  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype) {
-    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
-  }
-  
-  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype,
-      ParallelDoOptions options) {
-    super(name, options);
-    this.parent = (PCollectionImpl<Object>) parent;
-    this.fn = (DoFn<Object, S>) fn;
-    this.ntype = ntype;
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return (long) (fn.scaleFactor() * parent.getSize());
-  }
-
-  @Override
-  protected ReadableData<S> getReadableDataInternal() {
-    if (getOnlyParent() instanceof PGroupedTableImpl) {
-      return materializedData();
-    }
-    return new DelegatingReadableData(getOnlyParent().asReadable(false), fn);
-  }
-
-  @Override
-  public PType<S> getPType() {
-    return ntype;
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitDoFnCollection(this);
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> of(parent);
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(), fn, ntype, doOptions);
-  }
-
-  @Override
-  public long getLastModifiedAt() {
-    return parent.getLastModifiedAt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java
deleted file mode 100644
index ff102eb..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoFnIterator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import com.google.common.collect.Lists;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-/**
- * An {@code Iterator<T>} that combines a delegate {@code Iterator<S>} and a {@code DoFn<S, T>}, generating
- * data by passing the contents of the iterator through the function. Note that the input {@code DoFn} should
- * have both its {@code setContext} and {@code initialize} functions called <b>before</b> it is passed to
- * the constructor.
- *
- * @param <S> The type of the delegate iterator
- * @param <T> The returned type
- */
-class DoFnIterator<S, T> implements Iterator<T> {
-
-  private final Iterator<S> iter;
-  private final DoFn<S, T> fn;
-  private CacheEmitter<T> cache;
-  private boolean cleanup;
-
-  public DoFnIterator(Iterator<S> iter, DoFn<S, T> fn) {
-    this.iter = iter;
-    this.fn = fn;
-    this.cache = new CacheEmitter<T>();
-    this.cleanup = false;
-  }
-
-  @Override
-  public boolean hasNext() {
-    while (cache.isEmpty() && iter.hasNext()) {
-      fn.process(iter.next(), cache);
-    }
-    if (cache.isEmpty() && !cleanup) {
-      fn.cleanup(cache);
-      cleanup = true;
-    }
-    return !cache.isEmpty();
-  }
-
-  @Override
-  public T next() {
-    return cache.poll();
-  }
-
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
-  }
-
-  private static class CacheEmitter<T> implements Emitter<T> {
-
-    private final LinkedList<T> cache;
-
-    private CacheEmitter() {
-      this.cache = Lists.newLinkedList();
-    }
-
-    public boolean isEmpty() {
-      return cache.isEmpty();
-    }
-
-    public T poll() {
-      return cache.poll();
-    }
-
-    @Override
-    public void emit(T emitted) {
-      cache.add(emitted);
-    }
-
-    @Override
-    public void flush() {
-      // No-op
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTable.java
new file mode 100644
index 0000000..3f1803f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTable.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.impl.dist.collect.BaseDoTable;
+import org.apache.crunch.impl.dist.collect.MRCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PTableType;
+
+/**
+ * MapReduce implementation of {@link BaseDoTable}. It produces {@link DoNode}s for the planner:
+ * one for its main {@link DoFn} and, when a {@link CombineFn} is present, a separate combine
+ * node.
+ */
+public class DoTable<K, V> extends BaseDoTable<K, V> implements MRCollection {
+
+  /**
+   * Creates a table produced by applying {@code fn} to {@code parent}.
+   */
+  <S> DoTable(String name, PCollectionImpl<S> parent,
+              DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype, ParallelDoOptions options) {
+    super(name, parent, fn, ntype, options);
+  }
+
+  /**
+   * Creates a table that applies {@code fn} to {@code parent} and also carries
+   * {@code combineFn} for use in a separate combine node.
+   */
+  <S> DoTable(String name, PCollectionImpl<S> parent, CombineFn<K, V> combineFn,
+              DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
+    super(name, parent, combineFn, fn, ntype);
+  }
+
+  /** Builds the plan node that runs this table's main {@link DoFn}. */
+  @Override
+  public DoNode createDoNode() {
+    return DoNode.createFnNode(getName(), fn, type, doOptions);
+  }
+
+  /**
+   * Builds the plan node that runs this table's combine function.
+   *
+   * @throws IllegalStateException if there is no combine function; check {@link #hasCombineFn()}
+   */
+  public DoNode createCombineNode() {
+    if (!hasCombineFn()) {
+      // Fail fast here instead of handing a null function to the planner.
+      throw new IllegalStateException("DoTable " + getName() + " has no CombineFn");
+    }
+    return DoNode.createFnNode(getName(), combineFn, type, doOptions);
+  }
+
+  /** Returns true if this table carries a non-null combine function. */
+  public boolean hasCombineFn() {
+    return combineFn != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
deleted file mode 100644
index f843945..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.List;
-
-import org.apache.crunch.CombineFn;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
-
-public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> {
-
-  private final PCollectionImpl<?> parent;
-  private final DoFn<?, Pair<K, V>> combineFn;
-  private final DoFn<?, Pair<K, V>> fn;
-  private final PTableType<K, V> type;
-
-  private static <S, K, V> DoFn<S, Pair<K, V>> asCombineFn(final DoFn<S, Pair<K, V>> fn) {
-    if (fn instanceof CombineFn) {
-      return fn;
-    }
-    return null;
-  }
-
-  <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
-    this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
-  }
-
-  <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
-                  ParallelDoOptions options) {
-    this(name, parent, asCombineFn(fn), fn, ntype, options);
-  }
-
-  <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
-                  final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype) {
-    this(name, parent, combineFn, fn, ntype, ParallelDoOptions.builder().build());
-  }
-
-  <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
-                  final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype, final ParallelDoOptions options) {
-    super(name, options);
-    this.parent = parent;
-    this.combineFn = combineFn;
-    this.fn = fn;
-    this.type = ntype;
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return (long) (fn.scaleFactor() * parent.getSize());
-  }
-
-  @Override
-  public PTableType<K, V> getPTableType() {
-    return type;
-  }
-
-  @Override
-  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
-    if (getOnlyParent() instanceof PGroupedTableImpl) {
-      return materializedData();
-    }
-    return new DelegatingReadableData(getOnlyParent().asReadable(false), fn);
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitDoTable(this);
-  }
-
-  @Override
-  public PType<Pair<K, V>> getPType() {
-    return type;
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> of(parent);
-  }
-
-  @Override
-  public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(), fn, type, doOptions);
-  }
-
-  public DoNode createCombineNode() {
-    return DoNode.createFnNode(getName(), combineFn, type, doOptions);
-  }
-  
-  public boolean hasCombineFn() {
-    return combineFn != null;
-  }
-  
-  @Override
-  public long getLastModifiedAt() {
-    return parent.getLastModifiedAt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
index c801e7f..42d9df2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -17,27 +17,18 @@
  */
 package org.apache.crunch.impl.mr.collect;
 
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.crunch.Pair;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.Source;
+import org.apache.crunch.impl.dist.collect.BaseInputCollection;
+import org.apache.crunch.impl.dist.collect.MRCollection;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
 
-public class InputCollection<S> extends PCollectionImpl<S> {
-
-  private final Source<S> source;
+public class InputCollection<S> extends BaseInputCollection<S> implements MRCollection {
 
   public InputCollection(Source<S> source, MRPipeline pipeline) {
-    super(source.toString());
-    this.source = source;
-    this.pipeline = pipeline;
+    super(source, pipeline);
   }
 
   @Override
@@ -50,53 +41,7 @@ public class InputCollection<S> extends PCollectionImpl<S> {
   }
 
   @Override
-  public PType<S> getPType() {
-    return source.getType();
-  }
-
-  public Source<S> getSource() {
-    return source;
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    long sz = source.getSize(pipeline.getConfiguration());
-    if (sz < 0) {
-      throw new IllegalStateException("Input source " + source + " does not exist!");
-    }
-    return sz;
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitInputCollection(this);
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.of();
-  }
-
-  @Override
   public DoNode createDoNode() {
     return DoNode.createInputNode(source);
   }
-
-  @Override
-  public long getLastModifiedAt() {
-    return source.getLastModifiedAt(pipeline.getConfiguration());
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null || !(obj instanceof InputCollection)) {
-      return false;
-    }
-    return source.equals(((InputCollection) obj).source);
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(source).toHashCode();
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
index bdd5c64..fb550fa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -17,82 +17,25 @@
 */
 package org.apache.crunch.impl.mr.collect;
 
-import java.util.List;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.ReadableData;
 import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.dist.collect.BaseInputTable;
+import org.apache.crunch.impl.dist.collect.MRCollection;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
 
-public class InputTable<K, V> extends PTableBase<K, V> {
-
-  private final TableSource<K, V> source;
-  private final InputCollection<Pair<K, V>> asCollection;
+/**
+ * MapReduce {@link BaseInputTable} whose data comes from a {@link TableSource}.
+ */
+public class InputTable<K, V> extends BaseInputTable<K, V> implements MRCollection {
 
+  /** Creates an input table for {@code source} within the given MapReduce pipeline. */
   public InputTable(TableSource<K, V> source, MRPipeline pipeline) {
-    super(source.toString());
-    this.source = source;
-    this.pipeline = pipeline;
-    this.asCollection = new InputCollection<Pair<K, V>>(source, pipeline);
-  }
-
-  public TableSource<K, V> getSource() {
-    return source;
-  }
-  
-  @Override
-  protected long getSizeInternal() {
-    return asCollection.getSizeInternal();
-  }
-
-  @Override
-  public PTableType<K, V> getPTableType() {
-    return source.getTableType();
-  }
-
-
-  @Override
-  public PType<Pair<K, V>> getPType() {
-    return source.getType();
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.of();
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitInputCollection(asCollection);
-  }
-
-  @Override
-  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
-    return asCollection.getReadableDataInternal();
+    super(source, pipeline);
   }
 
+  /** Builds the plan node that reads from this table's source. */
   @Override
   public DoNode createDoNode() {
     return DoNode.createInputNode(source);
   }
-
-  @Override
-  public long getLastModifiedAt() {
-    return source.getLastModifiedAt(pipeline.getConfiguration());
-  }
-  
-  @Override
-  public int hashCode() {
-    return asCollection.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    return asCollection.equals(other);
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
new file mode 100644
index 0000000..1e94c53
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/MRCollectionFactory.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.dist.collect.BaseDoCollection;
+import org.apache.crunch.impl.dist.collect.BaseDoTable;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.BaseInputCollection;
+import org.apache.crunch.impl.dist.collect.BaseInputTable;
+import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionFactory;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PTableBase;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.collect.DoCollection;
+import org.apache.crunch.impl.mr.collect.DoTable;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.InputTable;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.collect.UnionCollection;
+import org.apache.crunch.impl.mr.collect.UnionTable;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+/**
+ * {@link PCollectionFactory} that builds the MapReduce-specific collection classes
+ * ({@link InputCollection}, {@link DoCollection}, {@link DoTable}, etc.) for {@link MRPipeline}.
+ */
+public class MRCollectionFactory implements PCollectionFactory {
+
+  /** Wraps {@code source} in an {@link InputCollection} bound to the given MapReduce pipeline. */
+  @Override
+  public <S> BaseInputCollection<S> createInputCollection(Source<S> source, DistributedPipeline pipeline) {
+    return new InputCollection<S>(source, asMRPipeline(pipeline));
+  }
+
+  /** Wraps {@code source} in an {@link InputTable} bound to the given MapReduce pipeline. */
+  @Override
+  public <K, V> BaseInputTable<K, V> createInputTable(TableSource<K, V> source, DistributedPipeline pipeline) {
+    return new InputTable<K, V>(source, asMRPipeline(pipeline));
+  }
+
+  /** Creates a {@link UnionCollection} over the given collections. */
+  @Override
+  public <S> BaseUnionCollection<S> createUnionCollection(List<? extends PCollectionImpl<S>> internal) {
+    return new UnionCollection<S>(internal);
+  }
+
+  /** Creates a {@link DoCollection} that applies {@code fn} to {@code parent}. */
+  @Override
+  public <S, T> BaseDoCollection<T> createDoCollection(
+      String name,
+      PCollectionImpl<S> parent,
+      DoFn<S, T> fn,
+      PType<T> type,
+      ParallelDoOptions options) {
+    return new DoCollection<T>(name, parent, fn, type, options);
+  }
+
+  /** Creates a {@link DoTable} that applies {@code fn} to {@code parent}. */
+  @Override
+  public <S, K, V> BaseDoTable<K, V> createDoTable(
+      String name,
+      PCollectionImpl<S> parent,
+      DoFn<S, Pair<K, V>> fn,
+      PTableType<K, V> type,
+      ParallelDoOptions options) {
+    return new DoTable<K, V>(name, parent, fn, type, options);
+  }
+
+  /**
+   * Creates a {@link DoTable} that applies {@code reduceFn} to {@code parent} and also carries
+   * {@code combineFn}.
+   */
+  @Override
+  public <S, K, V> BaseDoTable<K, V> createDoTable(
+      String name,
+      PCollectionImpl<S> parent,
+      CombineFn<K, V> combineFn,
+      DoFn<S, Pair<K, V>> reduceFn,
+      PTableType<K, V> type) {
+    return new DoTable<K, V>(name, parent, combineFn, reduceFn, type);
+  }
+
+  /** Creates a {@link PGroupedTableImpl} that groups {@code parent} using {@code groupingOptions}. */
+  @Override
+  public <K, V> BaseGroupedTable<K, V> createGroupedTable(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
+    return new PGroupedTableImpl<K, V>(parent, groupingOptions);
+  }
+
+  /** Creates a {@link UnionTable} over the given tables. */
+  @Override
+  public <K, V> PTable<K, V> createUnionTable(List<PTableBase<K, V>> internal) {
+    return new UnionTable<K, V>(internal);
+  }
+
+  /**
+   * Narrows {@code pipeline} to {@link MRPipeline}, failing with a descriptive message instead of
+   * a bare {@link ClassCastException} when this factory is paired with another pipeline type.
+   */
+  private static MRPipeline asMRPipeline(DistributedPipeline pipeline) {
+    if (!(pipeline instanceof MRPipeline)) {
+      throw new IllegalArgumentException("MRCollectionFactory requires an MRPipeline, got: "
+          + (pipeline == null ? "null" : pipeline.getClass().getName()));
+    }
+    return (MRPipeline) pipeline;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
deleted file mode 100644
index 191b11e..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.Target;
-import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.materialize.pobject.CollectionPObject;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public abstract class PCollectionImpl<S> implements PCollection<S> {
-
-  private static final Log LOG = LogFactory.getLog(PCollectionImpl.class);
-
-  private final String name;
-  protected MRPipeline pipeline;
-  private boolean materialized;
-  protected SourceTarget<S> materializedAt;
-  protected final ParallelDoOptions doOptions;
-  private long size = -1L;
-  private boolean breakpoint;
-
-  public PCollectionImpl(String name) {
-    this(name, ParallelDoOptions.builder().build());
-  }
-  
-  public PCollectionImpl(String name, ParallelDoOptions doOptions) {
-    this.name = name;
-    this.doOptions = doOptions;
-  }
-
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  @Override
-  public String toString() {
-    return getName();
-  }
-
-  @Override
-  public PCollection<S> union(PCollection<S> other) {
-    return union(new PCollection[] { other });
-  }
-  
-  @Override
-  public PCollection<S> union(PCollection<S>... collections) {
-    List<PCollectionImpl<S>> internal = Lists.newArrayList();
-    internal.add(this);
-    for (PCollection<S> collection : collections) {
-      internal.add((PCollectionImpl<S>) collection.parallelDo(IdentityFn.<S>getInstance(), collection.getPType()));
-    }
-    return new UnionCollection<S>(internal);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(DoFn<S, T> fn, PType<T> type) {
-    MRPipeline pipeline = (MRPipeline) getPipeline();
-    return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
-    return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type);
-  }
-  
-  @Override
-  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type,
-      ParallelDoOptions options) {
-    return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type, options);
-  }
-  
-  @Override
-  public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    MRPipeline pipeline = (MRPipeline) getPipeline();
-    return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type,
-      ParallelDoOptions options) {
-    return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type, options);
-  }
-
-  public PCollection<S> write(Target target) {
-    if (materializedAt != null) {
-      getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target);
-    } else {
-      getPipeline().write(this, target);
-    }
-    return this;
-  }
-
-  @Override
-  public PCollection<S> write(Target target, Target.WriteMode writeMode) {
-    if (materializedAt != null) {
-      getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target,
-          writeMode);
-    } else {
-      getPipeline().write(this, target, writeMode);
-    }
-    return this;
-  }
-  
-  @Override
-  public Iterable<S> materialize() {
-    if (getSize() == 0) {
-      LOG.warn("Materializing an empty PCollection: " + this.getName());
-      return Collections.emptyList();
-    }
-    materialized = true;
-    return getPipeline().materialize(this);
-  }
-
-  public void setBreakpoint() {
-    this.breakpoint = true;
-  }
-
-  public boolean isBreakpoint() {
-    return breakpoint;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public PObject<Collection<S>> asCollection() {
-    return new CollectionPObject<S>(this);
-  }
-
-  public SourceTarget<S> getMaterializedAt() {
-    return materializedAt;
-  }
-
-  public void materializeAt(SourceTarget<S> sourceTarget) {
-    this.materializedAt = sourceTarget;
-    this.size = materializedAt.getSize(getPipeline().getConfiguration());
-  }
-
-  @Override
-  public PCollection<S> filter(FilterFn<S> filterFn) {
-    return parallelDo(filterFn, getPType());
-  }
-
-  @Override
-  public PCollection<S> filter(String name, FilterFn<S> filterFn) {
-    return parallelDo(name, filterFn, getPType());
-  }
-
-  @Override
-  public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
-  public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
-  public PTable<S, Long> count() {
-    return Aggregate.count(this);
-  }
-
-  @Override
-  public PObject<Long> length() {
-    return Aggregate.length(this);
-  }
-
-  @Override
-  public PObject<S> max() {
-    return Aggregate.max(this);
-  }
-
-  @Override
-  public PObject<S> min() {
-    return Aggregate.min(this);
-  }
-
-  @Override
-  public PTypeFamily getTypeFamily() {
-    return getPType().getFamily();
-  }
-
-  public abstract DoNode createDoNode();
-
-  public abstract List<PCollectionImpl<?>> getParents();
-
-  public PCollectionImpl<?> getOnlyParent() {
-    List<PCollectionImpl<?>> parents = getParents();
-    if (parents.size() != 1) {
-      throw new IllegalArgumentException("Expected exactly one parent PCollection");
-    }
-    return parents.get(0);
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    if (pipeline == null) {
-      pipeline = (MRPipeline) getParents().get(0).getPipeline();
-    }
-    return pipeline;
-  }
-
-  public ParallelDoOptions getParallelDoOptions() {
-    return doOptions;
-  }
-
-  public Set<SourceTarget<?>> getTargetDependencies() {
-    Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets();
-    for (PCollectionImpl<?> parent : getParents()) {
-      targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
-    }
-    return targetDeps;
-  }
-  
-  public int getDepth() {
-    int parentMax = 0;
-    for (PCollectionImpl parent : getParents()) {
-      parentMax = Math.max(parent.getDepth(), parentMax);
-    }
-    return 1 + parentMax;
-  }
-
-  public interface Visitor {
-    void visitInputCollection(InputCollection<?> collection);
-
-    void visitUnionCollection(UnionCollection<?> collection);
-
-    void visitDoFnCollection(DoCollectionImpl<?> collection);
-
-    void visitDoTable(DoTableImpl<?, ?> collection);
-
-    void visitGroupedTable(PGroupedTableImpl<?, ?> collection);
-  }
-
-  public void accept(Visitor visitor) {
-    if (materializedAt != null) {
-      visitor.visitInputCollection(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()));
-    } else {
-      acceptInternal(visitor);
-    }
-  }
-
-  protected abstract void acceptInternal(Visitor visitor);
-
-  @Override
-  public ReadableData<S> asReadable(boolean materialize) {
-    if (materializedAt != null && (materializedAt instanceof ReadableSource)) {
-      return ((ReadableSource) materializedAt).asReadable();
-    } else if (materialized || materialize) {
-      return ((MRPipeline) getPipeline()).getMaterializeSourceTarget(this).asReadable();
-    } else {
-      return getReadableDataInternal();
-    }
-  }
-
-  protected ReadableData<S> materializedData() {
-    materialized = true;
-    return pipeline.getMaterializeSourceTarget(this).asReadable();
-  }
-
-  protected abstract ReadableData<S> getReadableDataInternal();
-
-  @Override
-  public long getSize() {
-    if (size < 0) {
-      this.size = getSizeInternal();
-    }
-    return size;
-  }
-
-  protected abstract long getSizeInternal();
-  
-  public abstract long getLastModifiedAt();
-  
-  /**
-   * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline.
-   * @return The PCollectionImpl instance to be chained
-   */
-  protected PCollectionImpl<S> getChainingCollection(){
-    return this;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index ab5f48c..23578ee 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -17,51 +17,22 @@
  */
 package org.apache.crunch.impl.mr.collect;
 
-import java.util.List;
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.Aggregator;
-import org.apache.crunch.CombineFn;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.MRCollection;
+import org.apache.crunch.impl.dist.collect.PTableBase;
 import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PType;
 import org.apache.crunch.util.PartitionUtils;
 import org.apache.hadoop.mapreduce.Job;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
-public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
+public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements MRCollection {
 
   private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);
 
-  private final PTableBase<K, V> parent;
-  private final GroupingOptions groupingOptions;
-  private final PGroupedTableType<K, V> ptype;
-  
-  PGroupedTableImpl(PTableBase<K, V> parent) {
-    this(parent, null);
-  }
-
   PGroupedTableImpl(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
-    super("GBK");
-    this.parent = parent;
-    this.groupingOptions = groupingOptions;
-    this.ptype = parent.getPTableType().getGroupedTableType();
+    super(parent, groupingOptions);
   }
 
   public void configureShuffle(Job job) {
@@ -78,89 +49,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   }
 
   @Override
-  protected ReadableData<Pair<K, Iterable<V>>> getReadableDataInternal() {
-    throw new UnsupportedOperationException("PGroupedTable does not currently support readability");
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return parent.getSizeInternal();
-  }
-
-  @Override
-  public PType<Pair<K, Iterable<V>>> getPType() {
-    return ptype;
-  }
-  
-  @Override
-  public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
-      return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, reduceFn, parent.getPTableType());
-  }
-  
-  @Override
-  public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
-    return combineValues(combineFn, combineFn);
-  }
-
-  @Override
-  public PTable<K, V> combineValues(Aggregator<V> agg) {
-    return combineValues(Aggregators.<K, V>toCombineFn(agg));
-  }
-
-  @Override
-  public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) {
-    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
-  }
-
-  private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
-    @Override
-    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
-      for (V v : input.second()) {
-        emitter.emit(Pair.of(input.first(), v));
-      }
-    }
-  }
-
-  @Override
-  public PTable<K, V> ungroup() {
-    return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType());
-  }
-
-  @Override
-  public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, mapFn, ptype);
-  }
-  
-  @Override
-  public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(name, this, mapFn, ptype);
-  }
-  
-  @Override
-  public PGroupedTableType<K, V> getGroupedTableType() {
-    return ptype;
-  }
-  
-  @Override
-  protected void acceptInternal(Visitor visitor) {
+  public void accept(Visitor visitor) {
     visitor.visitGroupedTable(this);
   }
 
   @Override
-  public Set<SourceTarget<?>> getTargetDependencies() {
-    Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies());
-    if (groupingOptions != null) {
-      td.addAll(groupingOptions.getSourceTargets());
-    }
-    return ImmutableSet.copyOf(td);
-  }
-  
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> of(parent);
-  }
-
-  @Override
   public DoNode createDoNode() {
     return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype, doOptions);
   }
@@ -168,16 +61,4 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   public DoNode getGroupingNode() {
     return DoNode.createGroupingNode("", ptype);
   }
-  
-  @Override
-  public long getLastModifiedAt() {
-    return parent.getLastModifiedAt();
-  }
-  
-  @Override
-  protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() {
-    // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs
-    // TODO This should be implemented in a cleaner way in the planner
-    return new PGroupedTableImpl<K, V>(parent, groupingOptions);
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
deleted file mode 100644
index c477fad..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PObject;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Cogroup;
-import org.apache.crunch.lib.Join;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.materialize.MaterializableMap;
-import org.apache.crunch.materialize.pobject.MapPObject;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.Lists;
-
-abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements PTable<K, V> {
-
-  public PTableBase(String name) {
-    super(name);
-  }
-
-  public PTableBase(String name, ParallelDoOptions options) {
-    super(name, options);
-  }
-  
-  public PType<K> getKeyType() {
-    return getPTableType().getKeyType();
-  }
-
-  public PType<V> getValueType() {
-    return getPTableType().getValueType();
-  }
-
-  public PGroupedTableImpl<K, V> groupByKey() {
-    return new PGroupedTableImpl<K, V>(this);
-  }
-
-  public PGroupedTableImpl<K, V> groupByKey(int numReduceTasks) {
-    return new PGroupedTableImpl<K, V>(this, GroupingOptions.builder().numReducers(numReduceTasks).build());
-  }
-
-  public PGroupedTableImpl<K, V> groupByKey(GroupingOptions groupingOptions) {
-    return new PGroupedTableImpl<K, V>(this, groupingOptions);
-  }
-
-  @Override
-  public PTable<K, V> union(PTable<K, V> other) {
-    return union(new PTable[] { other });
-  }
-  
-  @Override
-  public PTable<K, V> union(PTable<K, V>... others) {
-    List<PTableBase<K, V>> internal = Lists.newArrayList();
-    internal.add(this);
-    for (PTable<K, V> table : others) {
-      internal.add((PTableBase<K, V>) table);
-    }
-    return new UnionTable<K, V>(internal);
-  }
-
-  @Override
-  public PTable<K, V> write(Target target) {
-    if (getMaterializedAt() != null) {
-      getPipeline().write(new InputTable<K, V>(
-          (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target);
-    } else {
-      getPipeline().write(this, target);
-    }
-    return this;
-  }
-
-  @Override
-  public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
-    if (getMaterializedAt() != null) {
-      getPipeline().write(new InputTable<K, V>(
-          (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target, writeMode);
-    } else {
-      getPipeline().write(this, target, writeMode);
-    }
-    return this;
-  }
-  
-  @Override
-  public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(filterFn, getPTableType());
-  }
-  
-  @Override
-  public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(name, filterFn, getPTableType());
-  }
-  
-  @Override
-  public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, mapFn, ptype);
-  }
-  
-  @Override
-  public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(name, this, mapFn, ptype);
-  }
-  
-  @Override
-  public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
-    return PTables.mapKeys(this, mapFn, ptype);
-  }
-  
-  @Override
-  public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
-    return PTables.mapKeys(name, this, mapFn, ptype);
-  }
-  
-  @Override
-  public PTable<K, V> top(int count) {
-    return Aggregate.top(this, count, true);
-  }
-
-  @Override
-  public PTable<K, V> bottom(int count) {
-    return Aggregate.top(this, count, false);
-  }
-
-  @Override
-  public PTable<K, Collection<V>> collectValues() {
-    return Aggregate.collectValues(this);
-  }
-
-  @Override
-  public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
-    return Join.join(this, other);
-  }
-
-  @Override
-  public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
-    return Cogroup.cogroup(this, other);
-  }
-
-  @Override
-  public PCollection<K> keys() {
-    return PTables.keys(this);
-  }
-
-  @Override
-  public PCollection<V> values() {
-    return PTables.values(this);
-  }
-
-  /**
-   * Returns a Map<K, V> made up of the keys and values in this PTable.
-   */
-  @Override
-  public Map<K, V> materializeToMap() {
-    return new MaterializableMap<K, V>(this.materialize());
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public PObject<Map<K, V>> asMap() {
-    return new MapPObject<K, V>(this);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
index e6c95bb..9dab5df 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
@@ -17,97 +17,13 @@
  */
 package org.apache.crunch.impl.mr.collect;
 
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
-
-public class UnionCollection<S> extends PCollectionImpl<S> {
-
-  private List<PCollectionImpl<S>> parents;
-  private long size = 0;
-  private long lastModifiedAt = -1;
-  
-  private static <S> String flatName(List<PCollectionImpl<S>> collections) {
-    StringBuilder sb = new StringBuilder("union(");
-    for (int i = 0; i < collections.size(); i++) {
-      if (i != 0) {
-        sb.append(',');
-      }
-      sb.append(collections.get(i).getName());
-    }
-    return sb.append(')').toString();
-  }
+import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 
-  UnionCollection(List<PCollectionImpl<S>> collections) {
-    super(flatName(collections));
-    this.parents = ImmutableList.copyOf(collections);
-    this.pipeline = (MRPipeline) parents.get(0).getPipeline();
-    for (PCollectionImpl<S> parent : parents) {
-      if (this.pipeline != parent.getPipeline()) {
-        throw new IllegalStateException("Cannot union PCollections from different Pipeline instances");
-      }
-      size += parent.getSize();
-      if (parent.getLastModifiedAt() > lastModifiedAt) {
-        this.lastModifiedAt = parent.getLastModifiedAt();
-      }
-    }
-  }
-
-  @Override
-  public void setBreakpoint() {
-    super.setBreakpoint();
-    for (PCollectionImpl<S> parent : parents) {
-      parent.setBreakpoint();
-    }
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return size;
-  }
-
-  @Override
-  public long getLastModifiedAt() {
-    return lastModifiedAt;
-  }
-  
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitUnionCollection(this);
-  }
-
-  @Override
-  protected ReadableData<S> getReadableDataInternal() {
-    List<ReadableData<S>> prds = Lists.newArrayList();
-    for (PCollectionImpl<S> parent : parents) {
-      if (parent instanceof PGroupedTableImpl) {
-        return materializedData();
-      } else {
-        prds.add(parent.asReadable(false));
-      }
-    }
-    return new UnionReadableData<S>(prds);
-  }
-
-  @Override
-  public PType<S> getPType() {
-    return parents.get(0).getPType();
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
-  }
+import java.util.List;
 
-  @Override
-  public DoNode createDoNode() {
-    throw new UnsupportedOperationException("Unioned collection does not support DoNodes");
+public class UnionCollection<S> extends BaseUnionCollection<S> {
+  UnionCollection(List<? extends PCollectionImpl<S>> collections) {
+    super(collections);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java
deleted file mode 100644
index 58a10c3..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionReadableData.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-class UnionReadableData<T> implements ReadableData<T> {
-
-  private final List<ReadableData<T>> data;
-
-  public UnionReadableData(List<ReadableData<T>> data) {
-    this.data = data;
-  }
-
-  @Override
-  public Set<SourceTarget<?>> getSourceTargets() {
-    Set<SourceTarget<?>> srcTargets = Sets.newHashSet();
-    for (ReadableData<T> rd: data) {
-      srcTargets.addAll(rd.getSourceTargets());
-    }
-    return srcTargets;
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-   for (ReadableData<T> rd : data) {
-     rd.configure(conf);
-   }
-  }
-
-  @Override
-  public Iterable<T> read(final TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
-    List<Iterable<T>> iterables = Lists.newArrayList();
-    for (ReadableData<T> rd : data) {
-      iterables.add(rd.read(context));
-    }
-    return Iterables.concat(iterables);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
index b4144e4..2a3ad28 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
@@ -17,107 +17,13 @@
  */
 package org.apache.crunch.impl.mr.collect;
 
-import java.util.List;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-public class UnionTable<K, V> extends PTableBase<K, V> {
-
-  private PTableType<K, V> ptype;
-  private List<PCollectionImpl<Pair<K, V>>> parents;
-  private long size;
-  private long lastModifiedAt = -1;
-  
-  private static <K, V> String flatName(List<PTableBase<K, V>> tables) {
-    StringBuilder sb = new StringBuilder("union(");
-    for (int i = 0; i < tables.size(); i++) {
-      if (i != 0) {
-        sb.append(',');
-      }
-      sb.append(tables.get(i).getName());
-    }
-    return sb.append(')').toString();
-  }
-
-  public UnionTable(List<PTableBase<K, V>> tables) {
-    super(flatName(tables));
-    this.ptype = tables.get(0).getPTableType();
-    this.pipeline = (MRPipeline) tables.get(0).getPipeline();
-    this.parents = Lists.newArrayList();
-    for (PTableBase<K, V> parent : tables) {
-      if (pipeline != parent.getPipeline()) {
-        throw new IllegalStateException("Cannot union PTables from different Pipeline instances");
-      }
-      this.parents.add(parent);
-      size += parent.getSize();
-      if (parent.getLastModifiedAt() > lastModifiedAt) {
-        this.lastModifiedAt = parent.getLastModifiedAt();
-      }
-    }
-  }
-
-  @Override
-  protected long getSizeInternal() {
-    return size;
-  }
-
-  @Override
-  public long getLastModifiedAt() {
-    return lastModifiedAt;
-  }
-  
-  @Override
-  public PTableType<K, V> getPTableType() {
-    return ptype;
-  }
-
-  @Override
-  public PType<Pair<K, V>> getPType() {
-    return ptype;
-  }
-
-  @Override
-  public List<PCollectionImpl<?>> getParents() {
-    return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
-  }
-
-  @Override
-  public void setBreakpoint() {
-    super.setBreakpoint();
-    for (PCollectionImpl<Pair<K, V>> parent : parents) {
-      parent.setBreakpoint();
-    }
-  }
-
-  @Override
-  protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(parents));
-  }
+import org.apache.crunch.impl.dist.collect.BaseUnionTable;
+import org.apache.crunch.impl.dist.collect.PTableBase;
 
-  @Override
-  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
-    List<ReadableData<Pair<K, V>>> prds = Lists.newArrayList();
-    for (PCollectionImpl<Pair<K, V>> parent : parents) {
-      if (parent instanceof PGroupedTableImpl) {
-        return materializedData();
-      } else {
-        prds.add(parent.asReadable(false));
-      }
-    }
-    return new UnionReadableData<Pair<K, V>>(prds);
-  }
+import java.util.List;
 
-  @Override
-  public DoNode createDoNode() {
-    throw new UnsupportedOperationException("Unioned table does not support do nodes");
+public class UnionTable<K, V> extends BaseUnionTable<K, V> {
+  UnionTable(List<PTableBase<K, V>> internal) {
+    super(internal);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 38344a2..ce6fffa 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -27,9 +27,9 @@ import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.MRPipelineExecution;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
index dc4a9c2..4d88296 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
@@ -24,8 +24,8 @@ import java.util.Set;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 
 import com.google.common.base.Joiner;


Mime
View raw message