crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [9/9] git commit: Generalizing Crunch's Collection APIs to support more execution frameworks
Date Wed, 11 Dec 2013 20:47:55 GMT
Generalizing Crunch's Collection APIs to support more execution frameworks


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a691b835
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a691b835
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a691b835

Branch: refs/heads/master
Commit: a691b835b71c3ffaeade03bf02f5e4c0d7961557
Parents: 11e9b53
Author: Josh Wills <jwills@apache.org>
Authored: Sun Nov 17 09:13:21 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Wed Dec 11 12:28:10 2013 -0800

----------------------------------------------------------------------
 .../impl/dist/collect/UnionCollectionIT.java    | 154 +++++++++
 .../impl/mr/collect/UnionCollectionIT.java      | 154 ---------
 .../java/org/apache/crunch/CachingOptions.java  | 126 +++++++
 .../java/org/apache/crunch/GroupingOptions.java |  23 +-
 .../java/org/apache/crunch/PCollection.java     |  17 +
 .../src/main/java/org/apache/crunch/PTable.java |   4 +
 .../src/main/java/org/apache/crunch/Pair.java   |   4 +-
 .../main/java/org/apache/crunch/Pipeline.java   |   9 +
 .../crunch/impl/dist/DistributedPipeline.java   | 342 +++++++++++++++++++
 .../impl/dist/collect/BaseDoCollection.java     |  79 +++++
 .../crunch/impl/dist/collect/BaseDoTable.java   | 112 ++++++
 .../impl/dist/collect/BaseGroupedTable.java     | 160 +++++++++
 .../impl/dist/collect/BaseInputCollection.java  |  93 +++++
 .../impl/dist/collect/BaseInputTable.java       |  90 +++++
 .../impl/dist/collect/BaseUnionCollection.java  | 105 ++++++
 .../impl/dist/collect/BaseUnionTable.java       | 115 +++++++
 .../crunch/impl/dist/collect/MRCollection.java  |  24 ++
 .../impl/dist/collect/PCollectionFactory.java   |  66 ++++
 .../impl/dist/collect/PCollectionImpl.java      | 332 ++++++++++++++++++
 .../crunch/impl/dist/collect/PTableBase.java    | 202 +++++++++++
 .../org/apache/crunch/impl/mem/MemPipeline.java |   6 +
 .../crunch/impl/mem/collect/MemCollection.java  |  13 +
 .../crunch/impl/mem/collect/MemTable.java       |  15 +-
 .../crunch/impl/mem/emit/InMemoryEmitter.java   |   2 +-
 .../org/apache/crunch/impl/mr/MRPipeline.java   | 292 +---------------
 .../impl/mr/collect/DelegatingReadableData.java |  67 ----
 .../crunch/impl/mr/collect/DoCollection.java    |  45 +++
 .../impl/mr/collect/DoCollectionImpl.java       |  85 -----
 .../crunch/impl/mr/collect/DoFnIterator.java    |  98 ------
 .../apache/crunch/impl/mr/collect/DoTable.java  |  54 +++
 .../crunch/impl/mr/collect/DoTableImpl.java     | 121 -------
 .../crunch/impl/mr/collect/InputCollection.java |  63 +---
 .../crunch/impl/mr/collect/InputTable.java      |  70 +---
 .../impl/mr/collect/MRCollectionFactory.java    | 106 ++++++
 .../crunch/impl/mr/collect/PCollectionImpl.java | 331 ------------------
 .../impl/mr/collect/PGroupedTableImpl.java      | 131 +------
 .../crunch/impl/mr/collect/PTableBase.java      | 190 -----------
 .../crunch/impl/mr/collect/UnionCollection.java |  96 +-----
 .../impl/mr/collect/UnionReadableData.java      |  64 ----
 .../crunch/impl/mr/collect/UnionTable.java      | 106 +-----
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |   2 +-
 .../crunch/impl/mr/plan/DotfileWriter.java      |   2 +-
 .../org/apache/crunch/impl/mr/plan/Edge.java    |  18 +-
 .../org/apache/crunch/impl/mr/plan/Graph.java   |   2 +-
 .../crunch/impl/mr/plan/GraphBuilder.java       |  22 +-
 .../crunch/impl/mr/plan/JobPrototype.java       |  15 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |   4 +-
 .../apache/crunch/impl/mr/plan/NodePath.java    |   4 +-
 .../org/apache/crunch/impl/mr/plan/Vertex.java  |  16 +-
 .../crunch/impl/mr/run/CrunchInputSplit.java    |  33 +-
 .../apache/crunch/io/impl/FileTargetImpl.java   |   6 +-
 .../org/apache/crunch/lib/SecondarySort.java    |   1 +
 .../main/java/org/apache/crunch/lib/Sort.java   |   2 +
 .../crunch/lib/join/DefaultJoinStrategy.java    |   1 +
 .../org/apache/crunch/lib/join/JoinUtils.java   |  13 +-
 .../crunch/util/DelegatingReadableData.java     |  67 ++++
 .../org/apache/crunch/util/DoFnIterator.java    |  98 ++++++
 .../apache/crunch/util/UnionReadableData.java   |  64 ++++
 .../impl/dist/collect/DoCollectionTest.java     | 117 +++++++
 .../impl/dist/collect/DoTableImplTest.java      |  88 +++++
 .../apache/crunch/impl/mr/MRPipelineTest.java   |   2 +-
 .../impl/mr/collect/DoCollectionImplTest.java   | 122 -------
 .../crunch/impl/mr/collect/DoTableImplTest.java |  86 -----
 .../crunch/impl/mr/plan/DotfileWriterTest.java  |   2 +-
 .../org/apache/crunch/io/hbase/HFileUtils.java  |   4 +-
 65 files changed, 2845 insertions(+), 2112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/it/java/org/apache/crunch/impl/dist/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/dist/collect/UnionCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/dist/collect/UnionCollectionIT.java
