incubator-crunch-commits mailing list archives

From jwi...@apache.org
Subject [14/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Sort.java b/src/main/java/org/apache/crunch/lib/Sort.java
new file mode 100644
index 0000000..5a3f8e9
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/Sort.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.GroupingOptions.Builder;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Utilities for sorting {@code PCollection} instances.
+ */
+public class Sort {
+  
+  public enum Order {
+    ASCENDING, DESCENDING, IGNORE
+  }
+  
+  /**
+   * To sort by column 2 ascending then column 1 descending, you would use:
+   * <code>
+   * sortPairs(coll, by(2, ASCENDING), by(1, DESCENDING))
+   * </code>
+   * Column numbering is 1-based.
+   */
+  public static class ColumnOrder {
+    int column;
+    Order order;
+    public ColumnOrder(int column, Order order) {
+      this.column = column;
+      this.order = order;
+    }
+    public static ColumnOrder by(int column, Order order) {
+      return new ColumnOrder(column, order);
+    }
+    
+    @Override
+    public String toString() {
+      return"ColumnOrder: column:" + column + ", Order: " + order;
+    }
+  }
+  
+  /**
+   * Sorts the {@link PCollection} using the natural ordering of its elements.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection) {
+    return sort(collection, Order.ASCENDING);
+  }
+  
+  /**
+   * Sorts the {@link PCollection} using the natural ordering of its elements
+   * in the order specified.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection, Order order) {
+    PTypeFamily tf = collection.getTypeFamily();
+    PTableType<T, Void> type = tf.tableOf(collection.getPType(), tf.nulls());
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, tf,
+        collection.getPType(), order);
+    PTable<T, Void> pt =
+      collection.parallelDo("sort-pre", new DoFn<T, Pair<T, Void>>() {
+        @Override
+        public void process(T input,
+            Emitter<Pair<T, Void>> emitter) {
+          emitter.emit(Pair.of(input, (Void) null));
+        }
+      }, type);
+    PTable<T, Void> sortedPt = pt.groupByKey(options).ungroup();
+    return sortedPt.parallelDo("sort-post", new DoFn<Pair<T, Void>, T>() {
+      @Override
+      public void process(Pair<T, Void> input, Emitter<T> emitter) {
+        emitter.emit(input.first());
+      }
+    }, collection.getPType());
+  }
+  
+
+  /**
+   * Sorts the {@link PTable} using the natural ordering of its keys.
+   * 
+   * @return a {@link PTable} representing the sorted table.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table) {
+    return sort(table, Order.ASCENDING);
+  }
+
+  /**
+   * Sorts the {@link PTable} using the natural ordering of its keys
+   * in the order specified.
+   * 
+   * @return a {@link PTable} representing the sorted table.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order order) {
+    PTypeFamily tf = table.getTypeFamily();
+    Configuration conf = table.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, tf, table.getKeyType(), order);
+    return table.groupByKey(options).ungroup();
+  }
+  
+  /**
+   * Sorts the {@link PCollection} of {@link Pair}s using the specified column
+   * ordering.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <U, V> PCollection<Pair<U, V>> sortPairs(
+      PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders) {
+    // put U and V into a pair/tuple in the key so we can do grouping and sorting
+    PTypeFamily tf = collection.getTypeFamily();
+    PType<Pair<U, V>> pType = collection.getPType();
+    @SuppressWarnings("unchecked")
+    PTableType<Pair<U, V>, Void> type = tf.tableOf(
+        tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)),
+        tf.nulls());
+    PTable<Pair<U, V>, Void> pt =
+      collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
+        @Override
+        public void process(Pair<U, V> input,
+            Emitter<Pair<Pair<U, V>, Void>> emitter) {
+          emitter.emit(Pair.of(input, (Void) null));
+        }
+      }, type);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
+    PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup();
+    return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>,Void>, Pair<U, V>>() {
+      @Override
+      public void process(Pair<Pair<U, V>, Void> input,
+          Emitter<Pair<U, V>> emitter) {
+        emitter.emit(input.first());
+      }
+    }, collection.getPType());
+  }
+
+  /**
+   * Sorts the {@link PCollection} of {@link Tuple3}s using the specified column
+   * ordering.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(
+      PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders) {
+    PTypeFamily tf = collection.getTypeFamily();
+    PType<Tuple3<V1, V2, V3>> pType = collection.getPType();
+    @SuppressWarnings("unchecked")
+    PTableType<Tuple3<V1, V2, V3>, Void> type = tf.tableOf(
+        tf.triples(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2)),
+        tf.nulls());
+    PTable<Tuple3<V1, V2, V3>, Void> pt =
+      collection.parallelDo(new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
+        @Override
+        public void process(Tuple3<V1, V2, V3> input,
+            Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) {
+          emitter.emit(Pair.of(input, (Void) null));
+        }
+      }, type);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
+    PTable<Tuple3<V1, V2, V3>, Void> sortedPt = pt.groupByKey(options).ungroup();
+    return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>,Void>, Tuple3<V1, V2, V3>>() {
+      @Override
+      public void process(Pair<Tuple3<V1, V2, V3>, Void> input,
+          Emitter<Tuple3<V1, V2, V3>> emitter) {
+        emitter.emit(input.first());
+      }
+    }, collection.getPType());
+  }
+
+  /**
+   * Sorts the {@link PCollection} of {@link Tuple4}s using the specified column
+   * ordering.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
+      PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
+    PTypeFamily tf = collection.getTypeFamily();
+    PType<Tuple4<V1, V2, V3, V4>> pType = collection.getPType();
+    @SuppressWarnings("unchecked")
+    PTableType<Tuple4<V1, V2, V3, V4>, Void> type = tf.tableOf(
+        tf.quads(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2),  pType.getSubTypes().get(3)),
+        tf.nulls());
+    PTable<Tuple4<V1, V2, V3, V4>, Void> pt =
+      collection.parallelDo(new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
+        @Override
+        public void process(Tuple4<V1, V2, V3, V4> input,
+            Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) {
+          emitter.emit(Pair.of(input, (Void) null));
+        }
+      }, type);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
+    PTable<Tuple4<V1, V2, V3, V4>, Void> sortedPt = pt.groupByKey(options).ungroup();
+    return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>,Void>, Tuple4<V1, V2, V3, V4>>() {
+      @Override
+      public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input,
+          Emitter<Tuple4<V1, V2, V3, V4>> emitter) {
+        emitter.emit(input.first());
+      }
+    }, collection.getPType());
+  }
+
+  /**
+   * Sorts the {@link PCollection} of {@link TupleN}s using the specified column
+   * ordering.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection,
+      ColumnOrder... columnOrders) {
+    PTypeFamily tf = collection.getTypeFamily();
+    PType<TupleN> pType = collection.getPType();
+    PTableType<TupleN, Void> type = tf.tableOf(
+        tf.tuples(pType.getSubTypes().toArray(new PType[0])),
+        tf.nulls());
+    PTable<TupleN, Void> pt =
+      collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() {
+        @Override
+        public void process(TupleN input,
+            Emitter<Pair<TupleN, Void>> emitter) {
+          emitter.emit(Pair.of(input, (Void) null));
+        }
+      }, type);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
+    PTable<TupleN, Void> sortedPt = pt.groupByKey(options).ungroup();
+    return sortedPt.parallelDo(new DoFn<Pair<TupleN,Void>, TupleN>() {
+      @Override
+      public void process(Pair<TupleN, Void> input,
+          Emitter<TupleN> emitter) {
+        emitter.emit(input.first());
+      }
+    }, collection.getPType());
+  }
+  
+  // TODO: move to type family?
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
+      PTypeFamily tf, PType<T> ptype, Order order) {
+    Builder builder = GroupingOptions.builder();
+    if (order == Order.DESCENDING) {
+      if (tf == WritableTypeFamily.getInstance()) {
+        builder.sortComparatorClass(ReverseWritableComparator.class);
+      } else if (tf == AvroTypeFamily.getInstance()) {
+        AvroType<T> avroType = (AvroType<T>) ptype;
+        Schema schema = avroType.getSchema();
+        conf.set("crunch.schema", schema.toString());
+        builder.sortComparatorClass(ReverseAvroComparator.class);
+      } else {
+        throw new RuntimeException("Unrecognized type family: " + tf);
+      }
+    }
+    return builder.build();
+  }
+  
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
+      PTypeFamily tf, PType<T> ptype, ColumnOrder[] columnOrders) {
+    Builder builder = GroupingOptions.builder();
+    if (tf == WritableTypeFamily.getInstance()) {
+      TupleWritableComparator.configureOrdering(conf, columnOrders);
+      builder.sortComparatorClass(TupleWritableComparator.class);
+    } else if (tf == AvroTypeFamily.getInstance()) {
+      TupleAvroComparator.configureOrdering(conf, columnOrders, ptype);
+      builder.sortComparatorClass(TupleAvroComparator.class);
+    } else {
+      throw new RuntimeException("Unrecognized type family: " + tf);
+    }
+    return builder.build();
+  }
+  
+  static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
+    
+    RawComparator<T> comparator;
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        JobConf jobConf = new JobConf(conf);
+        comparator = WritableComparator.get(
+            jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+      }
+    }
+
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+        int arg5) {
+      return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
+    }
+
+    @Override
+    public int compare(T o1, T o2) {
+      return -comparator.compare(o1, o2);
+    }
+
+  }
+  
+  static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
+
+    Schema schema;
+    
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+      }
+    }
+
+    @Override
+    public int compare(T o1, T o2) {
+      return -ReflectData.get().compare(o1, o2, schema);
+    }
+
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+        int arg5) {
+      return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
+    }
+    
+  }
+  
+  static class TupleWritableComparator extends WritableComparator implements Configurable {
+    
+    private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
+    
+    Configuration conf;
+    ColumnOrder[] columnOrders;
+    
+    public TupleWritableComparator() {
+      super(TupleWritable.class, true);
+    }
+    
+    public static void configureOrdering(Configuration conf, Order... orders) {
+      conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(
+        Iterables.transform(Arrays.asList(orders),
+          new Function<Order, String>() {
+        @Override
+        public String apply(Order o) {
+          return o.name();
+        }
+      })));
+    }
+
+    
+    public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
+      conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(
+        Iterables.transform(Arrays.asList(columnOrders),
+          new Function<ColumnOrder, String>() {
+        @Override
+        public String apply(ColumnOrder o) {
+          return o.column + ";" + o.order.name();
+        }
+      })));
+    }
+    
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      TupleWritable ta = (TupleWritable) a;
+      TupleWritable tb = (TupleWritable) b;
+      for (int i = 0; i < columnOrders.length; i++) {
+        int index = columnOrders[i].column - 1;
+        int order = 1;
+        if (columnOrders[i].order == Order.ASCENDING) {
+          order = 1;
+        } else  if (columnOrders[i].order == Order.DESCENDING) {
+          order = -1;
+        } else { // ignore
+          continue;
+        }
+        if (!ta.has(index) && !tb.has(index)) {
+          continue;
+        } else if (ta.has(index) && !tb.has(index)) {
+          return order;
+        } else if (!ta.has(index) && tb.has(index)) {
+          return -order;
+        } else {
+          Writable v1 = ta.get(index);
+          Writable v2 = tb.get(index);
+          if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
+            if (v1 instanceof WritableComparable
+                && v2 instanceof WritableComparable) {
+              int cmp = ((WritableComparable) v1)
+                  .compareTo((WritableComparable) v2);
+              if (cmp != 0) {
+                return order * cmp;
+              }
+            } else {
+              int cmp = v1.hashCode() - v2.hashCode();
+              if (cmp != 0) {
+                return order * cmp;
+              }
+            }
+          }
+        }
+      }
+      return 0; // ordering using specified columns found no differences
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      if (conf != null) {
+        String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
+        String[] columnOrderNames = ordering.split(",");
+        columnOrders = new ColumnOrder[columnOrderNames.length];
+        for (int i = 0; i < columnOrders.length; i++) {
+          String[] split = columnOrderNames[i].split(";");
+          int column = Integer.parseInt(split[0]);
+          Order order = Order.valueOf(split[1]);
+          columnOrders[i] = ColumnOrder.by(column, order);
+          
+        }
+      }
+    }
+  }
+  
+  static class TupleAvroComparator<T> extends Configured implements RawComparator<T> {
+
+    Schema schema;
+    
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+      }
+    }
+
+    public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders,
+        PType<S> ptype) {
+      Schema orderedSchema = createOrderedTupleSchema(ptype, columnOrders);
+      conf.set("crunch.schema", orderedSchema.toString());
+    }
+    
+    // TODO: move to Avros
+    // TODO: need to re-order columns in map output then switch back in the reduce
+    //       this will require more extensive changes in Crunch
+    private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+      // Guarantee each tuple schema has a globally unique name
+      String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+      Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+      List<Schema.Field> fields = Lists.newArrayList();
+      AvroType<S> parentAvroType = (AvroType<S>) ptype;
+      Schema parentAvroSchema = parentAvroType.getSchema();
+      
+      BitSet orderedColumns = new BitSet();
+      // First add any fields specified by ColumnOrder
+      for (ColumnOrder columnOrder : orders) {
+        int index = columnOrder.column - 1;
+        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
+        Schema fieldSchema = Schema.createUnion(
+            ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
+        String fieldName = parentAvroSchema.getFields().get(index).name();
+        fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
+            Schema.Field.Order.valueOf(columnOrder.order.name())));
+        orderedColumns.set(index);
+      }
+      // Then add remaining fields from the ptypes, with no sort order
+      for (int i = 0; i < ptype.getSubTypes().size(); i++) {
+        if (orderedColumns.get(i)) {
+          continue;
+        }
+        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(i);
+        Schema fieldSchema = Schema.createUnion(
+            ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
+        String fieldName = parentAvroSchema.getFields().get(i).name();
+        fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
+            Schema.Field.Order.IGNORE));
+      }
+      schema.setFields(fields);
+      return schema;
+    }
+
+    @Override
+    public int compare(T o1, T o2) {
+      return ReflectData.get().compare(o1, o2, schema);
+    }
+
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+        int arg5) {
+      return BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
+    }
+    
+  }
+}
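
For orientation, here is a minimal usage sketch of the Sort API added above; it is
not part of the commit. It assumes a few pieces of Crunch that do not appear in this
diff, namely the MRPipeline constructor, Pipeline.readTextFile/writeTextFile/done,
and the Writables factory methods of the writable type family; SortExample and the
args paths are hypothetical.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.crunch.lib.Sort.Order;
import org.apache.crunch.types.writable.Writables;

public class SortExample {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(SortExample.class);
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Natural-order sort of the raw lines (pass Order.DESCENDING to reverse it).
    PCollection<String> sortedLines = Sort.sort(lines);
    pipeline.writeTextFile(sortedLines, args[1]);

    // Pair each line with its length, then sort by length ascending, breaking
    // ties by line descending; column numbering is 1-based, per ColumnOrder.
    PCollection<Pair<String, Long>> withLength = lines.parallelDo(
        new DoFn<String, Pair<String, Long>>() {
          @Override
          public void process(String input, Emitter<Pair<String, Long>> emitter) {
            emitter.emit(Pair.of(input, (long) input.length()));
          }
        }, Writables.pairs(Writables.strings(), Writables.longs()));
    PCollection<Pair<String, Long>> byLengthThenLine = Sort.sortPairs(withLength,
        ColumnOrder.by(2, Order.ASCENDING), ColumnOrder.by(1, Order.DESCENDING));
    pipeline.writeTextFile(byLengthThenLine, args[2]);

    pipeline.done();
  }
}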

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
new file mode 100644
index 0000000..4f0d5cc
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of a full outer join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+  
+  private transient int lastId;
+  private transient K lastKey;
+  private transient List<U> leftValues;
+
+  public FullOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    lastId = 1;
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      // Make sure that left side gets emitted.
+      if (0 == lastId && 0 == id) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+        }
+      }
+      lastKey = key;
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        // Make sure that right side gets emitted.
+        if (leftValues.isEmpty()) {
+          leftValues.add(null);
+        }
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+
+    lastId = id;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (0 == lastId) {
+      for (U u : leftValues) {
+        emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() { return "fullOuterJoin"; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
new file mode 100644
index 0000000..0c46bdf
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of an inner join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  private transient K lastKey;
+  private transient List<U> leftValues;
+  
+  public InnerJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      lastKey = key;
+      leftValues.clear();
+    }
+    if (id == 0) { // from left
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else { // from right
+      for (Pair<U, V> pair : pairs) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() { return "innerJoin"; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/src/main/java/org/apache/crunch/lib/join/JoinFn.java
new file mode 100644
index 0000000..494031c
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+/**
+ * Represents a {@link org.apache.crunch.DoFn} for performing joins.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public abstract class JoinFn<K, U, V>
+    extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+  
+  protected PType<U> leftValueType;
+
+  /**
+   * Instantiate with the PType of the value of the left side of the join (used
+   * for creating deep copies of values).
+   * 
+   * @param leftValueType
+   *          The PType of the value type of the left side of the join
+   */
+  public JoinFn(PType<U> leftValueType) {
+    this.leftValueType = leftValueType;
+  }
+
+  /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
+  public abstract String getJoinType();
+
+  /**
+   * Performs the actual joining.
+   *
+   * @param key The key for this grouping of values.
+   * @param id The side that this group of values is from (0 -> left, 1 -> right).
+   * @param pairs The group of values associated with this key and id pair.
+   * @param emitter The emitter to send the output to.
+   */
+  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter);
+
+  /**
+   * Split up the input record to make coding a bit more manageable.
+   *
+   * @param input The input record.
+   * @param emitter The emitter to send the output to.
+   */
+  @Override
+  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    join(input.first().first(), input.first().second(), input.second(), emitter);
+  }
+}
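
The javadoc above spells out the grouped-input protocol that every join implementation
in this commit relies on: process receives values keyed by (K, id), where id 0 marks
the left table and id 1 the right, and delegates to join. The sketch below (not part
of the commit) drives a toy JoinFn by hand with the InMemoryEmitter test helper that
is also added in this commit, just to make that contract visible; Writables.strings()
is assumed from the writable type family and only satisfies the constructor.

import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.join.JoinFn;
import org.apache.crunch.test.InMemoryEmitter;
import org.apache.crunch.types.writable.Writables;
import com.google.common.collect.ImmutableList;

public class JoinFnSketch {
  public static void main(String[] args) {
    // A toy JoinFn that tags each value with the side it came from.
    JoinFn<String, String, String> sideTagger =
        new JoinFn<String, String, String>(Writables.strings()) {
          @Override
          public String getJoinType() { return "sideTagger"; }

          @Override
          public void join(String key, int id, Iterable<Pair<String, String>> pairs,
              Emitter<Pair<String, Pair<String, String>>> emitter) {
            // id == 0 means the values came from the left table, id == 1 from the right.
            for (Pair<String, String> pair : pairs) {
              String value = (id == 0) ? pair.first() : pair.second();
              emitter.emit(Pair.of(key, Pair.of(id == 0 ? "left" : "right", value)));
            }
          }
        };

    InMemoryEmitter<Pair<String, Pair<String, String>>> emitter =
        new InMemoryEmitter<Pair<String, Pair<String, String>>>();
    // Left-side values for a key arrive under id 0, right-side values under id 1.
    sideTagger.join("k", 0, ImmutableList.of(Pair.of("u1", (String) null)), emitter);
    sideTagger.join("k", 1, ImmutableList.of(Pair.of((String) null, "v1")), emitter);
    // Two records are emitted for key "k": one tagged "left", one tagged "right".
    System.out.println(emitter.getOutput());
  }
}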

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
new file mode 100644
index 0000000..c282642
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+
+/**
+ * Utilities that are useful when joining multiple data sets via a MapReduce job.
+ *
+ */
+public class JoinUtils {
+
+  public static Class<? extends Partitioner> getPartitionerClass(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritablePartitioner.class;
+    } else {
+      return AvroIndexedRecordPartitioner.class;
+    }
+  }
+  
+  public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritableComparator.class;
+    } else {
+      return AvroPairGroupingComparator.class;
+    }
+  }
+  
+  public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
+    @Override
+    public int getPartition(TupleWritable key, Writable value, int numPartitions) {
+      return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+  
+  public static class TupleWritableComparator implements RawComparator<TupleWritable> {
+    
+    private DataInputBuffer buffer = new DataInputBuffer();
+    private TupleWritable key1 = new TupleWritable();
+    private TupleWritable key2 = new TupleWritable();
+    
+    @Override
+    public int compare(TupleWritable o1, TupleWritable o2) {
+      return ((WritableComparable)o1.get(0)).compareTo((WritableComparable)o2.get(0));
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        buffer.reset(b1, s1, l1);
+        key1.readFields(buffer);
+      
+        buffer.reset(b2, s2, l2);
+        key2.readFields(buffer);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      
+      return compare(key1, key2);
+    }
+  }
+  
+  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
+    @Override
+    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
+      IndexedRecord record = (IndexedRecord) key.datum();
+      return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+  
+  public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
+    private Schema schema;
+
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
+        Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
+        schema = keySchema.getFields().get(0).schema();
+      }
+    }
+    
+    @Override
+    public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
+      return ReflectData.get().compare(x.datum(), y.datum(), schema);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema);
+    }
+  }
+}
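
A short sketch (not part of the commit) of the selection logic above: the writable
type family gets the TupleWritable-based partitioner and grouping comparator, while
any other family falls through to the Avro-based implementations. Both type-family
singletons are referenced elsewhere in this commit.

import org.apache.crunch.lib.join.JoinUtils;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.writable.WritableTypeFamily;

public class JoinUtilsSketch {
  public static void main(String[] args) {
    PTypeFamily writables = WritableTypeFamily.getInstance();
    PTypeFamily avros = AvroTypeFamily.getInstance();
    System.out.println(JoinUtils.getPartitionerClass(writables));   // TupleWritablePartitioner
    System.out.println(JoinUtils.getGroupingComparator(writables)); // TupleWritableComparator
    System.out.println(JoinUtils.getPartitionerClass(avros));       // AvroIndexedRecordPartitioner
    System.out.println(JoinUtils.getGroupingComparator(avros));     // AvroPairGroupingComparator
  }
}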

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
new file mode 100644
index 0000000..6e4d3c6
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of a left outer join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+  
+  private transient int lastId;
+  private transient K lastKey;
+  private transient List<U> leftValues;
+
+  public LeftOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    lastId = 1;
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      // Make sure that left side always gets emitted.
+      if (0 == lastId && 0 == id) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+        }
+      }
+      lastKey = key;
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+
+    lastId = id;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (0 == lastId) {
+      for (U u : leftValues) {
+        emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() { return "leftOuterJoin"; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
new file mode 100644
index 0000000..28a0829
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.IOException;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.io.impl.SourcePathTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Utility for doing map side joins on a common key between two {@link PTable}s.
+ * <p>
+ * A map side join is an optimized join which doesn't use a reducer; instead,
+ * the right side of the join is loaded into memory and the join is performed in
+ * a mapper. This style of join has the important implication that the output of
+ * the join is not sorted, as it would be with a conventional (reducer-based)
+ * join.
+ * <p>
+ * <b>Note:</b> This utility is only supported when running with a
+ * {@link MRPipeline} as the pipeline.
+ */
+public class MapsideJoin {
+
+  /**
+   * Join two tables using a map side join. The right-side table will be loaded
+   * fully in memory, so this method should only be used if the right side
+   * table's contents can fit in the memory allocated to mappers. The join
+   * performed by this method is an inner join.
+   * 
+   * @param left
+   *          The left-side table of the join
+   * @param right
+   *          The right-side table of the join, whose contents will be fully
+   *          read into memory
+   * @return A table keyed on the join key, containing pairs of joined values
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
+
+    if (!(right.getPipeline() instanceof MRPipeline)) {
+      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
+    }
+
+    MRPipeline pipeline = (MRPipeline) right.getPipeline();
+    pipeline.materialize(right);
+
+    // TODO Move necessary logic to MRPipeline so that we can theoretically
+    // optimize this by running the setup of multiple map-side joins concurrently
+    pipeline.run();
+
+    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
+        .getMaterializeSourceTarget(right);
+    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
+      throw new CrunchRuntimeException("Right-side contents can't be read from a path");
+    }
+
+    // Suppress warnings because we've just checked this cast via instanceof
+    @SuppressWarnings("unchecked")
+    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
+
+    Path path = sourcePathTarget.getPath();
+    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
+
+    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
+        right.getPType());
+    PTypeFamily typeFamily = left.getTypeFamily();
+    return left.parallelDo(
+        "mapjoin",
+        mapJoinDoFn,
+        typeFamily.tableOf(left.getKeyType(),
+            typeFamily.pairs(left.getValueType(), right.getValueType())));
+
+  }
+
+  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
+
+    private String inputPath;
+    private PType<Pair<K, V>> ptype;
+    private Multimap<K, V> joinMap;
+
+    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
+      this.inputPath = inputPath;
+      this.ptype = ptype;
+    }
+
+    private Path getCacheFilePath() {
+      try {
+        for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
+          if (localPath.toString().endsWith(inputPath)) {
+            return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
+
+          }
+        }
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      }
+
+      throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+
+      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
+          .getDefaultFileSource(getCacheFilePath());
+      Iterable<Pair<K, V>> iterable = null;
+      try {
+        iterable = sourceTarget.read(getConfiguration());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
+      }
+
+      joinMap = ArrayListMultimap.create();
+      for (Pair<K, V> joinPair : iterable) {
+        joinMap.put(joinPair.first(), joinPair.second());
+      }
+    }
+
+    @Override
+    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
+      K key = input.first();
+      U value = input.second();
+      for (V joinValue : joinMap.get(key)) {
+        Pair<U, V> valuePair = Pair.of(value, joinValue);
+        emitter.emit(Pair.of(key, valuePair));
+      }
+    }
+
+  }
+
+}
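
A minimal sketch (not part of the commit) of how the utility above might be called,
assuming two already-constructed PTables produced by an MRPipeline-backed job; the
method name, table names, and value types are hypothetical.

import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.lib.join.MapsideJoin;

public class MapsideJoinSketch {

  // "orders" may be arbitrarily large; "customers" must be small enough to fit in
  // each mapper's memory, because MapsideJoin reads the right side fully into memory.
  // The right-side table's pipeline must be an MRPipeline, otherwise join() throws
  // a CrunchRuntimeException.
  public static PTable<String, Pair<String, String>> joinOrdersWithCustomers(
      PTable<String, String> orders, PTable<String, String> customers) {
    // Inner join performed map-side; there is no reduce phase, so the output is unsorted.
    return MapsideJoin.join(orders, customers);
  }
}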

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
new file mode 100644
index 0000000..a7b2a0d
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of a right outer join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  private transient K lastKey;
+  private transient List<U> leftValues;
+
+  public RightOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      lastKey = key;
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        // Make sure that right side gets emitted.
+        if (leftValues.isEmpty()) {
+          leftValues.add(null);
+        }
+
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() { return "rightOuterJoin"; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
new file mode 100644
index 0000000..c713d90
--- /dev/null
+++ b/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.io.ReadableSourceTarget;
+
+public class MaterializableIterable<E> implements Iterable<E> {
+
+  private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
+  
+  private final Pipeline pipeline;
+  private final ReadableSourceTarget<E> sourceTarget;
+  private Iterable<E> materialized;
+  
+  public MaterializableIterable(Pipeline pipeline, ReadableSourceTarget<E> source) {
+    this.pipeline = pipeline;
+    this.sourceTarget = source;
+    this.materialized = null;
+  }
+  
+  public ReadableSourceTarget<E> getSourceTarget() {
+    return sourceTarget;
+  }
+  
+  @Override
+  public Iterator<E> iterator() {
+    if (materialized == null) {
+      pipeline.run();
+      materialize();
+    }
+    return materialized.iterator();
+  }
+
+  public void materialize() {
+    try {
+      materialized = sourceTarget.read(pipeline.getConfiguration());
+    } catch (IOException e) {
+      LOG.error("Could not materialize: " + sourceTarget, e);
+      throw new CrunchRuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/materialize/MaterializableMap.java b/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
new file mode 100644
index 0000000..e751f50
--- /dev/null
+++ b/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.materialize;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+
+public class MaterializableMap<K, V> extends AbstractMap<K, V> {
+  
+  private Iterable<Pair<K, V>> iterable;
+  private Set<Map.Entry<K, V>> entrySet;
+  
+  public MaterializableMap(Iterable<Pair<K, V>> iterable) {
+    this.iterable = iterable;
+  }
+  
+  private Set<Map.Entry<K, V>> toMapEntries(Iterable<Pair<K, V>> xs) {
+    HashMap<K, V> m = new HashMap<K, V>();
+    for (Pair<K, V> x : xs)
+      m.put(x.first(), x.second());
+    return m.entrySet();
+  }
+
+  @Override
+  public Set<Map.Entry<K, V>> entrySet() {
+    if (entrySet == null)
+      entrySet = toMapEntries(iterable);
+    return entrySet;
+  }
+
+}
\ No newline at end of file
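
A minimal sketch (not part of the commit) of the Map view this class provides; the
ImmutableList here stands in for the Iterable of pairs that a materialized PTable
would supply.

import java.util.Map;

import org.apache.crunch.Pair;
import org.apache.crunch.materialize.MaterializableMap;
import com.google.common.collect.ImmutableList;

public class MaterializableMapSketch {
  public static void main(String[] args) {
    // Stand-in for the materialized contents of a PTable<String, Integer>.
    Iterable<Pair<String, Integer>> pairs =
        ImmutableList.of(Pair.of("a", 1), Pair.of("b", 2));

    // The backing entry set is built lazily, on the first entrySet() call.
    Map<String, Integer> map = new MaterializableMap<String, Integer>(pairs);
    System.out.println(map.get("b")); // prints 2
  }
}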

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/test/FileHelper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/test/FileHelper.java b/src/main/java/org/apache/crunch/test/FileHelper.java
new file mode 100644
index 0000000..7f5d0c7
--- /dev/null
+++ b/src/main/java/org/apache/crunch/test/FileHelper.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static com.google.common.io.Resources.getResource;
+import static com.google.common.io.Resources.newInputStreamSupplier;
+
+import java.io.File;
+import java.io.IOException;
+
+import com.google.common.io.Files;
+
+public class FileHelper {
+
+  public static String createTempCopyOf(String fileResource) throws IOException {
+    File tmpFile = File.createTempFile("tmp", "");
+    tmpFile.deleteOnExit();
+    Files.copy(newInputStreamSupplier(getResource(fileResource)), tmpFile);
+    return tmpFile.getAbsolutePath();
+  }
+  
+  public static File createOutputPath() throws IOException {
+    File output = File.createTempFile("output", "");
+    output.delete();
+    return output;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/test/InMemoryEmitter.java b/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
new file mode 100644
index 0000000..46e90ae
--- /dev/null
+++ b/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import com.google.common.collect.Lists;
+
+/**
+ * An {@code Emitter} instance that writes emitted records to a backing {@code List}.
+ *
+ * @param <T> The type of the records that are emitted
+ */
+public class InMemoryEmitter<T> implements Emitter<T> {
+  
+  private final List<T> output;
+  
+  public InMemoryEmitter() {
+    this(Lists.<T>newArrayList());
+  }
+  
+  public InMemoryEmitter(List<T> output) {
+    this.output = output;
+  }
+  
+  @Override
+  public void emit(T emitted) {
+    output.add(emitted);
+  }
+
+  @Override
+  public void flush() {
+
+  }
+
+  public List<T> getOutput() {
+    return output;
+  }
+}

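Illustrative sketch (not part of the patch): an InMemoryEmitter lets a DoFn be exercised without
building a pipeline. The DoFn below is hypothetical.

  DoFn<String, Integer> lengths = new DoFn<String, Integer>() {
    @Override
    public void process(String input, Emitter<Integer> emitter) {
      emitter.emit(input.length());
    }
  };
  InMemoryEmitter<Integer> emitter = new InMemoryEmitter<Integer>();
  lengths.process("crunch", emitter);
  emitter.getOutput();   // [6]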
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/test/TestCounters.java b/src/main/java/org/apache/crunch/test/TestCounters.java
new file mode 100644
index 0000000..8c8823c
--- /dev/null
+++ b/src/main/java/org/apache/crunch/test/TestCounters.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+
+/**
+ * A utility class used during unit testing to update and read counters.
+ */
+public class TestCounters {
+
+  private static Counters COUNTERS = new Counters();
+  
+  public static Counter getCounter(Enum<?> e) {
+    return COUNTERS.findCounter(e);
+  }
+  
+  public static Counter getCounter(String group, String name) {
+    return COUNTERS.findCounter(group, name);
+  }
+  
+  public static void clearCounters() {
+    COUNTERS = new Counters();
+  }
+}

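Illustrative sketch (not part of the patch): because the backing Counters instance is shared
static state, a test typically clears it before making assertions. The enum is hypothetical.

  enum WordCounters { CORRUPT_RECORDS }   // hypothetical counter enum used by the code under test

  TestCounters.clearCounters();
  TestCounters.getCounter(WordCounters.CORRUPT_RECORDS).increment(1);
  TestCounters.getCounter(WordCounters.CORRUPT_RECORDS).getValue();   // 1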
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/tool/CrunchTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/tool/CrunchTool.java b/src/main/java/org/apache/crunch/tool/CrunchTool.java
new file mode 100644
index 0000000..a788520
--- /dev/null
+++ b/src/main/java/org/apache/crunch/tool/CrunchTool.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.tool;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+
+/**
+ * An extension of the {@code Tool} interface that creates a {@code Pipeline}
+ * instance and provides methods for working with the Pipeline from inside
+ * the Tool's run method.
+ *
+ */
+public abstract class CrunchTool extends Configured implements Tool {
+
+  protected static final From from = new From();
+  protected static final To to = new To();
+  protected static final At at = new At();
+  
+  private Pipeline pipeline;
+
+  public CrunchTool() throws IOException {
+    this(false);
+  }
+  
+  public CrunchTool(boolean inMemory) throws IOException {
+    this.pipeline = inMemory ? MemPipeline.getInstance() : new MRPipeline(getClass());
+  }
+  
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf != null && pipeline != null) {
+      pipeline.setConfiguration(conf);
+    }
+  }
+  
+  @Override
+  public Configuration getConf() {
+    return pipeline.getConfiguration();
+  }
+  
+  public void enableDebug() {
+    pipeline.enableDebug();
+  }
+  
+  public <T> PCollection<T> read(Source<T> source) {
+    return pipeline.read(source);
+  }
+  
+  public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
+    return pipeline.read(tableSource);
+  }
+  
+  public PCollection<String> readTextFile(String pathName) {
+    return pipeline.readTextFile(pathName);
+  }
+  
+  public void write(PCollection<?> pcollection, Target target) {
+    pipeline.write(pcollection, target);
+  }
+  
+  public void writeTextFile(PCollection<?> pcollection, String pathName) {
+    pipeline.writeTextFile(pcollection, pathName);
+  }
+  
+  public void run() {
+    pipeline.run();
+  }
+  
+  public void done() {
+    pipeline.done();
+  }
+}

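Illustrative sketch (not part of the patch): a concrete CrunchTool implements Tool.run and can use
the read/write helpers directly. The class name, paths, and pass-through logic below are
hypothetical.

  import java.io.IOException;
  import org.apache.crunch.PCollection;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.util.ToolRunner;

  public class CopyTextTool extends CrunchTool {
    public CopyTextTool() throws IOException {
      super();   // MRPipeline by default; pass true for an in-memory pipeline
    }

    @Override
    public int run(String[] args) throws Exception {
      PCollection<String> lines = readTextFile(args[0]);
      writeTextFile(lines, args[1]);
      done();
      return 0;
    }

    public static void main(String[] args) throws Exception {
      ToolRunner.run(new Configuration(), new CopyTextTool(), args);
    }
  }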
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/Converter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/Converter.java b/src/main/java/org/apache/crunch/types/Converter.java
new file mode 100644
index 0000000..70705f0
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/Converter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+
+import org.apache.crunch.DoFn;
+
+/**
+ * Converts the input key/value from a MapReduce task into the input to a
+ * {@link DoFn}, or takes the output of a {@code DoFn} and writes it to
+ * the output key/values.
+ */
+public interface Converter<K, V, S, T> extends Serializable {
+  S convertInput(K key, V value);
+  
+  T convertIterableInput(K key, Iterable<V> value);
+  
+  K outputKey(S value);
+  
+  V outputValue(S value);
+  
+  Class<K> getKeyClass();
+  
+  Class<V> getValueClass();
+}

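Illustrative sketch (not part of the patch): a Converter pairs the raw MapReduce key/value types
with the value a DoFn sees. The class below is a made-up example that exposes a Text/LongWritable
record as a Crunch Pair; it is not one of Crunch's own converters.

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.crunch.Pair;

  class TextLongPairConverter implements Converter<Text, LongWritable,
      Pair<Text, LongWritable>, Pair<Text, Iterable<LongWritable>>> {
    public Pair<Text, LongWritable> convertInput(Text key, LongWritable value) {
      return Pair.of(key, value);
    }
    public Pair<Text, Iterable<LongWritable>> convertIterableInput(Text key, Iterable<LongWritable> values) {
      return Pair.of(key, values);
    }
    public Text outputKey(Pair<Text, LongWritable> value) {
      return value.first();
    }
    public LongWritable outputValue(Pair<Text, LongWritable> value) {
      return value.second();
    }
    public Class<Text> getKeyClass() {
      return Text.class;
    }
    public Class<LongWritable> getValueClass() {
      return LongWritable.class;
    }
  }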
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/src/main/java/org/apache/crunch/types/PGroupedTableType.java
new file mode 100644
index 0000000..e29018a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
+
+/**
+ * The {@code PType} instance for {@link PGroupedTable} instances. Its settings are
+ * derived from the {@code PTableType} that was grouped to create the
+ * {@code PGroupedTable} instance.
+ *
+ */
+public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<V>>> {
+
+  protected static class PTypeIterable<V> implements Iterable<V> {
+    private final Iterable<Object> iterable;
+    private final MapFn<Object, V> mapFn;
+
+    public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
+      this.mapFn = mapFn;
+      this.iterable = iterable;
+    }
+
+    public Iterator<V> iterator() {
+      return new Iterator<V>() {
+        Iterator<Object> iter = iterable.iterator();
+
+        public boolean hasNext() {
+          return iter.hasNext();
+        }
+
+        public V next() {
+          return mapFn.map(iter.next());
+        }
+
+        public void remove() {
+          iter.remove();
+        }
+      };
+    }
+  }
+
+  public static class PairIterableMapFn<K, V> extends
+      MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {    
+    private final MapFn<Object, K> keys;
+    private final MapFn<Object, V> values;
+
+    public PairIterableMapFn(MapFn<Object, K> keys, MapFn<Object, V> values) {
+      this.keys = keys;
+      this.values = values;
+    }
+
+    @Override
+    public void initialize() {
+      keys.initialize();
+      values.initialize();
+    }
+
+    @Override
+    public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
+      return Pair.<K, Iterable<V>>of(keys.map(input.first()),
+          new PTypeIterable<V>(values, input.second()));
+    }
+  }
+
+  protected final PTableType<K, V> tableType;
+  
+  public PGroupedTableType(PTableType<K, V> tableType) {
+    this.tableType = tableType;
+  }
+
+  public PTableType<K, V> getTableType() {
+    return tableType;
+  }
+  
+  @Override
+  public PTypeFamily getFamily() {
+    return tableType.getFamily();
+  }
+
+  @Override
+  public List<PType> getSubTypes() {
+    return tableType.getSubTypes();
+  }
+  
+  @Override
+  public Converter getConverter() {
+    return tableType.getConverter();
+  }
+  
+  public abstract Converter getGroupingConverter();
+  
+  public abstract void configureShuffle(Job job, GroupingOptions options);
+  
+  @Override
+  public SourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
+    throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
+  }
+}

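Illustrative sketch (not part of the patch): a grouped table type is always derived from a
PTableType and delegates its family and converter to it. The Writables factory from the writable
type family is assumed here.

  PTableType<String, Long> tableType = Writables.tableOf(Writables.strings(), Writables.longs());
  PGroupedTableType<String, Long> groupedType = tableType.getGroupedTableType();
  // Settings come from the underlying table type.
  boolean sameFamily = groupedType.getFamily() == tableType.getFamily();   // true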
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/PTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PTableType.java b/src/main/java/org/apache/crunch/types/PTableType.java
new file mode 100644
index 0000000..b9e946f
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/PTableType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+
+/**
+ * An extension of {@code PType} specifically for {@link PTable} objects. It
+ * allows separate access to the {@code PType}s of the key and value for the
+ * {@code PTable}.
+ *
+ */
+public interface PTableType<K, V> extends PType<Pair<K, V>> {
+  /**
+   * Returns the key type for the table.
+   */
+  PType<K> getKeyType();
+
+  /**
+   * Returns the value type for the table.
+   */
+  PType<V> getValueType(); 
+  
+  /**
+   * Returns the grouped table version of this type.
+   */
+  PGroupedTableType<K, V> getGroupedTableType();
+}

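Illustrative sketch (not part of the patch): the key and value types of a table can be recovered
from its PTableType. The Writables factory is assumed again.

  PTableType<String, Long> counts = Writables.tableOf(Writables.strings(), Writables.longs());
  PType<String> keyType = counts.getKeyType();     // PType for the table's keys
  PType<Long> valueType = counts.getValueType();   // PType for the table's values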
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PType.java b/src/main/java/org/apache/crunch/types/PType.java
new file mode 100644
index 0000000..126b165
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/PType.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.SourceTarget;
+
+/**
+ * A {@code PType} defines a mapping between a data type that is used in a
+ * Crunch pipeline and a serialization and storage format that is used to
+ * read/write data from/to HDFS. Every {@link PCollection} has an associated
+ * {@code PType} that tells Crunch how to read/write data from that
+ * {@code PCollection}.
+ * 
+ */
+public interface PType<T> extends Serializable {
+  /**
+   * Returns the Java type represented by this {@code PType}.
+   */
+  Class<T> getTypeClass();
+
+  /**
+   * Returns the {@code PTypeFamily} that this {@code PType} belongs to.
+   */
+  PTypeFamily getFamily();
+
+  MapFn<Object, T> getInputMapFn();
+
+  MapFn<T, Object> getOutputMapFn();
+
+  Converter getConverter();
+
+  /**
+   * Returns a copy of a value (or the value itself) that can safely be
+   * retained.
+   * <p>
+   * This is useful when iterable values being processed in a DoFn (via a
+   * reducer) need to be held on to for more than the scope of a single
+   * iteration, as a reducer (and therefore also a DoFn that has an Iterable as
+ * input) reuses deserialized values. More information on object reuse is
+   * available in the {@link DoFn} class documentation.
+   * 
+   * @param value
+   *          The value to be deep-copied
+   * @return A deep copy of the input value
+   */
+  T getDetachedValue(T value);
+
+  /**
+   * Returns a {@code SourceTarget} that is able to read/write data using the
+   * serialization format specified by this {@code PType}.
+   */
+  SourceTarget<T> getDefaultFileSource(Path path);
+
+  /**
+   * Returns the sub-types that make up this PType if it is a composite
+   * instance, such as a tuple.
+   */
+  List<PType> getSubTypes();
+}

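Illustrative sketch (not part of the patch): every transformation that produces a new PCollection
declares the PType of its output so Crunch knows how to serialize the records. MemPipeline.collectionOf
and the Writables factory are assumed here only to supply a small in-memory input and a PType.

  PCollection<String> words = MemPipeline.collectionOf("crunch", "pipeline");
  PCollection<Integer> lengths = words.parallelDo(new MapFn<String, Integer>() {
    @Override
    public Integer map(String input) {
      return input.length();
    }
  }, Writables.ints());   // PType<Integer> backed by the Writable serialization format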
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PTypeFamily.java b/src/main/java/org/apache/crunch/types/PTypeFamily.java
new file mode 100644
index 0000000..2ae9493
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/PTypeFamily.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+
+/**
+ * An abstract factory for creating {@code PType} instances that have the same
+ * serialization/storage backing format.
+ * 
+ */
+public interface PTypeFamily {
+  PType<Void> nulls();
+  
+  PType<String> strings();
+
+  PType<Long> longs();
+
+  PType<Integer> ints();
+
+  PType<Float> floats();
+
+  PType<Double> doubles();
+
+  PType<Boolean> booleans();
+  
+  PType<ByteBuffer> bytes();
+  
+  <T> PType<T> records(Class<T> clazz);
+
+  <T> PType<Collection<T>> collections(PType<T> ptype);
+
+  <T> PType<Map<String, T>> maps(PType<T> ptype);
+  
+  <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2);
+
+  <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3);
+
+  <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
+      PType<V2> p2, PType<V3> p3, PType<V4> p4);
+
+  PType<TupleN> tuples(PType<?>... ptypes);
+
+  <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes);
+  
+  <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
+  
+  <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
+  
+  /**
+   * Returns the equivalent of the given ptype for this family, if it exists.
+   */
+  <T> PType<T> as(PType<T> ptype);  
+}

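Illustrative sketch (not part of the patch): code written against PTypeFamily can target either
serialization backend without change; the Avro and Writable family singletons are assumed here.

  PTypeFamily tf = AvroTypeFamily.getInstance();   // or WritableTypeFamily.getInstance()
  PType<Pair<String, Double>> scored = tf.pairs(tf.strings(), tf.doubles());
  PTableType<String, Long> counts = tf.tableOf(tf.strings(), tf.longs());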
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/PTypeUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PTypeUtils.java b/src/main/java/org/apache/crunch/types/PTypeUtils.java
new file mode 100644
index 0000000..4dda78d
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/PTypeUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+
+/**
+ * Utilities for converting between {@code PType}s from different {@code PTypeFamily}
+ * implementations.
+ *
+ */
+public class PTypeUtils {
+
+  public static <T> PType<T> convert(PType<T> ptype, PTypeFamily tf) {
+    if (ptype instanceof PTableType) {
+      PTableType ptt = (PTableType) ptype;
+      return tf.tableOf(tf.as(ptt.getKeyType()), tf.as(ptt.getValueType()));
+    }
+    Class<T> typeClass = ptype.getTypeClass();
+    if (Tuple.class.isAssignableFrom(typeClass)) {
+      List<PType> subTypes = ptype.getSubTypes();
+      if (Pair.class.equals(typeClass)) {
+        return tf.pairs(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)));
+      } else if (Tuple3.class.equals(typeClass)) {
+        return tf.triples(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)));
+      } else if (Tuple4.class.equals(typeClass)) {
+        return tf.quads(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)),
+            tf.as(subTypes.get(2)), tf.as(subTypes.get(3)));
+      } else if (TupleN.class.equals(typeClass)) {
+        PType[] newPTypes = subTypes.toArray(new PType[0]);
+        for (int i = 0; i < newPTypes.length; i++) {
+          newPTypes[i] = tf.as(subTypes.get(i));
+        }
+        return (PType<T>) tf.tuples(newPTypes);
+      }
+    }
+    if (Collection.class.isAssignableFrom(typeClass)) {
+      return tf.collections(tf.as(ptype.getSubTypes().get(0)));
+    }
+    return tf.records(typeClass);
+  }
+  
+  private PTypeUtils() {}
+}

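Illustrative sketch (not part of the patch): convert rebuilds a composite type, element by element,
on the target family. The two family singletons are assumed as above.

  PTypeFamily wtf = WritableTypeFamily.getInstance();
  PType<Pair<String, Long>> writablePair = wtf.pairs(wtf.strings(), wtf.longs());
  // The same logical Pair type, re-expressed with Avro-backed element types.
  PType<Pair<String, Long>> avroPair =
      PTypeUtils.convert(writablePair, AvroTypeFamily.getInstance());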
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/TupleFactory.java b/src/main/java/org/apache/crunch/types/TupleFactory.java
new file mode 100644
index 0000000..49aae0d
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/TupleFactory.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+
+public abstract class TupleFactory<T extends Tuple> implements Serializable {
+
+  public void initialize() { }
+  
+  public abstract T makeTuple(Object...values);
+  
+  public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
+    @Override
+    public Pair makeTuple(Object... values) {
+      return Pair.of(values[0], values[1]);
+    }
+  };
+
+  public static final TupleFactory<Tuple3> TUPLE3 = new TupleFactory<Tuple3>() {
+    @Override
+    public Tuple3 makeTuple(Object... values) {
+      return Tuple3.of(values[0], values[1], values[2]);
+    }
+  };
+  
+  public static final TupleFactory<Tuple4> TUPLE4 = new TupleFactory<Tuple4>() {
+    @Override
+    public Tuple4 makeTuple(Object... values) {
+      return Tuple4.of(values[0], values[1], values[2], values[3]);
+    }
+  };
+
+  public static final TupleFactory<TupleN> TUPLEN = new TupleFactory<TupleN>() {
+    @Override
+    public TupleN makeTuple(Object... values) {
+      return new TupleN(values);
+    }
+  };
+
+  public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, Class... typeArgs) {
+    return new CustomTupleFactory<T>(clazz, typeArgs);
+  }
+  
+  private static class CustomTupleFactory<T extends Tuple> extends TupleFactory<T> {
+
+    private final Class<T> clazz;
+    private final Class[] typeArgs;
+    
+    private transient Constructor<T> constructor;
+    
+    public CustomTupleFactory(Class<T> clazz, Class[] typeArgs) {
+      this.clazz = clazz;
+      this.typeArgs = typeArgs;
+    }
+    
+    @Override
+    public void initialize() {
+      try {
+        constructor = clazz.getConstructor(typeArgs);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+    
+    @Override
+    public T makeTuple(Object... values) {
+      try {
+        return constructor.newInstance(values);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+  }
+}

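Illustrative sketch (not part of the patch): the built-in factories cover Crunch's tuple classes,
and create() handles custom Tuple subclasses reflectively. The custom class in the comment is
hypothetical.

  Pair pair = TupleFactory.PAIR.makeTuple("key", 42L);          // same as Pair.of("key", 42L)
  TupleN tuple = TupleFactory.TUPLEN.makeTuple("a", 1, true);   // arbitrary arity

  // For a custom Tuple subclass, pass the constructor's argument types:
  //   TupleFactory<MyTriple> f = TupleFactory.create(MyTriple.class, String.class, Long.class);
  //   f.initialize();                  // resolves the constructor reflectively
  //   MyTriple t = f.makeTuple("a", 1L);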
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
new file mode 100644
index 0000000..be9f5e9
--- /dev/null
+++ b/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+
+/**
+ * Performs deep copies of Avro-serializable objects.
+ * <p>
+ * <b>Warning:</b> Methods in this class are not thread-safe. This shouldn't be
+ * a problem when running in a map-reduce context where each mapper/reducer is
+ * running in its own JVM, but it may well be a problem in any other kind of
+ * multi-threaded context.
+ */
+public abstract class AvroDeepCopier<T> implements Serializable {
+
+  private BinaryEncoder binaryEncoder;
+  private BinaryDecoder binaryDecoder;
+  protected DatumWriter<T> datumWriter;
+  protected DatumReader<T> datumReader;
+
+  protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) {
+    this.datumWriter = datumWriter;
+    this.datumReader = datumReader;
+  }
+
+  protected abstract T createCopyTarget();
+
+  /**
+   * Deep copier for Avro specific data objects.
+   */
+  public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> {
+
+    private Class<T> valueClass;
+
+    public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) {
+      super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader<T>(schema));
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    protected T createCopyTarget() {
+      return createNewInstance(valueClass);
+    }
+
+  }
+
+  /**
+   * Deep copier for Avro generic data objects.
+   */
+  public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> {
+
+    private Schema schema;
+
+    public AvroGenericDeepCopier(Schema schema) {
+      super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema));
+      this.schema = schema;
+    }
+
+    @Override
+    protected Record createCopyTarget() {
+      return new GenericData.Record(schema);
+    }
+  }
+
+  /**
+   * Deep copier for Avro reflect data objects.
+   */
+  public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> {
+    private Class<T> valueClass;
+
+    public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) {
+      super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema));
+      this.valueClass = valueClass;
+    }
+
+    @Override
+    protected T createCopyTarget() {
+      return createNewInstance(valueClass);
+    }
+  }
+
+  /**
+   * Create a deep copy of an Avro value.
+   * 
+   * @param source
+   *          The value to be copied
+   * @return The deep copy of the value
+   */
+  public T deepCopy(T source) {
+    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+    binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder);
+    T target = createCopyTarget();
+    try {
+      datumWriter.write(source, binaryEncoder);
+      binaryEncoder.flush();
+      binaryDecoder = DecoderFactory.get()
+          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+      datumReader.read(target, binaryDecoder);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);
+    }
+
+    return target;
+  }
+
+  protected T createNewInstance(Class<T> targetClass) {
+    try {
+      return targetClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new CrunchRuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
+}


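Illustrative sketch (not part of the patch): deep-copying a generic record detaches it from the
buffers Avro reuses during iteration. The schema below is made up for the example.

  Schema schema = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Point\",\"fields\":["
      + "{\"name\":\"x\",\"type\":\"int\"},{\"name\":\"y\",\"type\":\"int\"}]}");
  GenericData.Record original = new GenericData.Record(schema);
  original.put("x", 1);
  original.put("y", 2);

  AvroDeepCopier.AvroGenericDeepCopier copier = new AvroDeepCopier.AvroGenericDeepCopier(schema);
  GenericData.Record copy = copier.deepCopy(original);   // safe to retain after 'original' is reused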