crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [7/9] Generalizing Crunch's Collection APIs to support more execution frameworks
Date Wed, 11 Dec 2013 20:47:53 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
index 8f99a0b..67c624d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -26,10 +26,9 @@ import com.google.common.collect.Maps;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -65,17 +64,6 @@ class Edge {
     return paths;
   }
 
-  private static boolean readWriteOutput(PCollectionImpl<?> pc, Map<PCollectionImpl<?>, Set<Target>> outputs) {
-    if (outputs.containsKey(pc)) {
-      for (Target t : outputs.get(pc)) {
-        if (t instanceof SourceTarget || t.asSourceTarget(pc.getPType()) != null) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
   public Map<NodePath,  PCollectionImpl> getSplitPoints(Map<PCollectionImpl<?>, Set<Target>> outputs) {
     List<NodePath> np = Lists.newArrayList(paths);
     List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size());
@@ -86,7 +74,7 @@ class Edge {
       boolean breakpoint = false;
       PCollectionImpl<?> best = null;
       for (PCollectionImpl<?> pc : np.get(i)) {
-        if (!(pc instanceof PGroupedTableImpl)) {
+        if (!(pc instanceof BaseGroupedTable)) {
           if (pc.isBreakpoint()) {
             if (!breakpoint || pc.getSize() < bestSize) {
               best = pc;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
index ce0a847..220bf19 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
index 925c39a..a18cda0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
@@ -17,12 +17,12 @@
  */
 package org.apache.crunch.impl.mr.plan;
 
-import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
-import org.apache.crunch.impl.mr.collect.DoTableImpl;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.collect.UnionCollection;
+import org.apache.crunch.impl.dist.collect.BaseDoCollection;
+import org.apache.crunch.impl.dist.collect.BaseDoTable;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.BaseInputCollection;
+import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 
 /**
  *
@@ -44,13 +44,13 @@ class GraphBuilder implements PCollectionImpl.Visitor {
   }
   
   @Override
-  public void visitInputCollection(InputCollection<?> collection) {
+  public void visitInputCollection(BaseInputCollection<?> collection) {
     Vertex v = graph.addVertex(collection, false);
     graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
   }
 
   @Override
-  public void visitUnionCollection(UnionCollection<?> collection) {
+  public void visitUnionCollection(BaseUnionCollection<?> collection) {
     Vertex baseVertex = workingVertex;
     NodePath basePath = workingPath;
     for (PCollectionImpl<?> parent : collection.getParents()) {
@@ -61,19 +61,19 @@ class GraphBuilder implements PCollectionImpl.Visitor {
   }
 
   @Override
-  public void visitDoFnCollection(DoCollectionImpl<?> collection) {
+  public void visitDoCollection(BaseDoCollection<?> collection) {
     workingPath.push(collection);
     processParent(collection.getOnlyParent());
   }
 
   @Override
-  public void visitDoTable(DoTableImpl<?, ?> collection) {
+  public void visitDoTable(BaseDoTable<?, ?> collection) {
     workingPath.push(collection);
     processParent(collection.getOnlyParent());
   }
 
   @Override
-  public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
+  public void visitGroupedTable(BaseGroupedTable<?, ?> collection) {
     Vertex v = graph.addVertex(collection, false);
     graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
     workingVertex = v;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index a192a22..e7a1e17 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -26,8 +26,9 @@ import java.util.Set;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.collect.DoTableImpl;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.DoTable;
+import org.apache.crunch.impl.dist.collect.MRCollection;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
 import org.apache.crunch.impl.mr.run.CrunchCombiner;
@@ -69,7 +70,7 @@ class JobPrototype {
 
   private HashMultimap<Target, NodePath> mapSideNodePaths;
   private HashMultimap<Target, NodePath> targetsToNodePaths;
-  private DoTableImpl<?, ?> combineFnTable;
+  private DoTable<?, ?> combineFnTable;
 
   private CrunchControlledJob job;
 
@@ -200,7 +201,7 @@ class JobPrototype {
       Set<DoNode> mapNodes = Sets.newHashSet(mapSideNodes);
       for (NodePath nodePath : mapNodePaths) {
         // Advance these one step, since we've already configured
-        // the grouping node, and the PGroupedTableImpl is the tail
+        // the grouping node, and the BaseGroupedTable is the tail
         // of the NodePath.
         Iterator<PCollectionImpl<?>> iter = nodePath.descendingIterator();
         iter.next();
@@ -256,11 +257,11 @@ class JobPrototype {
       PCollectionImpl<?> collect = iter.next();
       if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
         combineFnTable = null;
-      } else if (collect instanceof DoTableImpl && ((DoTableImpl<?, ?>) collect).hasCombineFn()) {
-        combineFnTable = (DoTableImpl<?, ?>) collect;
+      } else if (collect instanceof DoTable && ((DoTable<?, ?>) collect).hasCombineFn()) {
+        combineFnTable = (DoTable<?, ?>) collect;
       }
       if (!nodes.containsKey(collect)) {
-        nodes.put(collect, collect.createDoNode());
+        nodes.put(collect, ((MRCollection) collect).createDoNode());
       }
       DoNode parent = nodes.get(collect);
       parent.addChild(working);

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 96c9125..97ac866 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -26,9 +26,9 @@ import java.util.TreeMap;
 
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.crunch.materialize.MaterializableIterable;
@@ -57,7 +57,7 @@ public class MSCRPlanner {
 
   // Used to ensure that we always build pipelines starting from the deepest
   // outputs, which helps ensure that we handle intermediate outputs correctly.
-  private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
+  static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
     @Override
     public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
       int cmp = right.getDepth() - left.getDepth();

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
index a090d93..03d39af 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
@@ -20,7 +20,7 @@ package org.apache.crunch.impl.mr.plan;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 
 import com.google.common.collect.Lists;
 
@@ -41,7 +41,7 @@ class NodePath implements Iterable<PCollectionImpl<?>> {
   }
 
   public void push(PCollectionImpl<?> stage) {
-    this.path.push((PCollectionImpl<?>) stage);
+    this.path.push(stage);
   }
 
   public NodePath close(PCollectionImpl<?> head) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
index f4aa668..1a77e58 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -23,9 +23,9 @@ import java.util.Set;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.crunch.Source;
-import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
-import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
+import org.apache.crunch.impl.dist.collect.BaseInputCollection;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -51,11 +51,11 @@ class Vertex {
   }
   
   public boolean isInput() {
-    return impl instanceof InputCollection;
+    return impl instanceof BaseInputCollection;
   }
   
   public boolean isGBK() {
-    return impl instanceof PGroupedTableImpl;
+    return impl instanceof BaseGroupedTable;
   }
   
   public void setOutput() {
@@ -68,7 +68,7 @@ class Vertex {
   
   public Source getSource() {
     if (isInput()) {
-      return ((InputCollection) impl).getSource();
+      return ((BaseInputCollection) impl).getSource();
     }
     return null;
   }
@@ -92,10 +92,6 @@ class Vertex {
     return n;
   }
   
-  public Set<Edge> getAllEdges() {
-    return Sets.union(incoming, outgoing);
-  }
-  
   public Set<Edge> getIncomingEdges() {
     return incoming;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index 1f542df..bda6f1a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -20,8 +20,8 @@ package org.apache.crunch.impl.mr.run;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 
 import org.apache.crunch.io.FormatBundle;
 import org.apache.hadoop.conf.Configurable;
@@ -98,27 +98,40 @@ class CrunchInputSplit extends InputSplit implements Writable, Configurable {
   }
 
   public void readFields(DataInput in) throws IOException {
+    if (conf == null) {
+      conf = new Configuration();
+    }
     nodeIndex = in.readInt();
     bundle = new FormatBundle();
     bundle.setConf(conf);
     bundle.readFields(in);
     bundle.configure(conf); // yay bootstrap!
     Class<? extends InputSplit> inputSplitClass = readClass(in);
-    inputSplit = ReflectionUtils.newInstance(inputSplitClass, conf);
-    SerializationFactory factory = new SerializationFactory(conf);
-    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
-    deserializer.open((DataInputStream) in);
-    inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
+    inputSplit = (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf);
+    if (inputSplit instanceof Writable) {
+      ((Writable) inputSplit).readFields(in);
+    } else {
+      SerializationFactory factory = new SerializationFactory(conf);
+      Deserializer deserializer = factory.getDeserializer(inputSplitClass);
+      deserializer.open((DataInputStream) in);
+      inputSplit = (InputSplit) deserializer.deserialize(inputSplit);
+      deserializer.close();
+    }
   }
 
   public void write(DataOutput out) throws IOException {
     out.writeInt(nodeIndex);
     bundle.write(out);
     Text.writeString(out, inputSplit.getClass().getName());
-    SerializationFactory factory = new SerializationFactory(conf);
-    Serializer serializer = factory.getSerializer(inputSplit.getClass());
-    serializer.open((DataOutputStream) out);
-    serializer.serialize(inputSplit);
+    if (inputSplit instanceof Writable) {
+      ((Writable) inputSplit).write(out);
+    } else {
+      SerializationFactory factory = new SerializationFactory(conf);
+      Serializer serializer = factory.getSerializer(inputSplit.getClass());
+      serializer.open((OutputStream) out);
+      serializer.serialize(inputSplit);
+      serializer.close();
+    }
   }
 
   private Class readClass(DataInput in) throws IOException {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 7472e3d..f3fd397 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -144,7 +144,11 @@ public class FileTargetImpl implements PathTarget {
   }
   
   protected Path getSourcePattern(Path workingPath, int index) {
-    return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*");
+    if (index < 0) {
+      return new Path(workingPath, "part-*");
+    } else {
+      return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*");
+    }
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
index 32bff38..1c89cb9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -103,6 +103,7 @@ public class SecondarySort {
     PTableType<K, Collection<Pair<V1, V2>>> out = ptf.tableOf(input.getKeyType(),
         ptf.collections(input.getValueType()));
     GroupingOptions.Builder gob = GroupingOptions.builder()
+        .requireSortedKeys()
         .groupingComparatorClass(JoinUtils.getGroupingComparator(ptf))
         .partitionerClass(JoinUtils.getPartitionerClass(ptf));
     if (numReducers > 0) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
index 011d9cd..3d4f68b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sort.java
@@ -240,6 +240,7 @@ public class Sort {
     } else if (tf == AvroTypeFamily.getInstance()) {
       builder.conf("crunch.schema", ((AvroType<K>) ptype).getSchema().toString());
     }
+    builder.requireSortedKeys();
     configureReducers(builder, ptable, conf, numReducers);
     return builder.build();
   }
@@ -270,6 +271,7 @@ public class Sort {
     } else {
       throw new RuntimeException("Unrecognized type family: " + tf);
     }
+    builder.requireSortedKeys();
     configureReducers(builder, ptable, conf, numReducers);
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
index bfc8ab3..580510b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
@@ -99,6 +99,7 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
         }, ptt);
 
     GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
+    optionsBuilder.requireSortedKeys();
     optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
     if (numReducers > 0) {
       optionsBuilder.numReducers(numReducers);

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
index 3f031cb..c717ab7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -91,10 +91,17 @@ public class JoinUtils {
     }
   }
 
-  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
+  public static class AvroIndexedRecordPartitioner extends Partitioner<Object, Object> {
     @Override
-    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
-      IndexedRecord record = (IndexedRecord) key.datum();
+    public int getPartition(Object key, Object value, int numPartitions) {
+      IndexedRecord record;
+      if (key instanceof AvroWrapper) {
+        record = (IndexedRecord) ((AvroWrapper) key).datum();
+      } else if (key instanceof IndexedRecord) {
+        record = (IndexedRecord) key;
+      } else {
+        throw new UnsupportedOperationException("Unknown avro key type: " + key);
+      }
       return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
     }
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java b/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java
new file mode 100644
index 0000000..d852c4a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/DelegatingReadableData.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Implements the {@code ReadableData<T>} interface by delegating to an {@code ReadableData<S>} instance
+ * and passing its contents through a {@code DoFn<S, T>}.
+ */
+public class DelegatingReadableData<S, T> implements ReadableData<T> {
+
+  private final ReadableData<S> delegate;
+  private final DoFn<S, T> fn;
+
+  public DelegatingReadableData(ReadableData<S> delegate, DoFn<S, T> fn) {
+    this.delegate = delegate;
+    this.fn = fn;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return delegate.getSourceTargets();
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    delegate.configure(conf);
+    fn.configure(conf);
+  }
+
+  @Override
+  public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    fn.setContext(context);
+    fn.initialize();
+    final Iterable<S> delegateIterable = delegate.read(context);
+    return new Iterable<T>() {
+      @Override
+      public Iterator<T> iterator() {
+        return new DoFnIterator<S, T>(delegateIterable.iterator(), fn);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java b/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java
new file mode 100644
index 0000000..0877a8f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/DoFnIterator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+/**
+ * An {@code Iterator<T>} that combines a delegate {@code Iterator<S>} and a {@code DoFn<S, T>}, generating
+ * data by passing the contents of the iterator through the function. Note that the input {@code DoFn} should
+ * have both its {@code setContext} and {@code initialize} functions called <b>before</b> it is passed to
+ * the constructor.
+ *
+ * @param <S> The type of the delegate iterator
+ * @param <T> The returned type
+ */
+public class DoFnIterator<S, T> implements Iterator<T> {
+
+  private final Iterator<S> iter;
+  private final DoFn<S, T> fn;
+  private CacheEmitter<T> cache;
+  private boolean cleanup;
+
+  public DoFnIterator(Iterator<S> iter, DoFn<S, T> fn) {
+    this.iter = iter;
+    this.fn = fn;
+    this.cache = new CacheEmitter<T>();
+    this.cleanup = false;
+  }
+
+  @Override
+  public boolean hasNext() {
+    while (cache.isEmpty() && iter.hasNext()) {
+      fn.process(iter.next(), cache);
+    }
+    if (cache.isEmpty() && !cleanup) {
+      fn.cleanup(cache);
+      cleanup = true;
+    }
+    return !cache.isEmpty();
+  }
+
+  @Override
+  public T next() {
+    return cache.poll();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  private static class CacheEmitter<T> implements Emitter<T> {
+
+    private final LinkedList<T> cache;
+
+    private CacheEmitter() {
+      this.cache = Lists.newLinkedList();
+    }
+
+    public boolean isEmpty() {
+      return cache.isEmpty();
+    }
+
+    public T poll() {
+      return cache.poll();
+    }
+
+    @Override
+    public void emit(T emitted) {
+      cache.add(emitted);
+    }
+
+    @Override
+    public void flush() {
+      // No-op
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java b/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java
new file mode 100644
index 0000000..7d6f65b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/UnionReadableData.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.util;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+public class UnionReadableData<T> implements ReadableData<T> {
+
+  private final List<ReadableData<T>> data;
+
+  public UnionReadableData(List<ReadableData<T>> data) {
+    this.data = data;
+  }
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    Set<SourceTarget<?>> srcTargets = Sets.newHashSet();
+    for (ReadableData<T> rd: data) {
+      srcTargets.addAll(rd.getSourceTargets());
+    }
+    return srcTargets;
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+   for (ReadableData<T> rd : data) {
+     rd.configure(conf);
+   }
+  }
+
+  @Override
+  public Iterable<T> read(final TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    List<Iterable<T>> iterables = Lists.newArrayList();
+    for (ReadableData<T> rd : data) {
+      iterables.add(rd.read(context));
+    }
+    return Iterables.concat(iterables);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java
new file mode 100644
index 0000000..75b2c2c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoCollectionTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+public class DoCollectionTest { // Expects BaseDoCollection.getSizeInternal() to equal the parent's size times the DoFn's scaleFactor().
+
+  @Test
+  public void testGetSizeInternal_NoScaleFactor() { // factor 1.0f: size is expected to stay at 100
+    runScaleTest(100L, 1.0f, 100L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorBelowZero() { // NOTE(review): the factor used is 0.5f, i.e. below one, not below zero
+    runScaleTest(100L, 0.5f, 50L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorAboveZero() { // NOTE(review): the factor used is 1.5f, i.e. above one
+    runScaleTest(100L, 1.5f, 150L);
+  }
+
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) { // Builds parent -> BaseDoCollection and asserts the reported size.
+    PCollectionImpl<String> parentCollection = new SizedPCollectionImpl("Sized collection",
inputSize); // stub parent whose getSizeInternal() returns inputSize
+
+    BaseDoCollection<String> doCollection = new BaseDoCollection<String>("Scaled
collection", parentCollection,
+        new ScaledFunction(scaleFactor), Writables.strings(), ParallelDoOptions.builder().build());
+
+    assertEquals(expectedScaledSize, doCollection.getSizeInternal());
+  }
+
+  static class ScaledFunction extends DoFn<String, String> { // Pass-through DoFn that reports a caller-chosen scale factor.
+
+    private float scaleFactor; // value returned by scaleFactor(); set once in the constructor
+
+    public ScaledFunction(float scaleFactor) {
+      this.scaleFactor = scaleFactor;
+    }
+
+    @Override
+    public void process(String input, Emitter<String> emitter) { // emits each input unchanged
+      emitter.emit(input);
+    }
+
+    @Override
+    public float scaleFactor() {
+      return scaleFactor;
+    }
+
+  }
+
+  static class SizedPCollectionImpl extends PCollectionImpl<String> { // Stub parent: only getSizeInternal() returns a meaningful value.
+
+    private long internalSize; // size reported by getSizeInternal()
+
+    public SizedPCollectionImpl(String name, long internalSize) {
+      super(name, null); // second superclass constructor argument is deliberately passed as null
+      this.internalSize = internalSize;
+    }
+
+    @Override
+    public PType getPType() { // stubbed: returns null
+      return null;
+    }
+
+    @Override
+    public List getParents() { // stubbed: returns null
+      return null;
+    }
+
+    @Override
+    protected void acceptInternal(Visitor visitor) { // stubbed: intentionally does nothing
+    }
+
+    @Override
+    protected ReadableData<String> getReadableDataInternal() { // stubbed: returns null
+      return null;
+    }
+
+    @Override
+    protected long getSizeInternal() { // the fixed size supplied at construction
+      return internalSize;
+    }
+
+    @Override
+    public long getLastModifiedAt() { // stubbed: always -1
+      return -1;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java
new file mode 100644
index 0000000..0fd557f
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/dist/collect/DoTableImplTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ParallelDoOptions;
+import org.junit.Test;
+
+public class DoTableImplTest { // Expects BaseDoTable.getSizeInternal() to equal the mocked parent's size times the DoFn's scaleFactor().
+
+  @Test
+  public void testGetSizeInternal_NoScaleFactor() { // factor 1.0f: size is expected to stay at 100
+    runScaleTest(100L, 1.0f, 100L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorBelowZero() { // NOTE(review): the factor used is 0.5f, i.e. below one, not below zero
+    runScaleTest(100L, 0.5f, 50L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorAboveZero() { // NOTE(review): the factor used is 1.5f, i.e. above one
+    runScaleTest(100L, 1.5f, 150L);
+  }
+
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) { // Builds mock parent -> BaseDoTable, asserts the size, then checks the mock's usage.
+
+    @SuppressWarnings("unchecked")
+    PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class);
+    when(parentCollection.getPipeline()).thenReturn(null); // stubbed explicitly because the test verifies this call below
+    when(parentCollection.getSize()).thenReturn(inputSize); // the parent size that the scale factor is applied to
+
+    BaseDoTable<String, String> doTable = new BaseDoTable<String, String>("Scalled
table collection",
+        parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(), strings()), // NOTE(review): the name literal above is spelled "Scalled"
+        ParallelDoOptions.builder().build());
+
+    assertEquals(expectedScaledSize, doTable.getSizeInternal());
+    verify(parentCollection).getPipeline(); // the parent's pipeline must have been requested exactly once
+    verify(parentCollection).getSize(); // the parent's size must have been requested exactly once
+
+    verifyNoMoreInteractions(parentCollection); // and no other method of the parent may have been called
+  }
+
+  static class TableScaledFunction extends DoFn<String, Pair<String, String>>
{ // DoFn that emits (input, input) pairs and reports a caller-chosen scale factor.
+
+    private float scaleFactor; // value returned by scaleFactor(); set once in the constructor
+
+    public TableScaledFunction(float scaleFactor) {
+      this.scaleFactor = scaleFactor;
+    }
+
+    @Override
+    public float scaleFactor() {
+      return scaleFactor;
+    }
+
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter)
{ // uses the input string as both key and value
+      emitter.emit(Pair.of(input, input));
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
index 9ed7a46..d04b62b 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 
 import org.apache.crunch.SourceTarget;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.types.avro.Avros;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
deleted file mode 100644
index 4b607b1..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.impl.mr.plan.DoNode;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
-import org.junit.Test;
-
-public class DoCollectionImplTest {
-
-  @Test
-  public void testGetSizeInternal_NoScaleFactor() {
-    runScaleTest(100L, 1.0f, 100L);
-  }
-
-  @Test
-  public void testGetSizeInternal_ScaleFactorBelowZero() {
-    runScaleTest(100L, 0.5f, 50L);
-  }
-
-  @Test
-  public void testGetSizeInternal_ScaleFactorAboveZero() {
-    runScaleTest(100L, 1.5f, 150L);
-  }
-
-  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
-    PCollectionImpl<String> parentCollection = new SizedPCollectionImpl("Sized collection",
inputSize);
-
-    DoCollectionImpl<String> doCollectionImpl = new DoCollectionImpl<String>("Scaled
collection", parentCollection,
-        new ScaledFunction(scaleFactor), Writables.strings());
-
-    assertEquals(expectedScaledSize, doCollectionImpl.getSizeInternal());
-  }
-
-  static class ScaledFunction extends DoFn<String, String> {
-
-    private float scaleFactor;
-
-    public ScaledFunction(float scaleFactor) {
-      this.scaleFactor = scaleFactor;
-    }
-
-    @Override
-    public void process(String input, Emitter<String> emitter) {
-      emitter.emit(input);
-    }
-
-    @Override
-    public float scaleFactor() {
-      return scaleFactor;
-    }
-
-  }
-
-  static class SizedPCollectionImpl extends PCollectionImpl<String> {
-
-    private long internalSize;
-
-    public SizedPCollectionImpl(String name, long internalSize) {
-      super(name);
-      this.internalSize = internalSize;
-    }
-
-    @Override
-    public PType getPType() {
-      return null;
-    }
-
-    @Override
-    public DoNode createDoNode() {
-      return null;
-    }
-
-    @Override
-    public List getParents() {
-      return null;
-    }
-
-    @Override
-    protected void acceptInternal(Visitor visitor) {
-    }
-
-    @Override
-    protected ReadableData<String> getReadableDataInternal() {
-      return null;
-    }
-
-    @Override
-    protected long getSizeInternal() {
-      return internalSize;
-    }
-
-    @Override
-    public long getLastModifiedAt() {
-      return -1;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
deleted file mode 100644
index 89b9944..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.collect;
-
-import static org.apache.crunch.types.writable.Writables.strings;
-import static org.apache.crunch.types.writable.Writables.tableOf;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.junit.Test;
-
-public class DoTableImplTest {
-
-  @Test
-  public void testGetSizeInternal_NoScaleFactor() {
-    runScaleTest(100L, 1.0f, 100L);
-  }
-
-  @Test
-  public void testGetSizeInternal_ScaleFactorBelowZero() {
-    runScaleTest(100L, 0.5f, 50L);
-  }
-
-  @Test
-  public void testGetSizeInternal_ScaleFactorAboveZero() {
-    runScaleTest(100L, 1.5f, 150L);
-  }
-
-  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
-
-    @SuppressWarnings("unchecked")
-    PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class);
-
-    when(parentCollection.getSize()).thenReturn(inputSize);
-
-    DoTableImpl<String, String> doTableImpl = new DoTableImpl<String, String>("Scalled
table collection",
-        parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(), strings()));
-
-    assertEquals(expectedScaledSize, doTableImpl.getSizeInternal());
-
-    verify(parentCollection).getSize();
-
-    verifyNoMoreInteractions(parentCollection);
-  }
-
-  static class TableScaledFunction extends DoFn<String, Pair<String, String>>
{
-
-    private float scaleFactor;
-
-    public TableScaledFunction(float scaleFactor) {
-      this.scaleFactor = scaleFactor;
-    }
-
-    @Override
-    public float scaleFactor() {
-      return scaleFactor;
-    }
-
-    @Override
-    public void process(String input, Emitter<Pair<String, String>> emitter)
{
-      emitter.emit(Pair.of(input, input));
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
index e85419c..4b183ac 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
@@ -27,8 +27,8 @@ import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.InputCollection;
-import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.plan.DotfileWriter.MRTaskType;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/crunch/blob/a691b835/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 2428c16..96a9931 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -32,7 +32,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.lib.sort.TotalOrderPartitioner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -389,7 +389,7 @@ public final class HFileUtils {
       }
     }, tableOf(writables(KeyValue.class), nulls()));
     List <KeyValue> splitPoints = getSplitPoints(table);
-    Path partitionFile = new Path(((MRPipeline) kvs.getPipeline()).createTempPath(), "partition");
+    Path partitionFile = new Path(((DistributedPipeline) kvs.getPipeline()).createTempPath(),
"partition");
     writePartitionInfo(conf, partitionFile, splitPoints);
     GroupingOptions options = GroupingOptions.builder()
         .partitionerClass(TotalOrderPartitioner.class)


Mime
View raw message