incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [23/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/Set.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Set.java b/src/main/java/org/apache/crunch/lib/Set.java
deleted file mode 100644
index f915d53..0000000
--- a/src/main/java/org/apache/crunch/lib/Set.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import java.util.Collection;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-
-/**
- * Utilities for performing set operations (difference, intersection, etc) on
- * {@code PCollection} instances.
- */
-public class Set {
-
-  /**
-   * Compute the set difference between two sets of elements.
-   * 
-   * @return a collection containing elements that are in <code>coll1</code>
-   * but not in <code>coll2</code>
-   */
-  public static <T> PCollection<T> difference(PCollection<T> coll1,
-      PCollection<T> coll2) {
-    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
-        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
-          @Override
-          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
-              Emitter<T> emitter) {
-            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
-            if (!groups.first().isEmpty() && groups.second().isEmpty()) {
-              emitter.emit(input.first());
-            }
-          }
-        }, coll1.getPType());
-  }
-  
-  /**
-   * Compute the intersection of two sets of elements.
-   * 
-   * @return a collection containing elements that common to both sets
-   * <code>coll1</code> and <code>coll2</code>
-   */
-  public static <T> PCollection<T> intersection(PCollection<T> coll1,
-      PCollection<T> coll2) {
-    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
-        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
-          @Override
-          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
-              Emitter<T> emitter) {
-            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
-            if (!groups.first().isEmpty() && !groups.second().isEmpty()) {
-              emitter.emit(input.first());
-            }
-          }
-        }, coll1.getPType());
-  }
-  
-  /**
-   * Find the elements that are common to two sets, like the Unix <code>comm</code>
-   * utility. This method returns a {@link PCollection} of {@link Tuple3} objects,
-   * and the position in the tuple that an element appears is determined by
-   * the collections that it is a member of, as follows:
-   * <ol>
-   * <li>elements only in <code>coll1</code>,</li>
-   * <li>elements only in <code>coll2</code>, or</li>
-   * <li>elements in both collections</li>
-   * </ol>
-   * Tuples are otherwise filled with <code>null</code>.
-   * 
-   * @return a collection of {@link Tuple3} objects
-   */
-  public static <T> PCollection<Tuple3<T, T, T>> comm(PCollection<T> coll1,
-      PCollection<T> coll2) {
-    PTypeFamily typeFamily = coll1.getTypeFamily();
-    PType<T> type = coll1.getPType();
-    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
-        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>,
-            Tuple3<T, T, T>>() {
-          @Override
-          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
-              Emitter<Tuple3<T, T, T>> emitter) {
-            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
-            boolean inFirst = !groups.first().isEmpty();
-            boolean inSecond = !groups.second().isEmpty();
-            T t = input.first();
-            emitter.emit(Tuple3.of(
-                inFirst && !inSecond ? t : null,
-                    !inFirst && inSecond ? t : null,
-                        inFirst && inSecond ? t : null));
-          }
-        }, typeFamily.triples(type, type, type));
-  }
-  
-  private static <T> PTable<T, Boolean> toTable(PCollection<T> coll) {
-    PTypeFamily typeFamily = coll.getTypeFamily();
-    return coll.parallelDo(new DoFn<T, Pair<T, Boolean>>() {
-      @Override
-      public void process(T input, Emitter<Pair<T, Boolean>> emitter) {
-        emitter.emit(Pair.of(input, Boolean.TRUE));
-      }
-    }, typeFamily.tableOf(coll.getPType(), typeFamily.booleans()));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Sort.java b/src/main/java/org/apache/crunch/lib/Sort.java
deleted file mode 100644
index 5a3f8e9..0000000
--- a/src/main/java/org/apache/crunch/lib/Sort.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.GroupingOptions.Builder;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.TupleWritable;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * Utilities for sorting {@code PCollection} instances.
- */
-public class Sort {
-  
-  public enum Order {
-    ASCENDING, DESCENDING, IGNORE
-  }
-  
-  /**
-   * To sort by column 2 ascending then column 1 descending, you would use:
-   * <code>
-   * sortPairs(coll, by(2, ASCENDING), by(1, DESCENDING))
-   * </code>
-   * Column numbering is 1-based.
-   */
-  public static class ColumnOrder {
-    int column;
-    Order order;
-    public ColumnOrder(int column, Order order) {
-      this.column = column;
-      this.order = order;
-    }
-    public static ColumnOrder by(int column, Order order) {
-      return new ColumnOrder(column, order);
-    }
-    
-    @Override
-    public String toString() {
-      return"ColumnOrder: column:" + column + ", Order: " + order;
-    }
-  }
-  
-  /**
-   * Sorts the {@link PCollection} using the natural ordering of its elements.
-   * 
-   * @return a {@link PCollection} representing the sorted collection.
-   */
-  public static <T> PCollection<T> sort(PCollection<T> collection) {
-    return sort(collection, Order.ASCENDING);
-  }
-  
-  /**
-   * Sorts the {@link PCollection} using the natural ordering of its elements
-   * in the order specified.
-   * 
-   * @return a {@link PCollection} representing the sorted collection.
-   */
-  public static <T> PCollection<T> sort(PCollection<T> collection, Order order) {
-    PTypeFamily tf = collection.getTypeFamily();
-    PTableType<T, Void> type = tf.tableOf(collection.getPType(), tf.nulls());
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf,
-        collection.getPType(), order);
-    PTable<T, Void> pt =
-      collection.parallelDo("sort-pre", new DoFn<T, Pair<T, Void>>() {
-        @Override
-        public void process(T input,
-            Emitter<Pair<T, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
-    PTable<T, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo("sort-post", new DoFn<Pair<T, Void>, T>() {
-      @Override
-      public void process(Pair<T, Void> input, Emitter<T> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
-  }
-  
-
-  /**
-   * Sorts the {@link PTable} using the natural ordering of its keys.
-   * 
-   * @return a {@link PTable} representing the sorted table.
-   */
-  public static <K, V> PTable<K, V> sort(PTable<K, V> table) {
-    return sort(table, Order.ASCENDING);
-  }
-
-  /**
-   * Sorts the {@link PTable} using the natural ordering of its keys
-   * in the order specified.
-   * 
-   * @return a {@link PTable} representing the sorted collection.
-   */
-  public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key) {
-    PTypeFamily tf = table.getTypeFamily();
-    Configuration conf = table.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, table.getKeyType(), key);
-    return table.groupByKey(options).ungroup();
-  }
-  
-  /**
-   * Sorts the {@link PCollection} of {@link Pair}s using the specified column
-   * ordering.
-   * 
-   * @return a {@link PCollection} representing the sorted collection.
-   */
-  public static <U, V> PCollection<Pair<U, V>> sortPairs(
-      PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders) {
-    // put U and V into a pair/tuple in the key so we can do grouping and sorting
-    PTypeFamily tf = collection.getTypeFamily();
-    PType<Pair<U, V>> pType = collection.getPType();
-    @SuppressWarnings("unchecked")
-    PTableType<Pair<U, V>, Void> type = tf.tableOf(
-        tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)),
-        tf.nulls());
-    PTable<Pair<U, V>, Void> pt =
-      collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
-        @Override
-        public void process(Pair<U, V> input,
-            Emitter<Pair<Pair<U, V>, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>,Void>, Pair<U, V>>() {
-      @Override
-      public void process(Pair<Pair<U, V>, Void> input,
-          Emitter<Pair<U, V>> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
-  }
-
-  /**
-   * Sorts the {@link PCollection} of {@link Tuple3}s using the specified column
-   * ordering.
-   * 
-   * @return a {@link PCollection} representing the sorted collection.
-   */
-  public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(
-      PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders) {
-    PTypeFamily tf = collection.getTypeFamily();
-    PType<Tuple3<V1, V2, V3>> pType = collection.getPType();
-    @SuppressWarnings("unchecked")
-    PTableType<Tuple3<V1, V2, V3>, Void> type = tf.tableOf(
-        tf.triples(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2)),
-        tf.nulls());
-    PTable<Tuple3<V1, V2, V3>, Void> pt =
-      collection.parallelDo(new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
-        @Override
-        public void process(Tuple3<V1, V2, V3> input,
-            Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<Tuple3<V1, V2, V3>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>,Void>, Tuple3<V1, V2, V3>>() {
-      @Override
-      public void process(Pair<Tuple3<V1, V2, V3>, Void> input,
-          Emitter<Tuple3<V1, V2, V3>> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
-  }
-
-  /**
-   * Sorts the {@link PCollection} of {@link Tuple4}s using the specified column
-   * ordering.
-   * 
-   * @return a {@link PCollection} representing the sorted collection.
-   */
-  public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
-      PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
-    PTypeFamily tf = collection.getTypeFamily();
-    PType<Tuple4<V1, V2, V3, V4>> pType = collection.getPType();
-    @SuppressWarnings("unchecked")
-    PTableType<Tuple4<V1, V2, V3, V4>, Void> type = tf.tableOf(
-        tf.quads(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2),  pType.getSubTypes().get(3)),
-        tf.nulls());
-    PTable<Tuple4<V1, V2, V3, V4>, Void> pt =
-      collection.parallelDo(new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
-        @Override
-        public void process(Tuple4<V1, V2, V3, V4> input,
-            Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<Tuple4<V1, V2, V3, V4>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>,Void>, Tuple4<V1, V2, V3, V4>>() {
-      @Override
-      public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input,
-          Emitter<Tuple4<V1, V2, V3, V4>> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
-  }
-
-  /**
-   * Sorts the {@link PCollection} of {@link TupleN}s using the specified column
-   * ordering.
-   * 
-   * @return a {@link PCollection} representing the sorted collection.
-   */
-  public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection,
-      ColumnOrder... columnOrders) {
-    PTypeFamily tf = collection.getTypeFamily();
-    PType<TupleN> pType = collection.getPType();
-    PTableType<TupleN, Void> type = tf.tableOf(
-        tf.tuples(pType.getSubTypes().toArray(new PType[0])),
-        tf.nulls());
-    PTable<TupleN, Void> pt =
-      collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() {
-        @Override
-        public void process(TupleN input,
-            Emitter<Pair<TupleN, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
-    Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
-    PTable<TupleN, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<TupleN,Void>, TupleN>() {
-      @Override
-      public void process(Pair<TupleN, Void> input,
-          Emitter<TupleN> emitter) {
-        emitter.emit(input.first());
-      }
-    }, collection.getPType());
-  }
-  
-  // TODO: move to type family?
-  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
-      PTypeFamily tf, PType<T> ptype, Order order) {
-    Builder builder = GroupingOptions.builder();
-    if (order == Order.DESCENDING) {
-      if (tf == WritableTypeFamily.getInstance()) {
-        builder.sortComparatorClass(ReverseWritableComparator.class);
-      } else if (tf == AvroTypeFamily.getInstance()) {
-        AvroType<T> avroType = (AvroType<T>) ptype;
-        Schema schema = avroType.getSchema();
-        conf.set("crunch.schema", schema.toString());
-        builder.sortComparatorClass(ReverseAvroComparator.class);
-      } else {
-        throw new RuntimeException("Unrecognized type family: " + tf);
-      }
-    }
-    return builder.build();
-  }
-  
-  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
-      PTypeFamily tf, PType<T> ptype, ColumnOrder[] columnOrders) {
-    Builder builder = GroupingOptions.builder();
-    if (tf == WritableTypeFamily.getInstance()) {
-      TupleWritableComparator.configureOrdering(conf, columnOrders);
-      builder.sortComparatorClass(TupleWritableComparator.class);
-    } else if (tf == AvroTypeFamily.getInstance()) {
-      TupleAvroComparator.configureOrdering(conf, columnOrders, ptype);
-      builder.sortComparatorClass(TupleAvroComparator.class);
-    } else {
-      throw new RuntimeException("Unrecognized type family: " + tf);
-    }
-    return builder.build();
-  }
-  
-  static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
-    
-    RawComparator<T> comparator;
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        JobConf jobConf = new JobConf(conf);
-        comparator = WritableComparator.get(
-            jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
-      }
-    }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
-        int arg5) {
-      return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
-    }
-
-    @Override
-    public int compare(T o1, T o2) {
-      return -comparator.compare(o1, o2);
-    }
-
-  }
-  
-  static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
-
-    Schema schema;
-    
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
-      }
-    }
-
-    @Override
-    public int compare(T o1, T o2) {
-      return -ReflectData.get().compare(o1, o2, schema);
-    }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
-        int arg5) {
-      return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
-    }
-    
-  }
-  
-  static class TupleWritableComparator extends WritableComparator implements Configurable {
-    
-    private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
-    
-    Configuration conf;
-    ColumnOrder[] columnOrders;
-    
-    public TupleWritableComparator() {
-      super(TupleWritable.class, true);
-    }
-    
-    public static void configureOrdering(Configuration conf, Order... orders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(
-        Iterables.transform(Arrays.asList(orders),
-          new Function<Order, String>() {
-        @Override
-        public String apply(Order o) {
-          return o.name();
-        }
-      })));
-    }
-
-    
-    public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(
-        Iterables.transform(Arrays.asList(columnOrders),
-          new Function<ColumnOrder, String>() {
-        @Override
-        public String apply(ColumnOrder o) {
-          return o.column + ";" + o.order.name();
-        }
-      })));
-    }
-    
-    @Override
-    public int compare(WritableComparable a, WritableComparable b) {
-      TupleWritable ta = (TupleWritable) a;
-      TupleWritable tb = (TupleWritable) b;
-      for (int i = 0; i < columnOrders.length; i++) {
-        int index = columnOrders[i].column - 1;
-        int order = 1;
-        if (columnOrders[i].order == Order.ASCENDING) {
-          order = 1;
-        } else  if (columnOrders[i].order == Order.DESCENDING) {
-          order = -1;
-        } else { // ignore
-          continue;
-        }
-        if (!ta.has(index) && !tb.has(index)) {
-          continue;
-        } else if (ta.has(index) && !tb.has(index)) {
-          return order;
-        } else if (!ta.has(index) && tb.has(index)) {
-          return -order;
-        } else {
-          Writable v1 = ta.get(index);
-          Writable v2 = tb.get(index);
-          if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-            if (v1 instanceof WritableComparable
-                && v2 instanceof WritableComparable) {
-              int cmp = ((WritableComparable) v1)
-                  .compareTo((WritableComparable) v2);
-              if (cmp != 0) {
-                return order * cmp;
-              }
-            } else {
-              int cmp = v1.hashCode() - v2.hashCode();
-              if (cmp != 0) {
-                return order * cmp;
-              }
-            }
-          }
-        }
-      }
-      return 0; // ordering using specified columns found no differences
-    }
-
-    @Override
-    public Configuration getConf() {
-      return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-      this.conf = conf;
-      if (conf != null) {
-        String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
-        String[] columnOrderNames = ordering.split(",");
-        columnOrders = new ColumnOrder[columnOrderNames.length];
-        for (int i = 0; i < columnOrders.length; i++) {
-          String[] split = columnOrderNames[i].split(";");
-          int column = Integer.parseInt(split[0]);
-          Order order = Order.valueOf(split[1]);
-          columnOrders[i] = ColumnOrder.by(column, order);
-          
-        }
-      }
-    }
-  }
-  
-  static class TupleAvroComparator<T> extends Configured implements RawComparator<T> {
-
-    Schema schema;
-    
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
-      }
-    }
-
-    public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders,
-        PType<S> ptype) {
-      Schema orderedSchema = createOrderedTupleSchema(ptype, columnOrders);
-      conf.set("crunch.schema", orderedSchema.toString());
-    }
-    
-    // TODO: move to Avros
-    // TODO: need to re-order columns in map output then switch back in the reduce
-    //       this will require more extensive changes in Crunch
-    private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
-      // Guarantee each tuple schema has a globally unique name
-      String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
-      Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
-      List<Schema.Field> fields = Lists.newArrayList();
-      AvroType<S> parentAvroType = (AvroType<S>) ptype;
-      Schema parentAvroSchema = parentAvroType.getSchema();
-      
-      BitSet orderedColumns = new BitSet();
-      // First add any fields specified by ColumnOrder
-      for (ColumnOrder columnOrder : orders) {
-        int index = columnOrder.column - 1;
-        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
-        Schema fieldSchema = Schema.createUnion(
-            ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
-        String fieldName = parentAvroSchema.getFields().get(index).name();
-        fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
-            Schema.Field.Order.valueOf(columnOrder.order.name())));
-        orderedColumns.set(index);
-      }
-      // Then add remaining fields from the ptypes, with no sort order
-      for (int i = 0; i < ptype.getSubTypes().size(); i++) {
-        if (orderedColumns.get(i)) {
-          continue;
-        }
-        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(i);
-        Schema fieldSchema = Schema.createUnion(
-            ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
-        String fieldName = parentAvroSchema.getFields().get(i).name();
-        fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
-            Schema.Field.Order.IGNORE));
-      }
-      schema.setFields(fields);
-      return schema;
-    }
-
-    @Override
-    public int compare(T o1, T o2) {
-      return ReflectData.get().compare(o1, o2, schema);
-    }
-
-    @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
-        int arg5) {
-      return BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
-    }
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
deleted file mode 100644
index 4f0d5cc..0000000
--- a/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an full outer join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-  
-  private transient int lastId;
-  private transient K lastKey;
-  private transient List<U> leftValues;
-
-  public FullOuterJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void initialize() {
-    lastId = 1;
-    lastKey = null;
-    this.leftValues = Lists.newArrayList();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (!key.equals(lastKey)) {
-      // Make sure that left side gets emitted.
-      if (0 == lastId && 0 == id) {
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
-        }
-      }
-      lastKey = key;
-      leftValues.clear();
-    }
-    if (id == 0) {
-      for (Pair<U, V> pair : pairs) {
-        if (pair.first() != null)
-          leftValues.add(leftValueType.getDetachedValue(pair.first()));
-      }
-    } else {
-      for (Pair<U, V> pair : pairs) {
-        // Make sure that right side gets emitted.
-        if (leftValues.isEmpty()) {
-          leftValues.add(null);
-        }
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
-        }
-      }
-    }
-
-    lastId = id;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (0 == lastId) {
-      for (U u : leftValues) {
-        emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String getJoinType() { return "fullOuterJoin"; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
deleted file mode 100644
index 0c46bdf..0000000
--- a/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an inner join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
-  private transient K lastKey;
-  private transient List<U> leftValues;
-  
-  public InnerJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void initialize() {
-    lastKey = null;
-    this.leftValues = Lists.newArrayList();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (!key.equals(lastKey)) {
-      lastKey = key;
-      leftValues.clear();
-    }
-    if (id == 0) { // from left
-      for (Pair<U, V> pair : pairs) {
-        if (pair.first() != null)
-          leftValues.add(leftValueType.getDetachedValue(pair.first()));
-      }
-    } else { // from right
-      for (Pair<U, V> pair : pairs) {
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
-        }
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String getJoinType() { return "innerJoin"; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/src/main/java/org/apache/crunch/lib/join/JoinFn.java
deleted file mode 100644
index 494031c..0000000
--- a/src/main/java/org/apache/crunch/lib/join/JoinFn.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-
-/**
- * Base {@link org.apache.crunch.DoFn} for the final step of a join. Each input
- * record is a group keyed by a (join key, side id) pair; subclasses implement
- * {@link #join} to combine the values of one group.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public abstract class JoinFn<K, U, V>
-    extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
-
-  /** PType of the left-side values; subclasses use it to make deep copies. */
-  protected PType<U> leftValueType;
-
-  /**
-   * Creates the join function.
-   *
-   * @param leftValueType
-   *          The PType of the value type of the left side of the join, used
-   *          for creating deep copies of values
-   */
-  public JoinFn(PType<U> leftValueType) {
-    this.leftValueType = leftValueType;
-  }
-
-  /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
-  public abstract String getJoinType();
-
-  /**
-   * Performs the actual joining for one group of values.
-   *
-   * @param key The key for this grouping of values.
-   * @param id The side that this group of values is from (0 -> left, 1 -> right).
-   * @param pairs The group of values associated with this key and id pair.
-   * @param emitter The emitter to send the output to.
-   */
-  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter);
-
-  /**
-   * Unpacks the grouped input record into its key, side id and values, and
-   * delegates to {@link #join}.
-   *
-   * @param input The input record: ((key, side id), values).
-   * @param emitter The emitter to send the output to.
-   */
-  @Override
-  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
-    Pair<K, Integer> keyAndSide = input.first();
-    K key = keyAndSide.first();
-    int side = keyAndSide.second();
-    join(key, side, input.second(), emitter);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
deleted file mode 100644
index c282642..0000000
--- a/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.BinaryData;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.TupleWritable;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-
-/**
- * Utilities that are useful in joining multiple data sets via a MapReduce.
- *
- */
-public class JoinUtils {
-
-  public static Class<? extends Partitioner> getPartitionerClass(PTypeFamily typeFamily) {
-    if (typeFamily == WritableTypeFamily.getInstance()) {
-      return TupleWritablePartitioner.class;
-    } else {
-      return AvroIndexedRecordPartitioner.class;
-    }
-  }
-  
-  public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
-    if (typeFamily == WritableTypeFamily.getInstance()) {
-      return TupleWritableComparator.class;
-    } else {
-      return AvroPairGroupingComparator.class;
-    }
-  }
-  
-  public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
-    @Override
-    public int getPartition(TupleWritable key, Writable value, int numPartitions) {
-      return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
-    }
-  }
-  
-  public static class TupleWritableComparator implements RawComparator<TupleWritable> {
-    
-    private DataInputBuffer buffer = new DataInputBuffer();
-    private TupleWritable key1 = new TupleWritable();
-    private TupleWritable key2 = new TupleWritable();
-    
-    @Override
-    public int compare(TupleWritable o1, TupleWritable o2) {
-      return ((WritableComparable)o1.get(0)).compareTo((WritableComparable)o2.get(0));
-    }
-
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      try {
-        buffer.reset(b1, s1, l1);
-        key1.readFields(buffer);
-      
-        buffer.reset(b2, s2, l2);
-        key2.readFields(buffer);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-      
-      return compare(key1, key2);
-    }
-  }
-  
-  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
-    @Override
-    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
-      IndexedRecord record = (IndexedRecord) key.datum();
-      return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
-    }
-  }
-  
-  public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
-    private Schema schema;
-
-    @Override
-    public void setConf(Configuration conf) {
-      super.setConf(conf);
-      if (conf != null) {
-        Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
-        Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
-        schema = keySchema.getFields().get(0).schema();
-      }
-    }
-    
-    @Override
-    public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
-      return ReflectData.get().compare(x.datum(), y.datum(), schema);
-    }
-
-    @Override
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
deleted file mode 100644
index 6e4d3c6..0000000
--- a/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of a left outer join. Every left-side value is
- * emitted, paired with each right-side value for its key, or with {@code null}
- * when its key has no right-side values. Right-side values whose key has no
- * left-side values are dropped. Expects the left group (id 0) of a key to
- * arrive before its right group (id 1).
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
-  /** Side id (0 = left, 1 = right) of the previous group passed to join. */
-  private transient int lastId;
-  /** Key of the previous group passed to join. */
-  private transient K lastKey;
-  /** Detached copies of the non-null left-side values seen for {@code lastKey}. */
-  private transient List<U> leftValues;
-
-  /**
-   * @param leftValueType
-   *          PType of the left-side values; used to detach the values that
-   *          are buffered between calls to {@link #join}
-   */
-  public LeftOuterJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void initialize() {
-    // Starting with lastId = 1 means nothing is flushed when the first key arrives.
-    lastId = 1;
-    lastKey = null;
-    this.leftValues = Lists.newArrayList();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (!key.equals(lastKey)) {
-      // Make sure that left side always gets emitted, no matter which side
-      // the new key starts with: a new key with only right-side values
-      // (id == 1) must not swallow the previous key's unmatched left values.
-      emitUnmatchedLeftValues(emitter);
-      lastKey = key;
-      leftValues.clear();
-    }
-    if (id == 0) {
-      for (Pair<U, V> pair : pairs) {
-        if (pair.first() != null) {
-          leftValues.add(leftValueType.getDetachedValue(pair.first()));
-        }
-      }
-    } else {
-      for (Pair<U, V> pair : pairs) {
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
-        }
-      }
-    }
-
-    lastId = id;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
-    emitUnmatchedLeftValues(emitter);
-  }
-
-  /**
-   * If the last group processed was a left-side group (no right-side group
-   * followed it), emits each buffered left value paired with {@code null}.
-   */
-  private void emitUnmatchedLeftValues(Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (0 == lastId) {
-      for (U u : leftValues) {
-        emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String getJoinType() { return "leftOuterJoin"; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
deleted file mode 100644
index 28a0829..0000000
--- a/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.io.IOException;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.io.impl.SourcePathTargetImpl;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Utility for doing map side joins on a common key between two {@link PTable}s.
- * <p>
- * A map side join is an optimized join which doesn't use a reducer; instead,
- * the right side of the join is loaded into memory and the join is performed in
- * a mapper. This style of join has the important implication that the output of
- * the join is not sorted, which is the case with a conventional (reducer-based)
- * join.
- * <p>
- * <b>Note:</b>This utility is only supported when running with a
- * {@link MRPipeline} as the pipeline.
- */
-public class MapsideJoin {
-
-  /**
-   * Join two tables using a map side join. The right-side table will be loaded
-   * fully in memory, so this method should only be used if the right side
-   * table's contents can fit in the memory allocated to mappers. The join
-   * performed by this method is an inner join.
-   * 
-   * @param left
-   *          The left-side table of the join
-   * @param right
-   *          The right-side table of the join, whose contents will be fully
-   *          read into memory
-   * @return A table keyed on the join key, containing pairs of joined values
-   */
-  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
-
-    if (!(right.getPipeline() instanceof MRPipeline)) {
-      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
-    }
-
-    MRPipeline pipeline = (MRPipeline) right.getPipeline();
-    pipeline.materialize(right);
-
-    // TODO Move necessary logic to MRPipeline so that we can theoretically
-    // optimize his by running the setup of multiple map-side joins concurrently
-    pipeline.run();
-
-    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
-        .getMaterializeSourceTarget(right);
-    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
-      throw new CrunchRuntimeException("Right-side contents can't be read from a path");
-    }
-
-    // Suppress warnings because we've just checked this cast via instanceof
-    @SuppressWarnings("unchecked")
-    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
-
-    Path path = sourcePathTarget.getPath();
-    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
-
-    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
-        right.getPType());
-    PTypeFamily typeFamily = left.getTypeFamily();
-    return left.parallelDo(
-        "mapjoin",
-        mapJoinDoFn,
-        typeFamily.tableOf(left.getKeyType(),
-            typeFamily.pairs(left.getValueType(), right.getValueType())));
-
-  }
-
-  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
-
-    private String inputPath;
-    private PType<Pair<K, V>> ptype;
-    private Multimap<K, V> joinMap;
-
-    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
-      this.inputPath = inputPath;
-      this.ptype = ptype;
-    }
-
-    private Path getCacheFilePath() {
-      try {
-        for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
-          if (localPath.toString().endsWith(inputPath)) {
-            return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
-
-          }
-        }
-      } catch (IOException e) {
-        throw new CrunchRuntimeException(e);
-      }
-
-      throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
-    }
-
-    @Override
-    public void initialize() {
-      super.initialize();
-
-      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
-          .getDefaultFileSource(getCacheFilePath());
-      Iterable<Pair<K, V>> iterable = null;
-      try {
-        iterable = sourceTarget.read(getConfiguration());
-      } catch (IOException e) {
-        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
-      }
-
-      joinMap = ArrayListMultimap.create();
-      for (Pair<K, V> joinPair : iterable) {
-        joinMap.put(joinPair.first(), joinPair.second());
-      }
-    }
-
-    @Override
-    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
-      K key = input.first();
-      U value = input.second();
-      for (V joinValue : joinMap.get(key)) {
-        Pair<U, V> valuePair = Pair.of(value, joinValue);
-        emitter.emit(Pair.of(key, valuePair));
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
deleted file mode 100644
index a7b2a0d..0000000
--- a/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.lib.join;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.PType;
-import com.google.common.collect.Lists;
-
-/**
- * Used to perform the last step of an right outer join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
- */
-public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-
-  private transient K lastKey;
-  private transient List<U> leftValues;
-
-  public RightOuterJoinFn(PType<U> leftValueType) {
-    super(leftValueType);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void initialize() {
-    lastKey = null;
-    this.leftValues = Lists.newArrayList();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
-    if (!key.equals(lastKey)) {
-      lastKey = key;
-      leftValues.clear();
-    }
-    if (id == 0) {
-      for (Pair<U, V> pair : pairs) {
-        if (pair.first() != null)
-          leftValues.add(leftValueType.getDetachedValue(pair.first()));
-      }
-    } else {
-      for (Pair<U, V> pair : pairs) {
-        // Make sure that right side gets emitted.
-        if (leftValues.isEmpty()) {
-          leftValues.add(null);
-        }
-
-        for (U u : leftValues) {
-          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
-        }
-      }
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String getJoinType() { return "rightOuterJoin"; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
deleted file mode 100644
index c713d90..0000000
--- a/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.io.ReadableSourceTarget;
-
-public class MaterializableIterable<E> implements Iterable<E> {
-
-  private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
-  
-  private final Pipeline pipeline;
-  private final ReadableSourceTarget<E> sourceTarget;
-  private Iterable<E> materialized;
-  
-  public MaterializableIterable(Pipeline pipeline, ReadableSourceTarget<E> source) {
-	this.pipeline = pipeline;
-	this.sourceTarget = source;
-	this.materialized = null;
-  }
-  
-  public ReadableSourceTarget<E> getSourceTarget() {
-    return sourceTarget;
-  }
-  
-  @Override
-  public Iterator<E> iterator() {
-    if (materialized == null) {
-      pipeline.run();
-      materialize();
-    }
-    return materialized.iterator();
-  }
-
-  public void materialize() {
-	try {
-	  materialized = sourceTarget.read(pipeline.getConfiguration());
-	} catch (IOException e) {
-	  LOG.error("Could not materialize: " + sourceTarget, e);
-	  throw new CrunchRuntimeException(e);
-	}	
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/materialize/MaterializableMap.java b/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
deleted file mode 100644
index e751f50..0000000
--- a/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.materialize;
-
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.crunch.Pair;
-
-/**
- * A lazily built {@link Map} view of an {@link Iterable} of key/value
- * {@link Pair}s. The iterable is consumed into a {@link HashMap} the first time
- * {@link #entrySet()} is called (the other {@link AbstractMap} operations go
- * through it), and later pairs overwrite earlier ones with the same key.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class MaterializableMap<K, V> extends AbstractMap<K, V> {
-
-  private Iterable<Pair<K, V>> iterable;
-  /** Cached entries; null until {@link #entrySet()} is first called. */
-  private Set<Map.Entry<K, V>> entrySet;
-
-  public MaterializableMap(Iterable<Pair<K, V>> iterable) {
-    this.iterable = iterable;
-  }
-
-  private Set<Map.Entry<K, V>> toMapEntries(Iterable<Pair<K, V>> xs) {
-    Map<K, V> collected = new HashMap<K, V>();
-    for (Pair<K, V> pair : xs) {
-      collected.put(pair.first(), pair.second());
-    }
-    return collected.entrySet();
-  }
-
-  @Override
-  public Set<Map.Entry<K, V>> entrySet() {
-    if (entrySet == null) {
-      entrySet = toMapEntries(iterable);
-    }
-    return entrySet;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/test/FileHelper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/test/FileHelper.java b/src/main/java/org/apache/crunch/test/FileHelper.java
deleted file mode 100644
index 7f5d0c7..0000000
--- a/src/main/java/org/apache/crunch/test/FileHelper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.test;
-
-import static com.google.common.io.Resources.getResource;
-import static com.google.common.io.Resources.newInputStreamSupplier;
-
-import java.io.File;
-import java.io.IOException;
-
-import com.google.common.io.Files;
-
-/**
- * File helpers: copying classpath resources into temporary files and reserving
- * temporary output paths.
- */
-public class FileHelper {
-
-  /**
-   * Copies a classpath resource into a new temporary file that is deleted when
-   * the JVM exits.
-   *
-   * @param fileResource name of the resource, resolved with Guava's
-   *          {@code Resources.getResource}
-   * @return absolute path of the temporary copy
-   * @throws IOException if the temporary file cannot be created or written
-   */
-  public static String createTempCopyOf(String fileResource) throws IOException {
-    File tmpFile = File.createTempFile("tmp", "");
-    tmpFile.deleteOnExit();
-    Files.copy(newInputStreamSupplier(getResource(fileResource)), tmpFile);
-    return tmpFile.getAbsolutePath();
-  }
-
-  /**
-   * Reserves a unique temporary path that does not currently exist, by
-   * creating a temporary file and deleting it again.
-   *
-   * @return the (now non-existent) temporary file path
-   * @throws IOException if the temporary file cannot be created or cannot be
-   *           deleted
-   */
-  public static File createOutputPath() throws IOException {
-    File output = File.createTempFile("output", "");
-    // A failed delete would hand callers a path that already exists.
-    if (!output.delete()) {
-      throw new IOException("Could not delete temporary file " + output);
-    }
-    return output;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/test/InMemoryEmitter.java b/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
deleted file mode 100644
index 46e90ae..0000000
--- a/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.test;
-
-import java.util.List;
-
-import org.apache.crunch.Emitter;
-import com.google.common.collect.Lists;
-
-/**
- * An {@code Emitter} instance that writes emitted records to a backing {@code List}.
- *
- * @param <T>
- */
-public class InMemoryEmitter<T> implements Emitter<T> {
-  
-  private final List<T> output;
-  
-  public InMemoryEmitter() {
-    this(Lists.<T>newArrayList());
-  }
-  
-  public InMemoryEmitter(List<T> output) {
-    this.output = output;
-  }
-  
-  @Override
-  public void emit(T emitted) {
-    output.add(emitted);
-  }
-
-  @Override
-  public void flush() {
-
-  }
-
-  public List<T> getOutput() {
-    return output;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/test/TestCounters.java b/src/main/java/org/apache/crunch/test/TestCounters.java
deleted file mode 100644
index 8c8823c..0000000
--- a/src/main/java/org/apache/crunch/test/TestCounters.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.test;
-
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-
-/**
- * A utility class used during unit testing to update and read counters.
- */
-public class TestCounters {
-
-  private static Counters COUNTERS = new Counters();
-  
-  public static Counter getCounter(Enum<?> e) {
-    return COUNTERS.findCounter(e);
-  }
-  
-  public static Counter getCounter(String group, String name) {
-    return COUNTERS.findCounter(group, name);
-  }
-  
-  public static void clearCounters() {
-	  COUNTERS = new Counters();
-  }   
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/tool/CrunchTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/tool/CrunchTool.java b/src/main/java/org/apache/crunch/tool/CrunchTool.java
deleted file mode 100644
index a788520..0000000
--- a/src/main/java/org/apache/crunch/tool/CrunchTool.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.tool;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.Source;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.Target;
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.io.From;
-import org.apache.crunch.io.To;
-
-/**
- * An extension of the {@code Tool} interface that creates a {@code Pipeline}
- * instance and provides methods for working with the Pipeline from inside of
- * the Tool's run method.
- *
- */
-public abstract class CrunchTool extends Configured implements Tool {
-
-  protected static final From from = new From();
-  protected static final To to = new To();
-  protected static final At at = new At();
-  
-  private Pipeline pipeline;
-
-  public CrunchTool() throws IOException {
-	this(false);
-  }
-  
-  public CrunchTool(boolean inMemory) throws IOException {
-    this.pipeline = inMemory ? MemPipeline.getInstance() : new MRPipeline(getClass());  
-  }
-  
-  @Override
-  public void setConf(Configuration conf) {
-	super.setConf(conf);
-	if (conf != null && pipeline != null) {
-	  pipeline.setConfiguration(conf);
-	}
-  }
-  
-  @Override
-  public Configuration getConf() {
-	return pipeline.getConfiguration();
-  }
-  
-  public void enableDebug() {
-    pipeline.enableDebug();
-  }
-  
-  public <T> PCollection<T> read(Source<T> source) {
-	return pipeline.read(source);
-  }
-  
-  public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
-	return pipeline.read(tableSource);
-  }
-  
-  public PCollection<String> readTextFile(String pathName) {
-	return pipeline.readTextFile(pathName);
-  }
-  
-  public void write(PCollection<?> pcollection, Target target) {
-	pipeline.write(pcollection, target);
-  }
-  
-  public void writeTextFile(PCollection<?> pcollection, String pathName) {
-	pipeline.writeTextFile(pcollection, pathName);
-  }
-  
-  public void run() {
-	pipeline.run();
-  }
-  
-  public void done() {
-	pipeline.done();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/Converter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/Converter.java b/src/main/java/org/apache/crunch/types/Converter.java
deleted file mode 100644
index 70705f0..0000000
--- a/src/main/java/org/apache/crunch/types/Converter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-
-import org.apache.crunch.DoFn;
-
-/**
- * Converts the input key/value from a MapReduce task into the input to a
- * {@link DoFn}, or takes the output of a {@code DoFn} and write it to
- * the output key/values.
- */
-public interface Converter<K, V, S, T> extends Serializable {
-  S convertInput(K key, V value);
-  
-  T convertIterableInput(K key, Iterable<V> value);
-  
-  K outputKey(S value);
-  
-  V outputValue(S value);
-  
-  Class<K> getKeyClass();
-  
-  Class<V> getValueClass();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/src/main/java/org/apache/crunch/types/PGroupedTableType.java
deleted file mode 100644
index e29018a..0000000
--- a/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PGroupedTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
-
-/**
- * The {@code PType} instance for {@link PGroupedTable} instances. Its settings are
- * derived from the {@code PTableType} that was grouped to create the
- * {@code PGroupedTable} instance.
- *
- */
-public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<V>>> {
-
-  protected static class PTypeIterable<V> implements Iterable<V> {
-    private final Iterable<Object> iterable;
-    private final MapFn<Object, V> mapFn;
-
-    public PTypeIterable(MapFn<Object, V> mapFn, Iterable<Object> iterable) {
-      this.mapFn = mapFn;
-      this.iterable = iterable;
-    }
-
-    public Iterator<V> iterator() {
-      return new Iterator<V>() {
-        Iterator<Object> iter = iterable.iterator();
-
-        public boolean hasNext() {
-          return iter.hasNext();
-        }
-
-        public V next() {
-          return mapFn.map(iter.next());
-        }
-
-        public void remove() {
-          iter.remove();
-        }
-      };
-    }
-  }
-
-  public static class PairIterableMapFn<K, V> extends
-      MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {    
-    private final MapFn<Object, K> keys;
-    private final MapFn<Object, V> values;
-
-    public PairIterableMapFn(MapFn<Object, K> keys, MapFn<Object, V> values) {
-      this.keys = keys;
-      this.values = values;
-    }
-
-    @Override
-    public void initialize() {
-      keys.initialize();
-      values.initialize();
-    }
-
-    @Override
-    public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
-      return Pair.<K, Iterable<V>>of(keys.map(input.first()),
-          new PTypeIterable(values, input.second()));
-    }
-  }
-
-  protected final PTableType<K, V> tableType;
-  
-  public PGroupedTableType(PTableType<K, V> tableType) {
-    this.tableType = tableType;
-  }
-
-  public PTableType<K, V> getTableType() {
-    return tableType;
-  }
-  
-  @Override
-  public PTypeFamily getFamily() {
-    return tableType.getFamily();
-  }
-
-  @Override
-  public List<PType> getSubTypes() {
-    return tableType.getSubTypes();
-  }
-  
-  @Override
-  public Converter getConverter() {
-    return tableType.getConverter();
-  }
-  
-  public abstract Converter getGroupingConverter();
-  
-  public abstract void configureShuffle(Job job, GroupingOptions options);
-  
-  @Override
-  public SourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
-    throw new UnsupportedOperationException("Grouped tables cannot be written out directly");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/PTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PTableType.java b/src/main/java/org/apache/crunch/types/PTableType.java
deleted file mode 100644
index b9e946f..0000000
--- a/src/main/java/org/apache/crunch/types/PTableType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-
-/**
- * An extension of {@code PType} specifically for {@link PTable} objects. It
- * allows separate access to the {@code PType}s of the key and value for the
- * {@code PTable}.
- *
- */
-public interface PTableType<K, V> extends PType<Pair<K, V>> {
-  /**
-   * Returns the key type for the table.
-   */
-  PType<K> getKeyType();
-
-  /**
-   * Returns the value type for the table.
-   */
-  PType<V> getValueType(); 
-  
-  /**
-   * Returns the grouped table version of this type.
-   */
-  PGroupedTableType<K, V> getGroupedTableType();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PType.java b/src/main/java/org/apache/crunch/types/PType.java
deleted file mode 100644
index 126b165..0000000
--- a/src/main/java/org/apache/crunch/types/PType.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.SourceTarget;
-
-/**
- * A {@code PType} defines a mapping between a data type that is used in a
- * Crunch pipeline and a serialization and storage format that is used to
- * read/write data from/to HDFS. Every {@link PCollection} has an associated
- * {@code PType} that tells Crunch how to read/write data from that
- * {@code PCollection}.
- * 
- */
-public interface PType<T> extends Serializable {
-  /**
-   * Returns the Java type represented by this {@code PType}.
-   */
-  Class<T> getTypeClass();
-
-  /**
-   * Returns the {@code PTypeFamily} that this {@code PType} belongs to.
-   */
-  PTypeFamily getFamily();
-
-  MapFn<Object, T> getInputMapFn();
-
-  MapFn<T, Object> getOutputMapFn();
-
-  Converter getConverter();
-
-  /**
-   * Returns a copy of a value (or the value itself) that can safely be
-   * retained.
-   * <p>
-   * This is useful when iterable values being processed in a DoFn (via a
-   * reducer) need to be held on to for more than the scope of a single
-   * iteration, as a reducer (and therefore also a DoFn that has an Iterable as
-   * input) re-use deserialized values. More information on object reuse is
-   * available in the {@link DoFn} class documentation.
-   * 
-   * @param value
-   *          The value to be deep-copied
-   * @return A deep copy of the input value
-   */
-  T getDetachedValue(T value);
-
-  /**
-   * Returns a {@code SourceTarget} that is able to read/write data using the
-   * serialization format specified by this {@code PType}.
-   */
-  SourceTarget<T> getDefaultFileSource(Path path);
-
-  /**
-   * Returns the sub-types that make up this PType if it is a composite
-   * instance, such as a tuple.
-   */
-  List<PType> getSubTypes();
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PTypeFamily.java b/src/main/java/org/apache/crunch/types/PTypeFamily.java
deleted file mode 100644
index 2ae9493..0000000
--- a/src/main/java/org/apache/crunch/types/PTypeFamily.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-/**
- * An abstract factory for creating {@code PType} instances that have the same
- * serialization/storage backing format.
- * 
- */
-public interface PTypeFamily {
-  PType<Void> nulls();
-  
-  PType<String> strings();
-
-  PType<Long> longs();
-
-  PType<Integer> ints();
-
-  PType<Float> floats();
-
-  PType<Double> doubles();
-
-  PType<Boolean> booleans();
-  
-  PType<ByteBuffer> bytes();
-  
-  <T> PType<T> records(Class<T> clazz);
-
-  <T> PType<Collection<T>> collections(PType<T> ptype);
-
-  <T> PType<Map<String, T>> maps(PType<T> ptype);
-  
-  <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2);
-
-  <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
-      PType<V3> p3);
-
-  <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4);
-
-  PType<TupleN> tuples(PType<?>... ptypes);
-
-  <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes);
-  
-  <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
-  
-  <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
-  
-  /**
-   * Returns the equivalent of the given ptype for this family, if it exists.
-   */
-  <T> PType<T> as(PType<T> ptype);  
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/PTypeUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/PTypeUtils.java b/src/main/java/org/apache/crunch/types/PTypeUtils.java
deleted file mode 100644
index 4dda78d..0000000
--- a/src/main/java/org/apache/crunch/types/PTypeUtils.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-
-/**
- * Utilities for converting between {@code PType}s from different {@code PTypeFamily}
- * implementations.
- *
- */
-public class PTypeUtils {
-
-  public static <T> PType<T> convert(PType<T> ptype, PTypeFamily tf) {
-    if (ptype instanceof PTableType) {
-      PTableType ptt = (PTableType) ptype;
-      return tf.tableOf(tf.as(ptt.getKeyType()), tf.as(ptt.getValueType()));
-    }
-    Class<T> typeClass = ptype.getTypeClass();
-    if (Tuple.class.isAssignableFrom(typeClass)) {
-      List<PType> subTypes = ptype.getSubTypes();
-      if (Pair.class.equals(typeClass)) {
-        return tf.pairs(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)));
-      } else if (Tuple3.class.equals(typeClass)) {
-        return tf.triples(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)));
-      } else if (Tuple4.class.equals(typeClass)) {
-        return tf.quads(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)),
-            tf.as(subTypes.get(2)), tf.as(subTypes.get(3)));
-      } else if (TupleN.class.equals(typeClass)) {
-        PType[] newPTypes = subTypes.toArray(new PType[0]);
-        for (int i = 0; i < newPTypes.length; i++) {
-          newPTypes[i] = tf.as(subTypes.get(i));
-        }
-        return (PType<T>) tf.tuples(newPTypes);
-      }
-    }
-    if (Collection.class.isAssignableFrom(typeClass)) {
-      return tf.collections(tf.as(ptype.getSubTypes().get(0)));
-    }
-    return tf.records(typeClass);
-  }
-  
-  private PTypeUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/TupleFactory.java b/src/main/java/org/apache/crunch/types/TupleFactory.java
deleted file mode 100644
index 49aae0d..0000000
--- a/src/main/java/org/apache/crunch/types/TupleFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types;
-
-import java.io.Serializable;
-import java.lang.reflect.Constructor;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-
-public abstract class TupleFactory<T extends Tuple> implements Serializable {
-
-  public void initialize() { }
-  
-  public abstract T makeTuple(Object...values);
-  
-  public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
-    @Override
-    public Pair makeTuple(Object... values) {
-      return Pair.of(values[0], values[1]);
-    }
-  };
-
-  public static final TupleFactory<Tuple3> TUPLE3 = new TupleFactory<Tuple3>() {
-    @Override
-    public Tuple3 makeTuple(Object... values) {
-      return Tuple3.of(values[0], values[1], values[2]);
-    }
-  };
-  
-  public static final TupleFactory<Tuple4> TUPLE4 = new TupleFactory<Tuple4>() {
-    @Override
-    public Tuple4 makeTuple(Object... values) {
-      return Tuple4.of(values[0], values[1], values[2], values[3]);
-    }
-  };
-
-  public static final TupleFactory<TupleN> TUPLEN = new TupleFactory<TupleN>() {
-    @Override
-    public TupleN makeTuple(Object... values) {
-      return new TupleN(values);
-    }
-  };
-
-  public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, Class... typeArgs) {
-    return new CustomTupleFactory<T>(clazz, typeArgs);
-  }
-  
-  private static class CustomTupleFactory<T extends Tuple> extends TupleFactory<T> {
-
-    private final Class<T> clazz;
-    private final Class[] typeArgs;
-    
-    private transient Constructor<T> constructor;
-    
-    public CustomTupleFactory(Class<T> clazz, Class[] typeArgs) {
-      this.clazz = clazz;
-      this.typeArgs = typeArgs;
-    }
-    
-    @Override
-    public void initialize() {
-      try {
-        constructor = clazz.getConstructor(typeArgs);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-    
-    @Override
-    public T makeTuple(Object... values) {
-      try {
-        return constructor.newInstance(values);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-}


Mime
View raw message