new file mode 100644
index 0000000..51b3dda
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/dist/collect/UnionCollectionIT.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTableKeyValueIT;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(value = Parameterized.class)
+public class UnionCollectionIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
+
+  private PTypeFamily typeFamily;
+  private Pipeline pipeline;
+  private PCollection<String> union;
+
+  private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
+
+  private Class pipelineClass;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws IOException {
+    String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
+    String inputFile2 = tmpDir.copyResourceFileName("set2.txt");
+    if (pipelineClass == null) {
+      pipeline = MemPipeline.getInstance();
+    } else {
+      pipeline = new MRPipeline(pipelineClass, tmpDir.getDefaultConfiguration());
+    }
+    PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1, typeFamily.strings()));
+    PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2, typeFamily.strings()));
+
+    LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : " + typeFamily.getClass().getSimpleName()
+        + "]  First: " + Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
+        + Lists.newArrayList(secondCollection.materialize().iterator()));
+
+    union = secondCollection.union(firstCollection);
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() throws IOException {
+    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), PTableKeyValueIT.class },
+        { WritableTypeFamily.getInstance(), null }, { AvroTypeFamily.getInstance(), PTableKeyValueIT.class },
+        { AvroTypeFamily.getInstance(), null } };
+    return Arrays.asList(data);
+  }
+
+  public UnionCollectionIT(PTypeFamily typeFamily, Class pipelineClass) {
+    this.typeFamily = typeFamily;
+    this.pipelineClass = pipelineClass;
+  }
+
+  @Test
+  public void unionMaterializeShouldNotThrowNPE() throws Exception {
+    checkMaterialized(union.materialize());
+    checkMaterialized(pipeline.materialize(union));
+  }
+
+  private void checkMaterialized(Iterable<String> materialized) {
+    List<String> materializedValues = Lists.newArrayList(materialized.iterator());
+    Collections.sort(materializedValues);
+    LOG.info("Materialized union: " + materializedValues);
+    assertEquals(EXPECTED, materializedValues);
+  }
+
+  @Test
+  public void unionWriteShouldNotThrowNPE() throws IOException {
+    String outputPath1 = tmpDir.getFileName("output1");
+    String outputPath2 = tmpDir.getFileName("output2");
+    String outputPath3 = tmpDir.getFileName("output3");
+
+    if (typeFamily == AvroTypeFamily.getInstance()) {
+      union.write(To.avroFile(outputPath1));
+      pipeline.write(union, To.avroFile(outputPath2));
+
+      pipeline.run();
+
+      checkFileContents(outputPath1);
+      checkFileContents(outputPath2);
+
+    } else {
+
+      union.write(To.textFile(outputPath1));
+      pipeline.write(union, To.textFile(outputPath2));
+      pipeline.writeTextFile(union, outputPath3);
+
+      pipeline.run();
+
+      checkFileContents(outputPath1);
+      checkFileContents(outputPath2);
+      checkFileContents(outputPath3);
+    }
+  }
+
+  private void checkFileContents(String filePath) throws IOException {
+
+    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance())? Lists
+        .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists
+        .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator());
+
+    Collections.sort(fileContentValues);
+
+    LOG.info("Saved Union: " + fileContentValues);
+    assertEquals(EXPECTED, fileContentValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
deleted file mode 100644
index 2832437..0000000
--- a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTableKeyValueIT;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.To;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class UnionCollectionIT {
-  @Rule
-  public TemporaryPath tmpDir = TemporaryPaths.create();
-
-  private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
-
-  private PTypeFamily typeFamily;
-  private Pipeline pipeline;
-  private PCollection<String> union;
-
-  private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
-
-  private Class pipelineClass;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void setUp() throws IOException {
-    String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
-    String inputFile2 = tmpDir.copyResourceFileName("set2.txt");
-    if (pipelineClass == null) {
-      pipeline = MemPipeline.getInstance();
-    } else {
-      pipeline = new MRPipeline(pipelineClass, tmpDir.getDefaultConfiguration());
-    }
-    PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1, typeFamily.strings()));
-    PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2, typeFamily.strings()));
-
-    LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : " + typeFamily.getClass().getSimpleName()
-        + "]  First: " + Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
-        + Lists.newArrayList(secondCollection.materialize().iterator()));
-
-    union = secondCollection.union(firstCollection);
-  }
-
-  @Parameters
-  public static Collection<Object[]> data() throws IOException {
-    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), PTableKeyValueIT.class },
-        { WritableTypeFamily.getInstance(), null }, { AvroTypeFamily.getInstance(), PTableKeyValueIT.class },
-        { AvroTypeFamily.getInstance(), null } };
-    return Arrays.asList(data);
-  }
-
-  public UnionCollectionIT(PTypeFamily typeFamily, Class pipelineClass) {
-    this.typeFamily = typeFamily;
-    this.pipelineClass = pipelineClass;
-  }
-
-  @Test
-  public void unionMaterializeShouldNotThrowNPE() throws Exception {
-    checkMaterialized(union.materialize());
-    checkMaterialized(pipeline.materialize(union));
-  }
-
-  private void checkMaterialized(Iterable<String> materialized) {
-    List<String> materializedValues = Lists.newArrayList(materialized.iterator());
-    Collections.sort(materializedValues);
-    LOG.info("Materialized union: " + materializedValues);
-    assertEquals(EXPECTED, materializedValues);
-  }
-
-  @Test
-  public void unionWriteShouldNotThrowNPE() throws IOException {
-    String outputPath1 = tmpDir.getFileName("output1");
-    String outputPath2 = tmpDir.getFileName("output2");
-    String outputPath3 = tmpDir.getFileName("output3");
-
-    if (typeFamily == AvroTypeFamily.getInstance()) {
-      union.write(To.avroFile(outputPath1));
-      pipeline.write(union, To.avroFile(outputPath2));
-
-      pipeline.run();
-
-      checkFileContents(outputPath1);
-      checkFileContents(outputPath2);
-
-    } else {
-
-      union.write(To.textFile(outputPath1));
-      pipeline.write(union, To.textFile(outputPath2));
-      pipeline.writeTextFile(union, outputPath3);
-
-      pipeline.run();
-
-      checkFileContents(outputPath1);
-      checkFileContents(outputPath2);
-      checkFileContents(outputPath3);
-    }
-  }
-
-  private void checkFileContents(String filePath) throws IOException {
-
-    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance())? Lists
-        .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists
-        .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator());
-
-    Collections.sort(fileContentValues);
-
-    LOG.info("Saved Union: " + fileContentValues);
-    assertEquals(EXPECTED, fileContentValues);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/CachingOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/CachingOptions.java b/crunch-core/src/main/java/org/apache/crunch/CachingOptions.java
new file mode 100644
index 0000000..7cb8a52
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/CachingOptions.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Options for controlling how a {@code PCollection<T>} is cached for subsequent processing. Different pipeline
+ * execution frameworks may use some or all of these options when deciding how to cache a given {@code PCollection}
+ * depending on the implementation details of the framework.
+ */
+public class CachingOptions {
+
+  private final boolean useDisk;
+  private final boolean useMemory;
+  private final boolean deserialized;
+  private final int replicas;
+
+  private CachingOptions(
+      boolean useDisk,
+      boolean useMemory,
+      boolean deserialized,
+      int replicas) {
+    this.useDisk = useDisk;
+    this.useMemory = useMemory;
+    this.deserialized = deserialized;
+    this.replicas = replicas;
+  }
+
+  /**
+   * Whether the framework may cache data on disk.
+   */
+  public boolean useDisk() {
+    return useDisk;
+  }
+
+  /**
+   * Whether the framework may cache data in memory without writing it to disk.
+   */
+  public boolean useMemory() {
+    return useMemory;
+  }
+
+  /**
+   * Whether the data should remain deserialized in the cache, which saves the CPU time of
+   * repeated deserialization at the cost of additional storage overhead.
+   */
+  public boolean deserialized() {
+    return deserialized;
+  }
+
+  /**
+   * Returns the number of replicas of the data that should be maintained in the cache.
+   */
+  public int replicas() {
+    return replicas;
+  }
+
+  /**
+   * Creates a new {@link Builder} instance to use for specifying the caching options for a particular
+   * {@code PCollection<T>}.
+   * @return a new {@code Builder} initialized with the default caching settings
+   */
+  public static Builder builder() {
+    return new CachingOptions.Builder();
+  }
+
+  /**
+   * An instance of {@code CachingOptions} with the default caching settings.
+   */
+  public static final CachingOptions DEFAULT = CachingOptions.builder().build();
+
+  /**
+   * A Builder class to use for setting the {@code CachingOptions} for a {@link PCollection}. The default
+   * settings are to keep a single replica of the data deserialized in memory, without writing to disk
+   * unless it is required due to resource limitations.
+   */
+  public static class Builder {
+    private boolean useDisk = false;
+    private boolean useMemory = true;
+    private boolean deserialized = true;
+    private int replicas = 1;
+
+    public Builder() {}
+
+    public Builder useMemory(boolean useMemory) {
+      this.useMemory = useMemory;
+      return this;
+    }
+
+    public Builder useDisk(boolean useDisk) {
+      this.useDisk = useDisk;
+      return this;
+    }
+
+    public Builder deserialized(boolean deserialized) {
+      this.deserialized = deserialized;
+      return this;
+    }
+
+    public Builder replicas(int replicas) {
+      Preconditions.checkArgument(replicas > 0);
+      this.replicas = replicas;
+      return this;
+    }
+
+    public CachingOptions build() {
+      return new CachingOptions(useDisk, useMemory, deserialized, replicas);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
index baaecdc..59abe27 100644
--- a/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
+++ b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -35,21 +36,25 @@ import com.google.common.collect.Sets;
  * keys is performed.
  * 
  */
-public class GroupingOptions {
+public class GroupingOptions implements Serializable {
 
   private final Class<? extends Partitioner> partitionerClass;
   private final Class<? extends RawComparator> groupingComparatorClass;
   private final Class<? extends RawComparator> sortComparatorClass;
+  private final boolean requireSortedKeys;
   private final int numReducers;
   private final Map<String, String> extraConf;
-  private final Set<SourceTarget<?>> sourceTargets;
+  private transient Set<SourceTarget<?>> sourceTargets;
   
   private GroupingOptions(Class<? extends Partitioner> partitionerClass,
       Class<? extends RawComparator> groupingComparatorClass, Class<? extends RawComparator> sortComparatorClass,
-      int numReducers, Map<String, String> extraConf, Set<SourceTarget<?>> sourceTargets) {
+      boolean requireSortedKeys, int numReducers,
+      Map<String, String> extraConf,
+      Set<SourceTarget<?>> sourceTargets) {
     this.partitionerClass = partitionerClass;
     this.groupingComparatorClass = groupingComparatorClass;
     this.sortComparatorClass = sortComparatorClass;
+    this.requireSortedKeys = requireSortedKeys;
     this.numReducers = numReducers;
     this.extraConf = extraConf;
     this.sourceTargets = sourceTargets;
@@ -59,6 +64,10 @@ public class GroupingOptions {
     return numReducers;
   }
 
+  public boolean requireSortedKeys() {
+    return requireSortedKeys;
+  }
+
   public Class<? extends RawComparator> getSortComparatorClass() {
     return sortComparatorClass;
   }
@@ -121,6 +130,7 @@ public class GroupingOptions {
     private Class<? extends Partitioner> partitionerClass;
     private Class<? extends RawComparator> groupingComparatorClass;
     private Class<? extends RawComparator> sortComparatorClass;
+    private boolean requireSortedKeys;
     private int numReducers;
     private Map<String, String> extraConf = Maps.newHashMap();
     private Set<SourceTarget<?>> sourceTargets = Sets.newHashSet();
@@ -143,6 +153,11 @@ public class GroupingOptions {
       return this;
     }
 
+    public Builder requireSortedKeys() {
+      requireSortedKeys = true;
+      return this;
+    }
+
     public Builder numReducers(int numReducers) {
       if (numReducers <= 0) {
         throw new IllegalArgumentException("Invalid number of reducers: " + numReducers);
@@ -174,7 +189,7 @@ public class GroupingOptions {
 
     public GroupingOptions build() {
       return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass,
-          numReducers, extraConf, sourceTargets);
+          requireSortedKeys, numReducers, extraConf, sourceTargets);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index ee8052a..2d62d00 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -160,6 +160,23 @@ public interface PCollection<S> {
   Iterable<S> materialize();
 
   /**
+   * Marks this data as cached using the default {@link CachingOptions}. Cached {@code PCollection}s will only
+   * be processed once, and then their contents will be saved so that downstream code can process them many times.
+   *
+   * @return this {@code PCollection} instance
+   */
+  PCollection<S> cache();
+
+  /**
+   * Marks this data as cached using the given {@code CachingOptions}. Cached {@code PCollection}s will only
+   * be processed once and then their contents will be saved so that downstream code can process them many times.
+   *
+   * @param options the options that control the cache settings for the data
+   * @return this {@code PCollection} instance
+   */
+  PCollection<S> cache(CachingOptions options);
+
+  /**
    * @return A {@code PObject} encapsulating an in-memory {@link Collection} containing the values
    * of this {@code PCollection}.
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
index 738b3cb..09fe9db 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -80,6 +80,10 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
    */
   PTable<K, V> write(Target target, Target.WriteMode writeMode);
 
+  PTable<K, V> cache();
+
+  PTable<K, V> cache(CachingOptions options);
+
   /**
    * Returns the {@code PTableType} of this {@code PTable}.
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/Pair.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pair.java b/crunch-core/src/main/java/org/apache/crunch/Pair.java
index fd058b6..462434c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pair.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pair.java
@@ -19,10 +19,12 @@ package org.apache.crunch;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 
+import java.io.Serializable;
+
 /**
  * A convenience class for two-element {@link Tuple}s.
  */
-public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
+public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>>, Serializable {
 
   private final K first;
   private final V second;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index 503ca49..3b0bac2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -101,6 +101,15 @@ public interface Pipeline {
   <T> Iterable<T> materialize(PCollection<T> pcollection);
 
   /**
+   * Caches the given PCollection so that it will be processed at most once
+   * during pipeline execution.
+   *
+   * @param pcollection The PCollection to cache
+   * @param options The options for how the cached data is stored
+   */
+  <T> void cache(PCollection<T> pcollection, CachingOptions options);
+
+  /**
    * Constructs and executes a series of MapReduce jobs in order to write data
    * to the output targets.
    */

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
new file mode 100644
index 0000000..28dbaec
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.dist.collect.BaseInputCollection;
+import org.apache.crunch.impl.dist.collect.BaseInputTable;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
+import org.apache.crunch.impl.dist.collect.BaseUnionTable;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PCollectionFactory;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.To;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public abstract class DistributedPipeline implements Pipeline {
+  private static final Log LOG = LogFactory.getLog(DistributedPipeline.class);
+
+  private static final Random RANDOM = new Random();
+
+  private final String name;
+  protected final PCollectionFactory factory;
+  protected final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  protected final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
+  private Path tempDirectory;
+  private int tempFileIndex;
+  private int nextAnonymousStageId;
+
+  private Configuration conf;
+
+  /**
+   * Instantiate with a custom name and configuration.
+   *
+   * As a side effect, a temporary working directory is created on the file
+   * system resolved from the given configuration.
+   *
+   * @param name Display name of the pipeline
+   * @param conf Configuration to be used within all MapReduce jobs run in the pipeline
+   * @param factory Factory used to create the collections of this pipeline
+   *          (for example the input collections returned by the read methods)
+   */
+  public DistributedPipeline(String name, Configuration conf, PCollectionFactory factory) {
+    this.name = name;
+    this.factory = factory;
+    this.outputTargets = Maps.newHashMap();
+    this.outputTargetsToMaterialize = Maps.newHashMap();
+    this.conf = conf;
+    this.tempDirectory = createTempDirectory(conf);
+    this.tempFileIndex = 0;
+    this.nextAnonymousStageId = 0;
+  }
+
+  public PCollectionFactory getFactory() {
+    return factory;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public void setConfiguration(Configuration conf) {
+    this.conf = conf;
+    // A fresh temporary directory is created under the new configuration.
+    // NOTE(review): the directory created earlier is not deleted here, and
+    // cleanup() only removes the current tempDirectory -- confirm whether the
+    // previous directory is intentionally left behind.
+    this.tempDirectory = createTempDirectory(conf);
+  }
+
+  @Override
+  public PipelineResult done() {
+    // Only run when at least one output has been registered; cleanup is
+    // attempted in either case.
+    PipelineResult res = outputTargets.isEmpty() ? PipelineResult.EMPTY : run();
+    cleanup();
+    return res;
+  }
+
+  public <S> PCollection<S> read(Source<S> source) {
+    return factory.createInputCollection(source, this);
+  }
+
+  public <K, V> PTable<K, V> read(TableSource<K, V> source) {
+    return factory.createInputTable(source, this);
+  }
+
+  public PCollection<String> readTextFile(String pathName) {
+    return read(From.textFile(pathName));
+  }
+
+  public void write(PCollection<?> pcollection, Target target) {
+    write(pcollection, target, Target.WriteMode.DEFAULT);
+  }
+
+  /**
+   * Registers the given PCollection to be written to the target using the
+   * given write mode.
+   *
+   * Grouped tables are ungrouped and unions are wrapped in an identity
+   * {@code parallelDo} before anything else happens. If
+   * {@code target.handleExisting} returns true and the mode is
+   * {@code CHECKPOINT}, the collection is marked as materialized at the target
+   * and is not registered for output.
+   *
+   * @param pcollection The PCollection to write
+   * @param target The target to write to
+   * @param writeMode The write mode passed to {@code target.handleExisting}
+   * @throws CrunchRuntimeException if checkpointing applies but
+   *           {@code target.asSourceTarget} returns null, or if the target is
+   *           already registered for output in this run and the mode is not
+   *           {@code APPEND}
+   */
+  @SuppressWarnings("unchecked")
+  public void write(PCollection<?> pcollection, Target target,
+                    Target.WriteMode writeMode) {
+    // Normalize the collection first: everything below operates on the
+    // ungrouped / wrapped form.
+    if (pcollection instanceof BaseGroupedTable) {
+      pcollection = ((BaseGroupedTable<?, ?>) pcollection).ungroup();
+    } else if (pcollection instanceof BaseUnionCollection || pcollection instanceof BaseUnionTable) {
+      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
+          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+    }
+    boolean exists = target.handleExisting(writeMode, ((PCollectionImpl) pcollection).getLastModifiedAt(),
+        getConfiguration());
+    if (exists && writeMode == Target.WriteMode.CHECKPOINT) {
+      SourceTarget<?> st = target.asSourceTarget(pcollection.getPType());
+      if (st == null) {
+        throw new CrunchRuntimeException("Target " + target + " does not support checkpointing");
+      } else {
+        ((PCollectionImpl) pcollection).materializeAt(st);
+      }
+      // The checkpointed collection is not added to the outputs.
+      return;
+    } else if (writeMode != Target.WriteMode.APPEND && targetInCurrentRun(target)) {
+      throw new CrunchRuntimeException("Target " + target + " is already written in current run." +
+          " Use WriteMode.APPEND in order to write additional data to it.");
+    }
+    addOutput((PCollectionImpl<?>) pcollection, target);
+  }
+
+  /**
+   * Reports whether the target is already registered as an output of any
+   * collection in this pipeline.
+   */
+  private boolean targetInCurrentRun(Target target) {
+    boolean alreadyRegistered = false;
+    for (Set<Target> registered : outputTargets.values()) {
+      if (registered.contains(target)) {
+        alreadyRegistered = true;
+        break;
+      }
+    }
+    return alreadyRegistered;
+  }
+
+  /**
+   * Registers the target as an output of the given collection, creating the
+   * collection's target set on first use.
+   */
+  private void addOutput(PCollectionImpl<?> impl, Target target) {
+    Set<Target> targets = outputTargets.get(impl);
+    if (targets == null) {
+      targets = Sets.<Target> newHashSet();
+      outputTargets.put(impl, targets);
+    }
+    targets.add(target);
+  }
+
+  // TODO: sort this out
+  /*
+  @Override
+  public <T> Iterable<T> materialize(PCollection<T> pcollection) {
+    C pcollectionImpl = toPCollectionImpl(pcollection);
+    ReadableSource<?> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
+
+    MaterializableIterable<?> c = new MaterializableIterable(this, readableSrc);
+    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
+      outputTargetsToMaterialize.put(pcollectionImpl, c);
+    }
+    return (Iterable<T>) c;
+  }
+  */
+
+  /**
+   * Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.
+   * This is primarily intended as a helper method to {@link #materialize(PCollection)}. The
+   * underlying data of the ReadableSourceTarget may not be actually present until the pipeline is
+   * run.
+   *
+   * @param pcollection The collection for which the ReadableSourceTarget is to be retrieved
+   * @return The ReadableSourceTarget
+   * @throws IllegalArgumentException If no ReadableSourceTarget can be retrieved for the given
+   *           PCollection
+   */
+  public <T> ReadableSource<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
+    PCollectionImpl<T> impl = toPCollectionImpl(pcollection);
+
+    // First, check to see if this is a readable input collection.
+    if (impl instanceof BaseInputCollection) {
+      BaseInputCollection<T> ic = (BaseInputCollection<T>) impl;
+      if (ic.getSource() instanceof ReadableSource) {
+        return (ReadableSource) ic.getSource();
+      } else {
+        throw new IllegalArgumentException(
+            "Cannot materialize non-readable input collection: " + ic);
+      }
+    } else if (impl instanceof BaseInputTable) {
+      BaseInputTable it = (BaseInputTable) impl;
+      if (it.getSource() instanceof ReadableSource) {
+        return (ReadableSource) it.getSource();
+      } else {
+        throw new IllegalArgumentException(
+            "Cannot materialize non-readable input table: " + it);
+      }
+    }
+
+    // Next, check to see if this pcollection has already been materialized.
+    // (instanceof is false for null, so no separate null check is needed.)
+    SourceTarget<?> matTarget = impl.getMaterializedAt();
+    if (matTarget instanceof ReadableSourceTarget) {
+      return (ReadableSourceTarget<T>) matTarget;
+    }
+
+    // Check to see if we plan on materializing this collection on the
+    // next run. The lookup is keyed on impl rather than on the original
+    // pcollection: impl is what gets registered below, and the two differ
+    // when the argument is a union, in which case testing one key and
+    // reading the other could dereference a null set.
+    Set<Target> plannedTargets = outputTargets.get(impl);
+    if (plannedTargets != null) {
+      for (Target target : plannedTargets) {
+        if (target instanceof ReadableSourceTarget) {
+          return (ReadableSourceTarget<T>) target;
+        }
+      }
+    }
+
+    // If we're not planning on materializing it already, create a temporary
+    // output to hold the materialized records and return that.
+    SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
+    if (!(st instanceof ReadableSourceTarget)) {
+      throw new IllegalArgumentException("The PType for the given PCollection is not readable"
+          + " and cannot be materialized");
+    }
+    ReadableSourceTarget<T> srcTarget = (ReadableSourceTarget<T>) st;
+    addOutput(impl, srcTarget);
+    return srcTarget;
+  }
+
+  /**
+   * Converts a PCollection into its PCollectionImpl form. Union collections and
+   * union tables are wrapped in an identity parallelDo first; anything else is
+   * simply cast.
+   *
+   * @param pcollection The PCollection to be cast/transformed
+   * @return The PCollectionImpl representation
+   */
+  private <T> PCollectionImpl<T> toPCollectionImpl(PCollection<T> pcollection) {
+    boolean isUnion = pcollection instanceof BaseUnionCollection || pcollection instanceof BaseUnionTable;
+    if (!isUnion) {
+      return (PCollectionImpl<T>) pcollection;
+    }
+    return (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
+        (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+  }
+
+  public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
+    return ptype.getDefaultFileSource(createTempPath());
+  }
+
+  public Path createTempPath() {
+    tempFileIndex++;
+    return new Path(tempDirectory, "p" + tempFileIndex);
+  }
+
+  private static Path createTempDirectory(Configuration conf) {
+    Path dir = createTemporaryPath(conf);
+    try {
+      dir.getFileSystem(conf).mkdirs(dir);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot create job output directory " + dir, e);
+    }
+    return dir;
+  }
+
+  private static Path createTemporaryPath(Configuration conf) {
+    //TODO: allow configurable
+    String baseDir = conf.get("crunch.tmp.dir", "/tmp");
+    return new Path(baseDir, "crunch-" + (RANDOM.nextInt() & Integer.MAX_VALUE));
+  }
+
+  @Override
+  public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
+    pcollection.parallelDo("asText", new StringifyFn<T>(), Writables.strings())
+        .write(To.textFile(pathName));
+  }
+
+  private static class StringifyFn<T> extends MapFn<T, String> {
+    @Override
+    public String map(T input) {
+      return input.toString();
+    }
+  }
+
+  @Override
+  public void cleanup(boolean force) {
+    // Unless forced, leave the temporary directory alone while outputs are
+    // still registered.
+    if (!force && !outputTargets.isEmpty()) {
+      LOG.warn("Not running cleanup while output targets remain.");
+      return;
+    }
+    try {
+      FileSystem fs = tempDirectory.getFileSystem(conf);
+      if (fs.exists(tempDirectory)) {
+        fs.delete(tempDirectory, true);
+      }
+    } catch (IOException e) {
+      // Failure to delete is logged and otherwise ignored.
+      LOG.info("Exception during cleanup", e);
+    }
+  }
+
+  private void cleanup() {
+    cleanup(false);
+  }
+
+  public int getNextAnonymousStageId() {
+    return nextAnonymousStageId++;
+  }
+
+  @Override
+  public void enableDebug() {
+    // Turn on Crunch runtime error catching.
+    //TODO: allow configurable
+    getConfiguration().setBoolean("crunch.debug", true);
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java
new file mode 100644
index 0000000..bb1d054
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoCollection.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.util.DelegatingReadableData;
+
+import java.util.List;
+
+public class BaseDoCollection<S> extends PCollectionImpl<S> {
+
+  private final PCollectionImpl<Object> parent;
+  protected final DoFn<Object, S> fn;
+  protected final PType<S> ptype;
+
+  protected <T> BaseDoCollection(
+      String name,
+      PCollectionImpl<T> parent,
+      DoFn<T, S> fn,
+      PType<S> ptype,
+      ParallelDoOptions options) {
+    super(name, parent.getPipeline(), options);
+    this.parent = (PCollectionImpl<Object>) parent;
+    this.fn = (DoFn<Object, S>) fn;
+    this.ptype = ptype;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return (long) (fn.scaleFactor() * parent.getSize());
+  }
+
+  @Override
+  protected ReadableData<S> getReadableDataInternal() {
+    if (getOnlyParent() instanceof BaseGroupedTable) {
+      return materializedData();
+    }
+    return new DelegatingReadableData(getOnlyParent().asReadable(false), fn);
+  }
+
+  @Override
+  public PType<S> getPType() {
+    return ptype;
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> of(parent);
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    return parent.getLastModifiedAt();
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    visitor.visitDoCollection(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java
new file mode 100644
index 0000000..4c5569e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseDoTable.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.util.DelegatingReadableData;
+
+import java.util.List;
+
+public class BaseDoTable<K, V> extends PTableBase<K, V> implements PTable<K, V> {
+
+  private final PCollectionImpl<?> parent;
+  protected final DoFn<?, Pair<K, V>> combineFn;
+  protected final DoFn<?, Pair<K, V>> fn;
+  protected final PTableType<K, V> type;
+
+  /**
+   * Returns the given function viewed as a CombineFn when it is one, and null
+   * otherwise.
+   */
+  private static <S, K, V> CombineFn<K, V> asCombineFn(final DoFn<S, Pair<K, V>> fn) {
+    return (fn instanceof CombineFn) ? (CombineFn) fn : null;
+  }
+
+  protected <S> BaseDoTable(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
+                            ParallelDoOptions options) {
+    this(name, parent, asCombineFn(fn), fn, ntype, options);
+  }
+
+  protected <S> BaseDoTable(
+      String name,
+      PCollectionImpl<S> parent,
+      CombineFn<K, V> combineFn,
+      DoFn<S, Pair<K, V>> fn,
+      PTableType<K, V> ntype) {
+    this(name, parent, combineFn, fn, ntype, ParallelDoOptions.builder().build());
+  }
+
+  protected <S> BaseDoTable(
+      String name,
+      PCollectionImpl<S> parent,
+      CombineFn<K, V> combineFn,
+      DoFn<S, Pair<K, V>> fn,
+      PTableType<K, V> ntype,
+      ParallelDoOptions options) {
+    super(name, parent.getPipeline(), options);
+    this.parent = parent;
+    this.combineFn = combineFn;
+    this.fn = fn;
+    this.type = ntype;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return (long) (fn.scaleFactor() * parent.getSize());
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return type;
+  }
+
+  @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    if (getOnlyParent() instanceof BaseGroupedTable) {
+      return materializedData();
+    }
+    return new DelegatingReadableData(getOnlyParent().asReadable(false), fn);
+  }
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return type;
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> of(parent);
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    return parent.getLastModifiedAt();
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    visitor.visitDoTable(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
new file mode 100644
index 0000000..24cbaf5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lib.PTables;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+import java.util.Set;
+
+public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>>
+    implements PGroupedTable<K, V> {
+
+  protected final PTableBase<K, V> parent;
+  protected final GroupingOptions groupingOptions;
+  protected final PGroupedTableType<K, V> ptype;
+
+  protected BaseGroupedTable(PTableBase<K, V> parent) {
+    this(parent, null);
+  }
+
+  protected BaseGroupedTable(PTableBase<K, V> parent, GroupingOptions groupingOptions) {
+    super("GBK", parent.getPipeline());
+    this.parent = parent;
+    this.groupingOptions = groupingOptions;
+    this.ptype = parent.getPTableType().getGroupedTableType();
+  }
+
+  @Override
+  protected ReadableData<Pair<K, Iterable<V>>> getReadableDataInternal() {
+    throw new UnsupportedOperationException("PGroupedTable does not currently support readability");
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    return parent.getSizeInternal();
+  }
+
+  @Override
+  public PType<Pair<K, Iterable<V>>> getPType() {
+    return ptype;
+  }
+
+  @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
+    // The combine step is chained off a copy of this grouped table (see
+    // getChainingCollection) and keeps the parent's table type.
+    return pipeline.getFactory().createDoTable(
+        "combine", getChainingCollection(), combineFn, reduceFn, parent.getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
+    return combineValues(combineFn, combineFn);
+  }
+
+  @Override
+  public PTable<K, V> combineValues(Aggregator<V> agg) {
+    return combineValues(Aggregators.<K, V>toCombineFn(agg));
+  }
+
+  @Override
+  public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) {
+    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
+  }
+
+  private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
+      for (V v : input.second()) {
+        emitter.emit(Pair.of(input.first(), v));
+      }
+    }
+  }
+
+  @Override
+  public PTable<K, V> ungroup() {
+    return parallelDo("ungroup", new Ungroup<K, V>(), parent.getPTableType());
+  }
+
+  @Override
+  public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, mapFn, ptype);
+  }
+
+  @Override
+  public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(name, this, mapFn, ptype);
+  }
+
+  @Override
+  public PGroupedTableType<K, V> getGroupedTableType() {
+    return ptype;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getTargetDependencies() {
+    Set<SourceTarget<?>> td = Sets.newHashSet(super.getTargetDependencies());
+    if (groupingOptions != null) {
+      td.addAll(groupingOptions.getSourceTargets());
+    }
+    return ImmutableSet.copyOf(td);
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> of(parent);
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    return parent.getLastModifiedAt();
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    visitor.visitGroupedTable(this);
+  }
+
+
+  @Override
+  protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() {
+    // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs
+    // TODO This should be implemented in a cleaner way in the planner
+    return pipeline.getFactory().createGroupedTable(parent, groupingOptions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
new file mode 100644
index 0000000..641a3cb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+public class BaseInputCollection<S> extends PCollectionImpl<S> {
+
+  protected final Source<S> source;
+
+  public BaseInputCollection(Source<S> source, DistributedPipeline pipeline) {
+    super(source.toString(), pipeline);
+    this.source = source;
+  }
+
+  @Override
+  protected ReadableData<S> getReadableDataInternal() {
+    if (source instanceof ReadableSource) {
+      return ((ReadableSource<S>) source).asReadable();
+    } else {
+      return materializedData();
+    }
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    visitor.visitInputCollection(this);
+  }
+
+  @Override
+  public PType<S> getPType() {
+    return source.getType();
+  }
+
+  public Source<S> getSource() {
+    return source;
+  }
+
+  @Override
+  protected long getSizeInternal() {
+    long sz = source.getSize(pipeline.getConfiguration());
+    if (sz < 0) {
+      throw new IllegalStateException("Input source " + source + " does not exist!");
+    }
+    return sz;
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    return source.getLastModifiedAt(pipeline.getConfiguration());
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    // Two input collections are equal when their sources are equal.
+    // instanceof is false for null, so no separate null check is needed.
+    if (obj instanceof BaseInputCollection) {
+      return source.equals(((BaseInputCollection) obj).source);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(source).toHashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
new file mode 100644
index 0000000..f41895a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputTable.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+public class BaseInputTable<K, V> extends PTableBase<K, V> {
+
+  protected final TableSource<K, V> source;
+  protected final BaseInputCollection<Pair<K, V>> asCollection;
+
+  public BaseInputTable(TableSource<K, V> source, DistributedPipeline pipeline) {
+    super(source.toString(), pipeline);
+    this.source = source;
+    this.asCollection = pipeline.getFactory().createInputCollection(source, pipeline);
+  }
+
+  public TableSource<K, V> getSource() {
+    return source;
+  }
+  
+  @Override
+  protected long getSizeInternal() {
+    return asCollection.getSizeInternal();
+  }
+
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return source.getTableType();
+  }
+
+
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return source.getType();
+  }
+
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    return asCollection.getReadableDataInternal();
+  }
+
+  @Override
+  public long getLastModifiedAt() {
+    return source.getLastModifiedAt(pipeline.getConfiguration());
+  }
+
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    visitor.visitInputCollection(asCollection);
+  }
+
+  @Override
+  public int hashCode() {
+    return asCollection.hashCode();
+  }
+
+  /**
+   * Compares this table with another object through the wrapped input
+   * collection, consistently with {@link #hashCode()}.
+   *
+   * A table is equal to itself and to any other BaseInputTable whose wrapped
+   * collection is equal to this one's. For any other object the comparison is
+   * delegated to the wrapped collection, as before.
+   *
+   * @param other The object to compare with
+   * @return true if the objects are considered equal
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other instanceof BaseInputTable) {
+      // Compare collection to collection. Handing the table itself to
+      // asCollection.equals fails BaseInputCollection's instanceof check,
+      // which made a table unequal even to itself.
+      return asCollection.equals(((BaseInputTable<?, ?>) other).asCollection);
+    }
+    // Preserved behavior for non-table arguments.
+    // NOTE(review): this stays asymmetric with BaseInputCollection.equals,
+    // which never accepts a table -- confirm whether that is relied upon.
+    return asCollection.equals(other);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
new file mode 100644
index 0000000..ef10ee7
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionCollection.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.util.UnionReadableData;
+
+import java.util.List;
+
+public class BaseUnionCollection<S> extends PCollectionImpl<S> {
+  // Union of several PCollectionImpls of one pipeline; size and last-modified time are fixed in the constructor.
+  private List<PCollectionImpl<S>> parents;
+  private long size = 0;
+  private long lastModifiedAt = -1;
+  /** Builds the name {@code union(a,b,...)} from the names of the given collections, in list order. */
+  private static String flatName(List<? extends PCollectionImpl> collections) {
+    StringBuilder sb = new StringBuilder("union(");
+    for (int i = 0; i < collections.size(); i++) {
+      if (i != 0) {
+        sb.append(',');
+      }
+      sb.append(collections.get(i).getName());
+    }
+    return sb.append(')').toString();
+  }
+  /** Copies the parents, rejects any from another pipeline, sums their sizes and keeps the newest modification time. */
+  protected BaseUnionCollection(List<? extends PCollectionImpl<S>> collections) {
+    super(flatName(collections), collections.get(0).getPipeline()); // get(0) fails if collections is empty
+    this.parents = ImmutableList.copyOf(collections);
+    for (PCollectionImpl<S> parent : parents) {
+      if (this.pipeline != parent.getPipeline()) {
+        throw new IllegalStateException("Cannot union PCollections from different Pipeline instances");
+      }
+      size += parent.getSize();
+      if (parent.getLastModifiedAt() > lastModifiedAt) {
+        this.lastModifiedAt = parent.getLastModifiedAt();
+      }
+    }
+  }
+
+  /** Marks this union and every one of its parents as a breakpoint. */
+  @Override
+  public void setBreakpoint() {
+    super.setBreakpoint();
+    for (PCollectionImpl<?> parent : getParents()) {
+      parent.setBreakpoint();
+    }
+  }
+  /** Returns the sum of the parents' sizes as computed in the constructor. */
+  @Override
+  protected long getSizeInternal() {
+    return size;
+  }
+  /** Returns the largest parent last-modified time seen in the constructor (field starts at -1). */
+  @Override
+  public long getLastModifiedAt() {
+    return lastModifiedAt;
+  }
+  /** Unions the parents' readable data; as soon as a parent is a BaseGroupedTable, returns materializedData() instead. */
+  @Override
+  protected ReadableData<S> getReadableDataInternal() {
+    List<ReadableData<S>> prds = Lists.newArrayList();
+    for (PCollectionImpl<S> parent : parents) {
+      if (parent instanceof BaseGroupedTable) {
+        return materializedData();
+      } else {
+        prds.add(parent.asReadable(false));
+      }
+    }
+    return new UnionReadableData<S>(prds);
+  }
+  /** Returns the PType of the first parent. */
+  @Override
+  public PType<S> getPType() {
+    return parents.get(0).getPType();
+  }
+  /** Returns an immutable copy of the parent list, widened to {@code PCollectionImpl<?>}. */
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>> copyOf(parents);
+  }
+  /** Dispatches to {@code visitor.visitUnionCollection(this)}. */
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    visitor.visitUnionCollection(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
new file mode 100644
index 0000000..4d688c3
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseUnionTable.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.util.UnionReadableData;
+
+import java.util.List;
+
+public class BaseUnionTable<K, V> extends PTableBase<K, V> {
+  // Union of several PTableBase parents of one pipeline; the table type is taken from the first parent.
+  private PTableType<K, V> ptype;
+  private List<PCollectionImpl<Pair<K, V>>> parents;
+  private long size;
+  private long lastModifiedAt = -1;
+  /** Builds the name {@code union(a,b,...)} from the names of the given tables, in list order. */
+  private static <K, V> String flatName(List<PTableBase<K, V>> tables) {
+    StringBuilder sb = new StringBuilder("union(");
+    for (int i = 0; i < tables.size(); i++) {
+      if (i != 0) {
+        sb.append(',');
+      }
+      sb.append(tables.get(i).getName());
+    }
+    return sb.append(')').toString();
+  }
+  /** Collects the parents, rejects any from another pipeline, sums their sizes and keeps the newest modification time. */
+  protected BaseUnionTable(List<PTableBase<K, V>> tables) {
+    super(flatName(tables), tables.get(0).getPipeline()); // get(0) fails if tables is empty
+    this.ptype = tables.get(0).getPTableType();
+    this.pipeline = tables.get(0).getPipeline(); // NOTE(review): same value already passed to super(...); likely redundant
+    this.parents = Lists.newArrayList();
+    for (PTableBase<K, V> parent : tables) {
+      if (pipeline != parent.getPipeline()) {
+        throw new IllegalStateException("Cannot union PTables from different Pipeline instances");
+      }
+      this.parents.add(parent);
+      size += parent.getSize();
+      if (parent.getLastModifiedAt() > lastModifiedAt) {
+        this.lastModifiedAt = parent.getLastModifiedAt();
+      }
+    }
+  }
+  /** Marks this union and every one of its parents as a breakpoint. */
+  @Override
+  public void setBreakpoint() {
+    super.setBreakpoint();
+    for (PCollectionImpl<?> parent : getParents()) {
+      parent.setBreakpoint();
+    }
+  }
+  /** Returns the sum of the parents' sizes as computed in the constructor. */
+  @Override
+  protected long getSizeInternal() {
+    return size;
+  }
+  /** Returns the largest parent last-modified time seen in the constructor (field starts at -1). */
+  @Override
+  public long getLastModifiedAt() {
+    return lastModifiedAt;
+  }
+  /** Returns the table type captured from the first parent. */
+  @Override
+  public PTableType<K, V> getPTableType() {
+    return ptype;
+  }
+  /** Returns the same table type object, viewed as a {@code PType<Pair<K, V>>}. */
+  @Override
+  public PType<Pair<K, V>> getPType() {
+    return ptype;
+  }
+  /** Returns an immutable copy of the parent list, widened to {@code PCollectionImpl<?>}. */
+  @Override
+  public List<PCollectionImpl<?>> getParents() {
+    return ImmutableList.<PCollectionImpl<?>>copyOf(parents);
+  }
+  /** Visits a union collection obtained from the pipeline's factory for the same parents, not this table itself. */
+  @Override
+  protected void acceptInternal(Visitor visitor) {
+    visitor.visitUnionCollection(pipeline.getFactory().createUnionCollection(parents));
+  }
+  /** Unions the parents' readable data; as soon as a parent is a BaseGroupedTable, returns materializedData() instead. */
+  @Override
+  protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+    List<ReadableData<Pair<K, V>>> prds = Lists.newArrayList();
+    for (PCollectionImpl<Pair<K, V>> parent : parents) {
+      if (parent instanceof BaseGroupedTable) {
+        return materializedData();
+      } else {
+        prds.add(parent.asReadable(false));
+      }
+    }
+    return new UnionReadableData<Pair<K, V>>(prds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/MRCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/MRCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/MRCollection.java
new file mode 100644
index 0000000..8106282
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/MRCollection.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import org.apache.crunch.impl.mr.plan.DoNode;
+
+public interface MRCollection { // single-method interface for objects that can produce a DoNode
+  DoNode createDoNode(); // presumably the MR plan node for the implementing collection - TODO confirm against implementors
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
new file mode 100644
index 0000000..a176aa1
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionFactory.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+public interface PCollectionFactory {
+  // Factory for the Base* collection and table objects; obtained in this commit through pipeline.getFactory().
+  <S> BaseInputCollection<S> createInputCollection(Source<S> source, DistributedPipeline distributedPipeline);
+  /** Table counterpart of {@code createInputCollection}, taking a TableSource. */
+  <K, V> BaseInputTable<K, V> createInputTable(TableSource<K,V> source, DistributedPipeline distributedPipeline);
+  /** Creates the union of the given collections. */
+  <S> BaseUnionCollection<S> createUnionCollection(List<? extends PCollectionImpl<S>> internal);
+  /** Creates the collection produced by applying {@code fn} to {@code chainingCollection}, typed by {@code type}. */
+  <S, T> BaseDoCollection<T> createDoCollection(
+      String name,
+      PCollectionImpl<S> chainingCollection,
+      DoFn<S,T> fn,
+      PType<T> type,
+      ParallelDoOptions options);
+  /** Table counterpart of {@code createDoCollection}: {@code fn} emits pairs typed by a PTableType. */
+  <S, K, V> BaseDoTable<K, V> createDoTable(
+      String name,
+      PCollectionImpl<S> chainingCollection,
+      DoFn<S,Pair<K, V>> fn,
+      PTableType<K, V> type,
+      ParallelDoOptions options);
+  /** Overload that takes an additional CombineFn and no ParallelDoOptions. */
+  <S, K, V> BaseDoTable<K, V> createDoTable(
+      String name,
+      PCollectionImpl<S> chainingCollection,
+      CombineFn<K, V> combineFn,
+      DoFn<S,Pair<K, V>> fn,
+      PTableType<K, V> type);
+  /** Creates the grouped table for {@code parent} under the given grouping options. */
+  <K, V> BaseGroupedTable<K, V> createGroupedTable(PTableBase<K,V> parent, GroupingOptions groupingOptions);
+  /** Creates the union of the given tables; unlike the other methods this returns the PTable interface type. */
+  <K, V> PTable<K, V> createUnionTable(List<PTableBase<K, V>> internal);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
new file mode 100644
index 0000000..ee820f0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.crunch.CachingOptions;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PObject;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
+import org.apache.crunch.fn.ExtractKeyFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.materialize.pobject.CollectionPObject;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public abstract class PCollectionImpl<S> implements PCollection<S> {
+  // Abstract base for collections bound to a DistributedPipeline; new collections are built via pipeline.getFactory().
+  private final String name;
+  protected DistributedPipeline pipeline;
+  private boolean materialized; // set once materialize() or materializedData() has been used
+  protected SourceTarget<S> materializedAt; // when non-null, write/accept/asReadable use it instead of this collection
+  protected final ParallelDoOptions doOptions;
+  private long size = -1L; // negative means "not computed yet"; see getSize()
+  private boolean breakpoint;
+  /** Creates a collection with default ParallelDoOptions. */
+  public PCollectionImpl(String name, DistributedPipeline pipeline) {
+    this(name, pipeline, ParallelDoOptions.builder().build());
+  }
+  /** Creates a collection with the given name, owning pipeline and ParallelDoOptions. */
+  public PCollectionImpl(String name, DistributedPipeline pipeline, ParallelDoOptions doOptions) {
+    this.name = name;
+    this.pipeline = pipeline;
+    this.doOptions = doOptions;
+  }
+  /** Returns the name given at construction. */
+  @Override
+  public String getName() {
+    return name;
+  }
+  /** Returns the owning pipeline. */
+  @Override
+  public DistributedPipeline getPipeline() {
+    return pipeline;
+  }
+  /** Returns the ParallelDoOptions given at construction. */
+  public ParallelDoOptions getParallelDoOptions() {
+    return doOptions;
+  }
+  /** Returns the collection's name. */
+  @Override
+  public String toString() {
+    return getName();
+  }
+  /** Returns an empty list (with a warning on stderr) when getSize() is 0; otherwise asks the pipeline to materialize. */
+  @Override
+  public Iterable<S> materialize() {
+    if (getSize() == 0) {
+      System.err.println("Materializing an empty PCollection: " + this.getName());
+      return Collections.emptyList();
+    }
+    materialized = true;
+    return pipeline.materialize(this);
+  }
+  /** Caches this collection with {@code CachingOptions.DEFAULT}. */
+  @Override
+  public PCollection<S> cache() {
+    return cache(CachingOptions.DEFAULT);
+  }
+  /** Hands this collection and the options to {@code pipeline.cache} and returns this. */
+  @Override
+  public PCollection<S> cache(CachingOptions options) {
+    pipeline.cache(this, options);
+    return this;
+  }
+  /** Single-argument convenience for the varargs union; note it builds a raw {@code PCollection[]}. */
+  @Override
+  public PCollection<S> union(PCollection<S> other) {
+    return union(new PCollection[] { other });
+  }
+  /** Unions this with the others; each other collection is first passed through an identity parallelDo and cast. */
+  @Override
+  public PCollection<S> union(PCollection<S>... collections) {
+    List<PCollectionImpl<S>> internal = Lists.newArrayList();
+    internal.add(this);
+    for (PCollection<S> collection : collections) {
+      internal.add((PCollectionImpl<S>) collection.parallelDo(IdentityFn.<S>getInstance(), collection.getPType()));
+    }
+    return pipeline.getFactory().createUnionCollection(internal);
+  }
+  /** Applies fn under a generated name: "S" followed by the pipeline's next anonymous stage id. */
+  @Override
+  public <T> PCollection<T> parallelDo(DoFn<S, T> fn, PType<T> type) {
+    return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
+  }
+  /** Applies fn under the given name with default ParallelDoOptions. */
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
+    return parallelDo(name, fn, type, ParallelDoOptions.builder().build());
+  }
+  /** Asks the factory for a do-collection chained onto getChainingCollection(). */
+  @Override
+  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type,
+      ParallelDoOptions options) {
+    return pipeline.getFactory().createDoCollection(name, getChainingCollection(), fn, type, options);
+  }
+  /** Table variant: applies fn under a generated name ("S" + next anonymous stage id). */
+  @Override
+  public <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo("S" + pipeline.getNextAnonymousStageId(), fn, type);
+  }
+  /** Table variant: applies fn under the given name with default ParallelDoOptions. */
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(name, fn, type, ParallelDoOptions.builder().build());
+  }
+  /** Table variant: asks the factory for a do-table chained onto getChainingCollection(). */
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type,
+      ParallelDoOptions options) {
+    return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options);
+  }
+  /** Writes to target; if materializedAt is set, writes an input collection over it instead of this. Returns this. */
+  public PCollection<S> write(Target target) {
+    if (materializedAt != null) {
+      getPipeline().write(pipeline.getFactory().createInputCollection(materializedAt, pipeline), target);
+    } else {
+      getPipeline().write(this, target);
+    }
+    return this;
+  }
+  /** Same as {@code write(Target)} but forwards the given write mode to the pipeline. */
+  @Override
+  public PCollection<S> write(Target target, Target.WriteMode writeMode) {
+    if (materializedAt != null) {
+      getPipeline().write(
+          pipeline.getFactory().createInputCollection(materializedAt, pipeline),
+          target,
+          writeMode);
+    } else {
+      getPipeline().write(this, target, writeMode);
+    }
+    return this;
+  }
+  /** Callback interface with one method per kind of collection; driven by accept/acceptInternal. */
+  public interface Visitor {
+    void visitInputCollection(BaseInputCollection<?> collection);
+
+    void visitUnionCollection(BaseUnionCollection<?> collection);
+
+    void visitDoCollection(BaseDoCollection<?> collection);
+
+    void visitDoTable(BaseDoTable<?, ?> collection);
+
+    void visitGroupedTable(BaseGroupedTable<?, ?> collection);
+  }
+  /** Visits an input collection over materializedAt when it is set; otherwise defers to acceptInternal. */
+  public void accept(Visitor visitor) {
+    if (materializedAt != null) {
+      visitor.visitInputCollection(pipeline.getFactory().createInputCollection(materializedAt, pipeline));
+    } else {
+      acceptInternal(visitor);
+    }
+  }
+  /** Subclass hook called by accept when materializedAt is null. */
+  protected abstract void acceptInternal(Visitor visitor);
+  /** Sets the breakpoint flag on this collection. */
+  public void setBreakpoint() {
+    this.breakpoint = true;
+  }
+  /** Returns whether setBreakpoint has been called on this collection. */
+  public boolean isBreakpoint() {
+    return breakpoint;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public PObject<Collection<S>> asCollection() {
+    return new CollectionPObject<S>(this);
+  }
+  /** Returns the SourceTarget set by materializeAt, or null if none was set. */
+  public SourceTarget<S> getMaterializedAt() {
+    return materializedAt;
+  }
+  /** Records sourceTarget as the materialized location and overwrites the cached size with that target's size. */
+  public void materializeAt(SourceTarget<S> sourceTarget) {
+    this.materializedAt = sourceTarget;
+    this.size = materializedAt.getSize(getPipeline().getConfiguration());
+  }
+  /** Applies filterFn as an anonymous parallelDo that keeps this collection's PType. */
+  @Override
+  public PCollection<S> filter(FilterFn<S> filterFn) {
+    return parallelDo(filterFn, getPType());
+  }
+  /** Named variant of filter. */
+  @Override
+  public PCollection<S> filter(String name, FilterFn<S> filterFn) {
+    return parallelDo(name, filterFn, getPType());
+  }
+  /** Builds a table by running ExtractKeyFn(mapFn) over this collection, typed tableOf(keyType, getPType()). */
+  @Override
+  public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+  /** Named variant of by. */
+  @Override
+  public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
+  }
+  /** Delegates to {@code Aggregate.count(this)}. */
+  @Override
+  public PTable<S, Long> count() {
+    return Aggregate.count(this);
+  }
+  /** Delegates to {@code Aggregate.length(this)}. */
+  @Override
+  public PObject<Long> length() {
+    return Aggregate.length(this);
+  }
+  /** Delegates to {@code Aggregate.max(this)}. */
+  @Override
+  public PObject<S> max() {
+    return Aggregate.max(this);
+  }
+  /** Delegates to {@code Aggregate.min(this)}. */
+  @Override
+  public PObject<S> min() {
+    return Aggregate.min(this);
+  }
+  /** Returns the type family of this collection's PType. */
+  @Override
+  public PTypeFamily getTypeFamily() {
+    return getPType().getFamily();
+  }
+  /** Returns the direct parents of this collection. */
+  public abstract List<PCollectionImpl<?>> getParents();
+  /** Returns the single parent; throws IllegalArgumentException unless getParents() has exactly one element. */
+  public PCollectionImpl<?> getOnlyParent() {
+    List<PCollectionImpl<?>> parents = getParents();
+    if (parents.size() != 1) {
+      throw new IllegalArgumentException("Expected exactly one parent PCollection");
+    }
+    return parents.get(0);
+  }
+  /** Unions this collection's doOptions source targets with those of all ancestors (recursing through parents). */
+  public Set<SourceTarget<?>> getTargetDependencies() {
+    Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets();
+    for (PCollectionImpl<?> parent : getParents()) {
+      targetDeps = Sets.union(targetDeps, parent.getTargetDependencies());
+    }
+    return targetDeps;
+  }
+  /** Returns 1 plus the greatest depth among the parents; 1 when there are no parents. */
+  public int getDepth() {
+    int parentMax = 0;
+    for (PCollectionImpl parent : getParents()) {
+      parentMax = Math.max(parent.getDepth(), parentMax);
+    }
+    return 1 + parentMax;
+  }
+  /** Picks, in order: a readable materializedAt, the pipeline's materialize target, else getReadableDataInternal(). */
+  @Override
+  public ReadableData<S> asReadable(boolean materialize) {
+    if (materializedAt != null && (materializedAt instanceof ReadableSource)) {
+      return ((ReadableSource) materializedAt).asReadable();
+    } else if (materialized || materialize) {
+      return pipeline.getMaterializeSourceTarget(this).asReadable();
+    } else {
+      return getReadableDataInternal();
+    }
+  }
+  /** Sets the materialized flag and returns readable data from the pipeline's materialize source target. */
+  protected ReadableData<S> materializedData() {
+    materialized = true;
+    return pipeline.getMaterializeSourceTarget(this).asReadable();
+  }
+  /** Subclass hook used by asReadable when nothing has been materialized. */
+  protected abstract ReadableData<S> getReadableDataInternal();
+  /** Returns the cached size, calling getSizeInternal() whenever the cached value is still negative. */
+  @Override
+  public long getSize() {
+    if (size < 0) {
+      this.size = getSizeInternal();
+    }
+    return size;
+  }
+  /** Subclass hook that computes the size; its result is cached by getSize(). */
+  protected abstract long getSizeInternal();
+  /** Returns the last-modified time of this collection as defined by the subclass. */
+  public abstract long getLastModifiedAt();
+  
+  /**
+   * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline.
+   * @return The PCollectionImpl instance to be chained
+   */
+  protected PCollectionImpl<S> getChainingCollection() {
+    return this;
+  }
+  
+}


Mime
View raw message