incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [7/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
new file mode 100644
index 0000000..f8a7b3a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.AvroUtf8InputFormat;
+
+public class TextFileSource<T> extends FileSourceImpl<T> implements
+	ReadableSource<T> {
+
+  private static boolean isBZip2(Path path) {
+	String strPath = path.toString();
+	return strPath.endsWith(".bz") || strPath.endsWith(".bz2");
+  }
+  
+  private static <S> Class<? extends FileInputFormat<?,?>> getInputFormat(Path path, PType<S> ptype) {
+	if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
+	  return AvroUtf8InputFormat.class;
+	} else if (isBZip2(path)){
+      return BZip2TextInputFormat.class;
+	} else {
+  	  return TextInputFormat.class;
+	}
+  }
+  
+  public TextFileSource(Path path, PType<T> ptype) {
+	super(path, ptype, getInputFormat(path, ptype));
+  }
+  
+  @Override
+  public long getSize(Configuration conf) {
+	long sz = super.getSize(conf);
+	if (isBZip2(path)) {
+	  sz *= 10; // Arbitrary compression factor
+	}
+	return sz;
+  }
+  
+  @Override
+  public String toString() {
+    return "Text(" + path + ")";
+  }
+  
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
+        new TextFileReaderFactory<T>(ptype, conf));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
new file mode 100644
index 0000000..5163c6a
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.PType;
+
+public class TextFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+  public TextFileSourceTarget(String path, PType<T> ptype) {
+    this(new Path(path), ptype);
+  }
+  
+  public TextFileSourceTarget(Path path, PType<T> ptype) {
+    super(new TextFileSource<T>(path, ptype), new TextFileTarget(path));
+  }
+  
+  @Override
+  public String toString() {
+	return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
new file mode 100644
index 0000000..ac3a52b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+public class TextFileTarget extends FileTargetImpl {
+  public TextFileTarget(String path) {
+    this(new Path(path));
+  }
+  
+  public TextFileTarget(Path path) {
+    super(path, TextOutputFormat.class);
+  }
+  
+  @Override
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public String toString() {
+    return "Text(" + path + ")";
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof PTableType) {
+      return null;
+    }
+    return new TextFileSourceTarget<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
new file mode 100644
index 0000000..dcc655b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.Lists;
+
+/**
+ * Methods for performing various types of aggregations over {@link PCollection}
+ * instances.
+ *
+ */
+public class Aggregate {
+
+  /**
+   * Returns a {@code PTable} that contains the unique elements of this
+   * collection mapped to a count of their occurrences.
+   */
+  public static <S> PTable<S, Long> count(PCollection<S> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>() {      
+      public Pair<S, Long> map(S input) {
+        return Pair.of(input, 1L);
+      }
+    }, tf.tableOf(collect.getPType(), tf.longs()))
+    .groupByKey()
+    .combineValues(CombineFn.<S> SUM_LONGS());
+  }
+  
+  public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> {
+    private final boolean ascending;
+    
+    public PairValueComparator(boolean ascending) {
+      this.ascending = ascending;
+    }
+    
+    @Override
+    public int compare(Pair<K, V> left, Pair<K, V> right) {
+      int cmp = ((Comparable<V>) left.second()).compareTo(right.second());
+      return ascending ? cmp : -cmp;
+    }
+  }
+  
+  public static class TopKFn<K, V> extends DoFn<Pair<K, V>, Pair<Integer, Pair<K, V>>> {
+    // First stage of top(): buffers at most 'limit' pairs and emits them from cleanup() under key 0.
+    private final int limit;
+    private final boolean maximize;
+    private transient PriorityQueue<Pair<K, V>> values; // rebuilt in initialize(), never serialized
+    
+    public TopKFn(int limit, boolean maximize) {
+      this.limit = limit;
+      this.maximize = maximize;
+    }
+    
+    public void initialize() {
+      this.values = new PriorityQueue<Pair<K, V>>(limit, new PairValueComparator<K, V>(maximize));
+    }
+    
+    public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      values.add(input);
+      if (values.size() > limit) {
+        values.poll(); // drop the head, i.e. the pair ranked worst for the requested direction
+      }
+    }
+    
+    public void cleanup(Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      for (Pair<K, V> p : values) {
+        emitter.emit(Pair.of(0, p)); // constant key so all survivors meet in one group
+      }
+    }
+  }
+  
+  public static class TopKCombineFn<K, V> extends CombineFn<Integer, Pair<K, V>> {
+    
+    private final int limit;
+    private final boolean maximize;
+    
+    public TopKCombineFn(int limit, boolean maximize) {
+      this.limit = limit;
+      this.maximize = maximize;
+    }
+    
+    @Override
+    public void process(Pair<Integer, Iterable<Pair<K, V>>> input,
+        Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize);
+      PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(limit, cmp);
+      for (Pair<K, V> pair : input.second()) {
+        queue.add(pair);
+        if (queue.size() > limit) {
+          queue.poll();
+        }
+      }
+      
+      List<Pair<K, V>> values = Lists.newArrayList(queue);
+      Collections.sort(values, cmp);
+      for (int i = values.size() - 1; i >= 0; i--) {
+        emitter.emit(Pair.of(0, values.get(i)));
+      }
+    }
+  }
+  
+  public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) {
+    PTypeFamily ptf = ptable.getTypeFamily();
+    PTableType<K, V> base = ptable.getPTableType();
+    PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
+    PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
+    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter)
+        .groupByKey(1)
+        .combineValues(new TopKCombineFn<K, V>(limit, maximize))
+        .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
+          public void process(Pair<Integer, Pair<K, V>> input,
+              Emitter<Pair<K, V>> emitter) {
+            emitter.emit(input.second()); 
+          }
+        }, base);
+  }
+  
+  /**
+   * Returns the largest element from the input collection, by natural ordering (elements must be Comparable).
+   */
+  public static <S> PCollection<S> max(PCollection<S> collect) {
+	Class<S> clazz = collect.getPType().getTypeClass();
+	if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
+	  throw new IllegalArgumentException(
+	      "Can only get max for Comparable elements, not for: " + collect.getPType().getTypeClass());
+	}
+    PTypeFamily tf = collect.getTypeFamily();
+    return PTables.values(
+        collect.parallelDo("max", new DoFn<S, Pair<Boolean, S>>() {
+          private transient S max = null;
+          
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (max == null || ((Comparable<S>) max).compareTo(input) < 0) {
+              max = input;
+            }
+          }
+          
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (max != null) {
+              emitter.emit(Pair.of(true, max));
+            }
+          }
+        }, tf.tableOf(tf.booleans(), collect.getPType()))
+        .groupByKey(1).combineValues(new CombineFn<Boolean, S>() {
+          public void process(Pair<Boolean, Iterable<S>> input,
+              Emitter<Pair<Boolean, S>> emitter) {
+            S max = null;
+            for (S v : input.second()) {
+              if (max == null || ((Comparable<S>) max).compareTo(v) < 0) {
+                max = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), max));
+          } }));
+  }
+  
+  /**
+   * Returns the smallest element from the input collection, by natural ordering (elements must be Comparable).
+   */
+  public static <S> PCollection<S> min(PCollection<S> collect) {
+    Class<S> clazz = collect.getPType().getTypeClass();
+    if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
+      throw new IllegalArgumentException(
+          "Can only get min for Comparable elements, not for: " + collect.getPType().getTypeClass());
+    }
+    PTypeFamily tf = collect.getTypeFamily();
+    return PTables.values(
+        collect.parallelDo("min", new DoFn<S, Pair<Boolean, S>>() {
+          private transient S min = null; // smallest element seen so far by this DoFn instance
+
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (min == null || ((Comparable<S>) min).compareTo(input) > 0) {
+              min = input;
+            }
+          }
+
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (min != null) {
+              emitter.emit(Pair.of(false, min)); // constant key: every partial minimum joins one group
+            }
+          }
+        }, tf.tableOf(tf.booleans(), collect.getPType()))
+        .groupByKey(1).combineValues(new CombineFn<Boolean, S>() { // one key only, same hint as max()
+          public void process(Pair<Boolean, Iterable<S>> input,
+              Emitter<Pair<Boolean, S>> emitter) {
+            S min = null;
+            for (S v : input.second()) {
+              if (min == null || ((Comparable<S>) min).compareTo(v) > 0) {
+                min = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), min));
+          } }));
+  }
+  
+  public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    final PType<V> valueType = collect.getValueType();
+    return collect.groupByKey().parallelDo("collect", new MapValuesFn<K, Iterable<V>, Collection<V>>() {
+          public Collection<V> map(Iterable<V> values) {
+            List<V> collected = Lists.newArrayList();
+            for (V value : values) {
+              collected.add(valueType.getDetachedValue(value));
+            }
+            return collected;
+      }
+    }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType())));  
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
new file mode 100644
index 0000000..e6ce7fc
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+import java.util.Random;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for Cartesian products of two {@code PTable} or {@code PCollection} instances.
+ */
+@SuppressWarnings("serial")
+public class Cartesian {
+
+  /**
+   * Helper for building the artificial cross keys. This technique was taken from Pig's CROSS.
+   */
+  private static class GFCross<V> extends DoFn<V, Pair<Pair<Integer, Integer>, V>>{
+
+    private final int constantField;
+    private final int parallelism;
+    private final Random r;
+
+    public GFCross(int constantField, int parallelism) {
+      this.constantField = constantField;
+      this.parallelism = parallelism;
+      this.r = new Random();
+    }
+
+    public void process(V input, Emitter<Pair<Pair<Integer, Integer>, V>> emitter) {
+      int c = r.nextInt(parallelism);
+      if (constantField == 0) {
+        for (int i = 0; i < parallelism; i++) {
+          emitter.emit(Pair.of(Pair.of(c, i), input));
+        }
+      } else {
+        for (int i = 0; i < parallelism; i++) {
+          emitter.emit(Pair.of(Pair.of(i, c), input));
+        }
+      }
+    }
+  }
+
+  static final int DEFAULT_PARALLELISM = 6;
+
+  /**
+   * Performs a full cross join on the specified {@link PTable}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PTable to perform a cross join on.
+   * @param right A PTable to perform a cross join on.
+   * @param <K1> Type of left PTable's keys.
+   * @param <K2> Type of right PTable's keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result as tuples of ((K1,K2), (U,V)).
+   */
+  public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(
+      PTable<K1, U> left,
+      PTable<K2, V> right) {
+    return cross(left, right, DEFAULT_PARALLELISM);
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PTable}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PTable to perform a cross join on.
+   * @param right A PTable to perform a cross join on.
+   * @param parallelism The square root of the number of reducers to use.  Increasing parallelism also increases copied data.
+   * @param <K1> Type of left PTable's keys.
+   * @param <K2> Type of right PTable's keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result as tuples of ((K1,K2), (U,V)).
+   */
+  public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(
+      PTable<K1, U> left,
+      PTable<K2, V> right,
+      int parallelism) {
+
+    /* The strategy here is to simply emulate the following PigLatin:
+     *   A  = foreach table1 generate flatten(GFCross(0, 2)), flatten(*); 
+     *   B  = foreach table2 generate flatten(GFCross(1, 2)), flatten(*); 
+     *   C = cogroup A by ($0, $1), B by ($0, $1);
+     *   result = foreach C generate flatten(A), flatten(B);
+     */
+
+    PTypeFamily ltf = left.getTypeFamily();
+    PTypeFamily rtf = right.getTypeFamily();
+
+    PTable<Pair<Integer, Integer>, Pair<K1,U>> leftCross =
+        left.parallelDo(
+            new GFCross<Pair<K1,U>>(0, parallelism), 
+            ltf.tableOf(
+                ltf.pairs(ltf.ints(), ltf.ints()), 
+                ltf.pairs(left.getKeyType(), left.getValueType())));
+    PTable<Pair<Integer, Integer>, Pair<K2,V>> rightCross =
+        right.parallelDo(
+            new GFCross<Pair<K2,V>>(1, parallelism), 
+            rtf.tableOf(
+                rtf.pairs(rtf.ints(), rtf.ints()), 
+                rtf.pairs(right.getKeyType(), right.getValueType())));
+
+    PTable<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> cg =
+        leftCross.cogroup(rightCross);
+
+    PTypeFamily ctf = cg.getTypeFamily();
+
+    return cg.parallelDo(
+        new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() {
+          @Override
+          public void process(
+              Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> input,
+              Emitter<Pair<Pair<K1, K2>, Pair<U, V>>> emitter) {
+            for (Pair<K1, U> l: input.second().first()) {
+              for (Pair<K2, V> r: input.second().second()) {
+                emitter.emit(Pair.of(Pair.of(l.first(), r.first()), Pair.of(l.second(), r.second())));
+              }
+            }
+          }
+        },
+        ctf.tableOf(
+            ctf.pairs(left.getKeyType(), right.getKeyType()), 
+            ctf.pairs(left.getValueType(), right.getValueType()))
+        );
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PCollection}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PCollection to perform a cross join on.
+   * @param right A PCollection to perform a cross join on.
+   * @param <U> Type of the first {@link PCollection}'s values
+   * @param <V> Type of the second {@link PCollection}'s values
+   * @return The joined result as tuples of (U,V).
+   */
+  public static <U, V> PCollection<Pair<U, V>> cross(
+      PCollection<U> left,
+      PCollection<V> right) {
+    return cross(left, right, DEFAULT_PARALLELISM);
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PCollection}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PCollection to perform a cross join on.
+   * @param right A PCollection to perform a cross join on.
+   * @param parallelism The square root of the number of reducers to use.  Increasing parallelism also increases copied data.
+   * @param <U> Type of the first {@link PCollection}'s values
+   * @param <V> Type of the second {@link PCollection}'s values
+   * @return The joined result as tuples of (U,V).
+   */
+  public static <U, V> PCollection<Pair<U, V>> cross(
+      PCollection<U> left,
+      PCollection<V> right,
+      int parallelism) {
+
+    PTypeFamily ltf = left.getTypeFamily();
+    PTypeFamily rtf = right.getTypeFamily();
+
+    // Tag every left element with one random row of a parallelism x parallelism grid and
+    // replicate it across all columns of that row.
+    PTable<Pair<Integer, Integer>, U> leftCross =
+        left.parallelDo(
+            new GFCross<U>(0, parallelism),
+            ltf.tableOf(
+                ltf.pairs(ltf.ints(), ltf.ints()),
+                left.getPType()));
+    // Tag every right element with one random column and replicate it across all rows, so
+    // each (left, right) pair shares exactly one grid cell.
+    PTable<Pair<Integer, Integer>, V> rightCross =
+        right.parallelDo(
+            new GFCross<V>(1, parallelism),
+            rtf.tableOf(
+                rtf.pairs(rtf.ints(), rtf.ints()),
+                right.getPType()));
+
+    // Cogrouping on the cell key brings the matching left and right elements together.
+    PTable<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> cg =
+        leftCross.cogroup(rightCross);
+
+    PTypeFamily ctf = cg.getTypeFamily();
+
+    // Emit the per-cell product; the artificial grid key is dropped.
+    return cg.parallelDo(
+        new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>>, Pair<U,V>>() {
+          @Override
+          public void process(
+              Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> input,
+              Emitter<Pair<U,V>> emitter) {
+            for (U l: input.second().first()) {
+              for (V r: input.second().second()) {
+                emitter.emit(Pair.of(l, r));
+              }
+            }
+          }
+        }, ctf.pairs(left.getPType(), right.getPType()));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
new file mode 100644
index 0000000..6f759bb
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.Lists;
+
+public class Cogroup {
+  
+  /**
+   * Co-groups the two {@link PTable} arguments.
+   * 
+   * @return a {@code PTable} representing the co-grouped tables.
+   */
+  public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(
+      PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PType<K> keyType = left.getPTableType().getKeyType();
+    PType<U> leftType = left.getPTableType().getValueType();
+    PType<V> rightType = right.getPTableType().getValueType();
+    PType<Pair<U, V>> itype = ptf.pairs(leftType, rightType);
+
+    PTable<K, Pair<U, V>> cgLeft = left.parallelDo("coGroupTag1",
+        new CogroupFn1<K, U, V>(), ptf.tableOf(keyType, itype));
+    PTable<K, Pair<U, V>> cgRight = right.parallelDo("coGroupTag2",
+        new CogroupFn2<K, U, V>(), ptf.tableOf(keyType, itype));
+    
+    PTable<K, Pair<U, V>> both = cgLeft.union(cgRight);
+
+    PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(
+        ptf.collections(leftType), ptf.collections(rightType));
+    return both.groupByKey().parallelDo("cogroup",
+        new PostGroupFn<K, U, V>(), ptf.tableOf(keyType, otype));
+  }
+
+  private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
+    @Override
+    public Pair<V, U> map(V v) {
+      return Pair.of(v, null);
+    }
+  }
+
+  private static class CogroupFn2<K, V, U> extends MapValuesFn<K, U, Pair<V, U>> {
+    @Override
+    public Pair<V, U> map(U u) {
+      return Pair.of(null, u);
+    }
+  }
+
+  private static class PostGroupFn<K, V, U> extends
+      DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+    @Override
+    public void process(Pair<K, Iterable<Pair<V, U>>> input,
+        Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {
+      Collection<V> cv = Lists.newArrayList();
+      Collection<U> cu = Lists.newArrayList();
+      for (Pair<V, U> pair : input.second()) {
+        if (pair.first() != null) {
+          cv.add(pair.first());
+        } else if (pair.second() != null) {
+          cu.add(pair.second());
+        }
+      }
+      emitter.emit(Pair.of(input.first(), Pair.of(cv, cu)));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/Join.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Join.java b/crunch/src/main/java/org/apache/crunch/lib/Join.java
new file mode 100644
index 0000000..9840c33
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Join.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.join.FullOuterJoinFn;
+import org.apache.crunch.lib.join.InnerJoinFn;
+import org.apache.crunch.lib.join.JoinFn;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.lib.join.LeftOuterJoinFn;
+import org.apache.crunch.lib.join.RightOuterJoinFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for joining two {@code PTable} instances on a common key.
+ */
+public class Join {
+  /**
+   * Shorthand for {@link #innerJoin(PTable, PTable)}.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner Join</a>
+   * @param left The first table of the inner join.
+   * @param right The second table of the inner join.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the values in {@code left}
+   * @param <V> Type of the values in {@code right}
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
+    return innerJoin(left, right);
+  }
+
+  /**
+   * Joins the two tables using an {@link InnerJoinFn}.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner Join</a>
+   * @param left The first table of the inner join.
+   * @param right The second table of the inner join.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the values in {@code left}
+   * @param <V> Type of the values in {@code right}
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) {
+    JoinFn<K, U, V> innerFn = new InnerJoinFn<K, U, V>(left.getValueType());
+    return join(left, right, innerFn);
+  }
+
+  /**
+   * Joins the two tables using a {@link LeftOuterJoinFn}.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Left_outer_join">Left Join</a>
+   * @param left The first table of the left join. All of this table's entries will
+   *     appear in the resulting PTable.
+   * @param right The second table of the left join.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the values in {@code left}
+   * @param <V> Type of the values in {@code right}
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) {
+    JoinFn<K, U, V> leftOuterFn = new LeftOuterJoinFn<K, U, V>(left.getValueType());
+    return join(left, right, leftOuterFn);
+  }
+
+  /**
+   * Joins the two tables using a {@link RightOuterJoinFn}.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Right_outer_join">Right Join</a>
+   * @param left The first table of the right join.
+   * @param right The second table of the right join. All of this table's entries will
+   *     appear in the resulting PTable.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the values in {@code left}
+   * @param <V> Type of the values in {@code right}
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) {
+    JoinFn<K, U, V> rightOuterFn = new RightOuterJoinFn<K, U, V>(left.getValueType());
+    return join(left, right, rightOuterFn);
+  }
+
+  /**
+   * Joins the two tables using a {@link FullOuterJoinFn}.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Full_outer_join">Full Join</a>
+   * @param left The first table of the full join.
+   * @param right The second table of the full join.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the values in {@code left}
+   * @param <V> Type of the values in {@code right}
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) {
+    JoinFn<K, U, V> fullOuterFn = new FullOuterJoinFn<K, U, V>(left.getValueType());
+    return join(left, right, fullOuterFn);
+  }
+
+  /**
+   * Joins the two tables with the supplied join function. Both inputs are
+   * tagged, unioned and grouped first; {@code joinFn} is then applied to the
+   * grouped table. The stage is named with the join type followed by the name
+   * of the grouped table.
+   *
+   * @param left The first table of the join.
+   * @param right The second table of the join.
+   * @param joinFn The function applied to the grouped, tagged entries.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the values in {@code left}
+   * @param <V> Type of the values in {@code right}
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right,
+      JoinFn<K, U, V> joinFn) {
+    PTypeFamily family = left.getTypeFamily();
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> tagged = preJoin(left, right);
+    PTableType<K, Pair<U, V>> resultType = family.tableOf(left.getKeyType(),
+        family.pairs(left.getValueType(), right.getValueType()));
+
+    String stageName = joinFn.getJoinType() + tagged.getName();
+    return tagged.parallelDo(stageName, joinFn, resultType);
+  }
+
+  /**
+   * Prepares the two tables for a join. Every left entry gets the tag 0 added
+   * to its key and a null in the right slot of its value pair; every right
+   * entry gets the tag 1 and a null in the left slot. The union of both is
+   * grouped by key with the partitioner class that {@link JoinUtils} returns
+   * for the type family of {@code left}.
+   */
+  private static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(
+      PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily family = left.getTypeFamily();
+    PTableType<Pair<K, Integer>, Pair<U, V>> taggedType = family.tableOf(
+        family.pairs(left.getKeyType(), family.ints()),
+        family.pairs(left.getValueType(), right.getValueType()));
+
+    PTable<Pair<K, Integer>, Pair<U, V>> leftTagged = left.parallelDo("joinTagLeft",
+        new MapFn<Pair<K, U>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+          @Override
+          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> entry) {
+            Pair<K, Integer> taggedKey = Pair.of(entry.first(), 0);
+            Pair<U, V> leftOnly = Pair.of(entry.second(), (V) null);
+            return Pair.of(taggedKey, leftOnly);
+          }
+        }, taggedType);
+    PTable<Pair<K, Integer>, Pair<U, V>> rightTagged = right.parallelDo("joinTagRight",
+        new MapFn<Pair<K, V>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+          @Override
+          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> entry) {
+            Pair<K, Integer> taggedKey = Pair.of(entry.first(), 1);
+            Pair<U, V> rightOnly = Pair.of((U) null, entry.second());
+            return Pair.of(taggedKey, rightOnly);
+          }
+        }, taggedType);
+
+    GroupingOptions.Builder grouping = GroupingOptions.builder();
+    grouping.partitionerClass(JoinUtils.getPartitionerClass(family));
+
+    PTable<Pair<K, Integer>, Pair<U, V>> combined = leftTagged.union(rightTagged);
+    return combined.groupByKey(grouping.build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/PTables.java b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
new file mode 100644
index 0000000..911cb36
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/PTables.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Helper methods for common operations on {@link PTable}s.
+ */
+public class PTables {
+
+  /**
+   * Extracts the keys of a table.
+   *
+   * @param ptable
+   *          The table to read from
+   * @return A collection holding the key of every entry of the table
+   */
+  public static <K, V> PCollection<K> keys(PTable<K, V> ptable) {
+    DoFn<Pair<K, V>, K> extractKey = new DoFn<Pair<K, V>, K>() {
+      @Override
+      public void process(Pair<K, V> entry, Emitter<K> out) {
+        out.emit(entry.first());
+      }
+    };
+    return ptable.parallelDo("PTables.keys", extractKey, ptable.getKeyType());
+  }
+
+  /**
+   * Extracts the values of a table.
+   *
+   * @param ptable
+   *          The table to read from
+   * @return A collection holding the value of every entry of the table
+   */
+  public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
+    DoFn<Pair<K, V>, V> extractValue = new DoFn<Pair<K, V>, V>() {
+      @Override
+      public void process(Pair<K, V> entry, Emitter<V> out) {
+        out.emit(entry.second());
+      }
+    };
+    return ptable.parallelDo("PTables.values", extractValue, ptable.getValueType());
+  }
+
+  /**
+   * Builds a detached copy of a table {@link Pair} by detaching its key with
+   * the table's key type and its value with the table's value type.
+   *
+   * @param tableType
+   *          The table type
+   * @param value
+   *          The pair from which a detached pair is to be created
+   * @return The detached pair
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, V> getDetachedValue(PTableType<K, V> tableType, Pair<K, V> value) {
+    K detachedKey = tableType.getKeyType().getDetachedValue(value.first());
+    V detachedValue = tableType.getValueType().getDetachedValue(value.second());
+    return Pair.of(detachedKey, detachedValue);
+  }
+
+  /**
+   * Builds a detached copy of a {@link PGroupedTable} entry. Every grouped
+   * value is detached with the value type of the underlying table and
+   * collected into a list; the key is detached with the table's key type.
+   *
+   * @param groupedTableType
+   *          The grouped table type
+   * @param value
+   *          The grouped entry from which a detached entry is to be created
+   * @return The detached entry
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue(
+      PGroupedTableType<K, V> groupedTableType, Pair<K, Iterable<V>> value) {
+
+    PTableType<K, V> tableType = groupedTableType.getTableType();
+    List<V> detachedList = Lists.newArrayList();
+    PType<V> valueType = tableType.getValueType();
+    for (V groupedValue : value.second()) {
+      V detached = valueType.getDetachedValue(groupedValue);
+      detachedList.add(detached);
+    }
+    K detachedKey = tableType.getKeyType().getDetachedValue(value.first());
+    Iterable<V> detachedValues = detachedList;
+    return Pair.of(detachedKey, detachedValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sample.java b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
new file mode 100644
index 0000000..96d9694
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Random;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import com.google.common.base.Preconditions;
+
+/**
+ * Methods for drawing a random sample from a {@link PCollection}.
+ */
+public class Sample {
+
+  /**
+   * Emits each input element when a freshly drawn random double is below the
+   * configured acceptance probability. The {@link Random} is created from the
+   * seed in {@link #initialize()}.
+   */
+  public static class SamplerFn<S> extends DoFn<S, S> {
+
+    private final long seed;
+    private final double acceptanceProbability;
+    // transient: excluded from Java serialization and rebuilt in initialize().
+    private transient Random r;
+
+    /**
+     * @param seed
+     *          Seed for the random number generator
+     * @param acceptanceProbability
+     *          Probability of keeping an element; must be strictly between
+     *          0.0 and 1.0
+     * @throws IllegalArgumentException
+     *           if the probability is not strictly between 0.0 and 1.0
+     */
+    public SamplerFn(long seed, double acceptanceProbability) {
+      // Carry the offending value in the message so a bad argument is diagnosable.
+      Preconditions.checkArgument(0.0 < acceptanceProbability && acceptanceProbability < 1.0,
+          "acceptanceProbability must be in the open interval (0.0, 1.0), got %s",
+          acceptanceProbability);
+      this.seed = seed;
+      this.acceptanceProbability = acceptanceProbability;
+    }
+
+    @Override
+    public void initialize() {
+      r = new Random(seed);
+    }
+
+    @Override
+    public void process(S input, Emitter<S> emitter) {
+      if (r.nextDouble() < acceptanceProbability) {
+        emitter.emit(input);
+      }
+    }
+  }
+
+  /**
+   * Samples the collection using the current time in milliseconds as the seed.
+   *
+   * @param input
+   *          The collection to sample
+   * @param probability
+   *          Probability of keeping an element, strictly between 0.0 and 1.0
+   * @return The sampled collection
+   */
+  public static <S> PCollection<S> sample(PCollection<S> input, double probability) {
+    return sample(input, System.currentTimeMillis(), probability);
+  }
+
+  /**
+   * Samples the collection with a {@link SamplerFn} built from the given seed
+   * and probability. The stage is named {@code sample(p)} with the
+   * probability formatted to two decimals.
+   *
+   * @param input
+   *          The collection to sample
+   * @param seed
+   *          Seed for the random number generator
+   * @param probability
+   *          Probability of keeping an element, strictly between 0.0 and 1.0
+   * @return The sampled collection
+   */
+  public static <S> PCollection<S> sample(PCollection<S> input, long seed, double probability) {
+    String stageName = String.format("sample(%.2f)", probability);
+    return input.parallelDo(stageName, new SamplerFn<S>(seed, probability), input.getPType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/Set.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Set.java b/crunch/src/main/java/org/apache/crunch/lib/Set.java
new file mode 100644
index 0000000..f915d53
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Set.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Set-style operations (difference, intersection and a <code>comm</code>-like
+ * comparison) on {@code PCollection} instances.
+ */
+public class Set {
+
+  /**
+   * Compute the elements of one collection that are absent from another.
+   *
+   * @return a collection containing the elements found in <code>coll1</code>
+   * and not found in <code>coll2</code>
+   */
+  public static <T> PCollection<T> difference(PCollection<T> coll1,
+      PCollection<T> coll2) {
+    DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T> onlyInFirst =
+        new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<T> emitter) {
+            Collection<Boolean> fromFirst = input.second().first();
+            Collection<Boolean> fromSecond = input.second().second();
+            if (fromFirst.isEmpty() || !fromSecond.isEmpty()) {
+              return;
+            }
+            emitter.emit(input.first());
+          }
+        };
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
+        .parallelDo(onlyInFirst, coll1.getPType());
+  }
+
+  /**
+   * Compute the elements that two collections have in common.
+   *
+   * @return a collection containing the elements found in both
+   * <code>coll1</code> and <code>coll2</code>
+   */
+  public static <T> PCollection<T> intersection(PCollection<T> coll1,
+      PCollection<T> coll2) {
+    DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T> inBoth =
+        new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<T> emitter) {
+            Collection<Boolean> fromFirst = input.second().first();
+            Collection<Boolean> fromSecond = input.second().second();
+            if (fromFirst.isEmpty() || fromSecond.isEmpty()) {
+              return;
+            }
+            emitter.emit(input.first());
+          }
+        };
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
+        .parallelDo(inBoth, coll1.getPType());
+  }
+
+  /**
+   * Compare two collections in the manner of the Unix <code>comm</code>
+   * utility. Each distinct element yields one {@link Tuple3} in which the
+   * element occupies exactly one position, chosen by where it was found:
+   * <ol>
+   * <li>only in <code>coll1</code>,</li>
+   * <li>only in <code>coll2</code>, or</li>
+   * <li>in both collections</li>
+   * </ol>
+   * The remaining positions of the tuple are <code>null</code>.
+   *
+   * @return a collection of {@link Tuple3} objects
+   */
+  public static <T> PCollection<Tuple3<T, T, T>> comm(PCollection<T> coll1,
+      PCollection<T> coll2) {
+    PTypeFamily family = coll1.getTypeFamily();
+    PType<T> elementType = coll1.getPType();
+    DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, Tuple3<T, T, T>> classify =
+        new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, Tuple3<T, T, T>>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<Tuple3<T, T, T>> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> membership = input.second();
+            boolean inFirst = !membership.first().isEmpty();
+            boolean inSecond = !membership.second().isEmpty();
+            T element = input.first();
+            T onlyFirst = (inFirst && !inSecond) ? element : null;
+            T onlySecond = (!inFirst && inSecond) ? element : null;
+            T both = (inFirst && inSecond) ? element : null;
+            emitter.emit(Tuple3.of(onlyFirst, onlySecond, both));
+          }
+        };
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
+        .parallelDo(classify, family.triples(elementType, elementType, elementType));
+  }
+
+  /**
+   * Turns a collection into a table keyed by its elements, with
+   * {@link Boolean#TRUE} as the value of every entry.
+   */
+  private static <T> PTable<T, Boolean> toTable(PCollection<T> coll) {
+    PTypeFamily family = coll.getTypeFamily();
+    DoFn<T, Pair<T, Boolean>> markPresent = new DoFn<T, Pair<T, Boolean>>() {
+      @Override
+      public void process(T element, Emitter<Pair<T, Boolean>> out) {
+        out.emit(Pair.of(element, Boolean.TRUE));
+      }
+    };
+    return coll.parallelDo(markPresent, family.tableOf(coll.getPType(), family.booleans()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sort.java b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
new file mode 100644
index 0000000..5a3f8e9
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.GroupingOptions.Builder;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Utilities for sorting {@code PCollection} instances.
+ */
+public class Sort {
+  
+  /**
+   * Direction in which a collection or a column is ordered.
+   */
+  public enum Order {
+    ASCENDING,
+    DESCENDING,
+    // NOTE(review): presumably marks a column that takes no part in the
+    // ordering -- confirm against the tuple comparators.
+    IGNORE
+  }
+  
+  /**
+   * To sort by column 2 ascending then column 1 descending, you would use:
+   * <code>
+   * sortPairs(coll, by(2, ASCENDING), by(1, DESCENDING))
+   * </code>
+   * Column numbering is 1-based.
+   */
+  public static class ColumnOrder {
+    int column;
+    Order order;
+    public ColumnOrder(int column, Order order) {
+      this.column = column;
+      this.order = order;
+    }
+    public static ColumnOrder by(int column, Order order) {
+      return new ColumnOrder(column, order);
+    }
+    
+    @Override
+    public String toString() {
+      return"ColumnOrder: column:" + column + ", Order: " + order;
+    }
+  }
+  
+  /**
+   * Sorts the {@link PCollection} using the natural ordering of its elements.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection) {
+    return sort(collection, Order.ASCENDING);
+  }
+  
+  /**
+   * Sorts the {@link PCollection} by the natural ordering of its elements,
+   * in the requested direction. Each element is turned into a table key with
+   * a null value, the table is grouped with the matching grouping options and
+   * ungrouped again, and the keys are emitted as the result.
+   *
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <T> PCollection<T> sort(PCollection<T> collection, Order order) {
+    PTypeFamily family = collection.getTypeFamily();
+    PType<T> elementType = collection.getPType();
+    PTableType<T, Void> keyedType = family.tableOf(elementType, family.nulls());
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, family, elementType, order);
+
+    DoFn<T, Pair<T, Void>> toKey = new DoFn<T, Pair<T, Void>>() {
+      @Override
+      public void process(T element, Emitter<Pair<T, Void>> out) {
+        out.emit(Pair.of(element, (Void) null));
+      }
+    };
+    PTable<T, Void> keyed = collection.parallelDo("sort-pre", toKey, keyedType);
+    PTable<T, Void> sorted = keyed.groupByKey(options).ungroup();
+
+    DoFn<Pair<T, Void>, T> dropValue = new DoFn<Pair<T, Void>, T>() {
+      @Override
+      public void process(Pair<T, Void> entry, Emitter<T> out) {
+        out.emit(entry.first());
+      }
+    };
+    return sorted.parallelDo("sort-post", dropValue, elementType);
+  }
+  
+
+  /**
+   * Sorts the {@link PTable} using the natural ordering of its keys.
+   * 
+   * @return a {@link PTable} representing the sorted table.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table) {
+    return sort(table, Order.ASCENDING);
+  }
+
+  /**
+   * Sorts the {@link PTable} by the natural ordering of its keys, in the
+   * requested direction, by grouping it with the matching grouping options
+   * and ungrouping the result.
+   *
+   * @return a {@link PTable} representing the sorted table.
+   */
+  public static <K, V> PTable<K, V> sort(PTable<K, V> table, Order key) {
+    PTypeFamily family = table.getTypeFamily();
+    Configuration jobConf = table.getPipeline().getConfiguration();
+    GroupingOptions keyOrdering =
+        buildGroupingOptions(jobConf, family, table.getKeyType(), key);
+    return table.groupByKey(keyOrdering).ungroup();
+  }
+  
+  /**
+   * Sorts the {@link PCollection} of {@link Pair}s using the specified column
+   * ordering.
+   * 
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <U, V> PCollection<Pair<U, V>> sortPairs(
+      PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders) {
+    // put U and V into a pair/tuple in the key so we can do grouping and sorting
+    PTypeFamily tf = collection.getTypeFamily();
+    PType<Pair<U, V>> pType = collection.getPType();
+    @SuppressWarnings("unchecked")
+    PTableType<Pair<U, V>, Void> type = tf.tableOf(
+        tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)),
+        tf.nulls());
+    PTable<Pair<U, V>, Void> pt =
+      collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
+        @Override
+        public void process(Pair<U, V> input,
+            Emitter<Pair<Pair<U, V>, Void>> emitter) {
+          emitter.emit(Pair.of(input, (Void) null));
+        }
+      }, type);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
+    PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup();
+    return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>,Void>, Pair<U, V>>() {
+      @Override
+      public void process(Pair<Pair<U, V>, Void> input,
+          Emitter<Pair<U, V>> emitter) {
+        emitter.emit(input.first());
+      }
+    }, collection.getPType());
+  }
+
+  /**
+   * Orders a {@link PCollection} of {@link Tuple3}s according to the given
+   * column orderings.
+   *
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(
+      PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders) {
+    PTypeFamily family = collection.getTypeFamily();
+    PType<Tuple3<V1, V2, V3>> tripleType = collection.getPType();
+    // The whole triple becomes the table key, paired with a null value.
+    @SuppressWarnings("unchecked")
+    PTableType<Tuple3<V1, V2, V3>, Void> keyedType = family.tableOf(
+        family.triples(tripleType.getSubTypes().get(0), tripleType.getSubTypes().get(1),
+            tripleType.getSubTypes().get(2)),
+        family.nulls());
+    DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>> toKey =
+        new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
+          @Override
+          public void process(Tuple3<V1, V2, V3> element,
+              Emitter<Pair<Tuple3<V1, V2, V3>, Void>> out) {
+            out.emit(Pair.of(element, (Void) null));
+          }
+        };
+    PTable<Tuple3<V1, V2, V3>, Void> keyed = collection.parallelDo(toKey, keyedType);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, family, tripleType, columnOrders);
+    PTable<Tuple3<V1, V2, V3>, Void> sorted = keyed.groupByKey(options).ungroup();
+    DoFn<Pair<Tuple3<V1, V2, V3>, Void>, Tuple3<V1, V2, V3>> dropValue =
+        new DoFn<Pair<Tuple3<V1, V2, V3>, Void>, Tuple3<V1, V2, V3>>() {
+          @Override
+          public void process(Pair<Tuple3<V1, V2, V3>, Void> entry,
+              Emitter<Tuple3<V1, V2, V3>> out) {
+            out.emit(entry.first());
+          }
+        };
+    return sorted.parallelDo(dropValue, collection.getPType());
+  }
+
+  /**
+   * Orders a {@link PCollection} of {@link Tuple4}s according to the given
+   * column orderings.
+   *
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static <V1, V2, V3, V4> PCollection<Tuple4<V1, V2, V3, V4>> sortQuads(
+      PCollection<Tuple4<V1, V2, V3, V4>> collection, ColumnOrder... columnOrders) {
+    PTypeFamily family = collection.getTypeFamily();
+    PType<Tuple4<V1, V2, V3, V4>> quadType = collection.getPType();
+    // The whole quad becomes the table key, paired with a null value.
+    @SuppressWarnings("unchecked")
+    PTableType<Tuple4<V1, V2, V3, V4>, Void> keyedType = family.tableOf(
+        family.quads(quadType.getSubTypes().get(0), quadType.getSubTypes().get(1),
+            quadType.getSubTypes().get(2), quadType.getSubTypes().get(3)),
+        family.nulls());
+    DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>> toKey =
+        new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
+          @Override
+          public void process(Tuple4<V1, V2, V3, V4> element,
+              Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> out) {
+            out.emit(Pair.of(element, (Void) null));
+          }
+        };
+    PTable<Tuple4<V1, V2, V3, V4>, Void> keyed = collection.parallelDo(toKey, keyedType);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, family, quadType, columnOrders);
+    PTable<Tuple4<V1, V2, V3, V4>, Void> sorted = keyed.groupByKey(options).ungroup();
+    DoFn<Pair<Tuple4<V1, V2, V3, V4>, Void>, Tuple4<V1, V2, V3, V4>> dropValue =
+        new DoFn<Pair<Tuple4<V1, V2, V3, V4>, Void>, Tuple4<V1, V2, V3, V4>>() {
+          @Override
+          public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> entry,
+              Emitter<Tuple4<V1, V2, V3, V4>> out) {
+            out.emit(entry.first());
+          }
+        };
+    return sorted.parallelDo(dropValue, collection.getPType());
+  }
+
+  /**
+   * Orders a {@link PCollection} of {@link TupleN}s according to the given
+   * column orderings.
+   *
+   * @return a {@link PCollection} representing the sorted collection.
+   */
+  public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection,
+      ColumnOrder... columnOrders) {
+    PTypeFamily family = collection.getTypeFamily();
+    PType<TupleN> tupleType = collection.getPType();
+    // The whole tuple becomes the table key, paired with a null value.
+    PTableType<TupleN, Void> keyedType = family.tableOf(
+        family.tuples(tupleType.getSubTypes().toArray(new PType[0])),
+        family.nulls());
+    DoFn<TupleN, Pair<TupleN, Void>> toKey = new DoFn<TupleN, Pair<TupleN, Void>>() {
+      @Override
+      public void process(TupleN element, Emitter<Pair<TupleN, Void>> out) {
+        out.emit(Pair.of(element, (Void) null));
+      }
+    };
+    PTable<TupleN, Void> keyed = collection.parallelDo(toKey, keyedType);
+    Configuration conf = collection.getPipeline().getConfiguration();
+    GroupingOptions options = buildGroupingOptions(conf, family, tupleType, columnOrders);
+    PTable<TupleN, Void> sorted = keyed.groupByKey(options).ungroup();
+    DoFn<Pair<TupleN, Void>, TupleN> dropValue = new DoFn<Pair<TupleN, Void>, TupleN>() {
+      @Override
+      public void process(Pair<TupleN, Void> entry, Emitter<TupleN> out) {
+        out.emit(entry.first());
+      }
+    };
+    return sorted.parallelDo(dropValue, collection.getPType());
+  }
+  
+  // TODO: move to type family?
+  /**
+   * Builds the grouping options for a natural-order sort. Only
+   * {@link Order#DESCENDING} installs a sort comparator: the reversing
+   * Writable comparator for the Writable type family, or the reversing Avro
+   * comparator for the Avro type family, in which case the type's schema is
+   * also stored in the configuration under "crunch.schema". Any other order
+   * yields default options.
+   *
+   * @throws RuntimeException if a descending sort is requested for a type
+   *     family that is neither the Writable nor the Avro family
+   */
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
+      PTypeFamily tf, PType<T> ptype, Order order) {
+    Builder options = GroupingOptions.builder();
+    if (order != Order.DESCENDING) {
+      return options.build();
+    }
+    if (tf == WritableTypeFamily.getInstance()) {
+      options.sortComparatorClass(ReverseWritableComparator.class);
+      return options.build();
+    }
+    if (tf == AvroTypeFamily.getInstance()) {
+      Schema schema = ((AvroType<T>) ptype).getSchema();
+      conf.set("crunch.schema", schema.toString());
+      options.sortComparatorClass(ReverseAvroComparator.class);
+      return options.build();
+    }
+    throw new RuntimeException("Unrecognized type family: " + tf);
+  }
+  
+  /**
+   * Builds the grouping options for a column-wise tuple sort. The column
+   * ordering is first written to the configuration by the tuple comparator of
+   * the type family (Writable or Avro), and that comparator is then installed
+   * as the sort comparator.
+   *
+   * @throws RuntimeException if the type family is neither the Writable nor
+   *     the Avro family
+   */
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
+      PTypeFamily tf, PType<T> ptype, ColumnOrder[] columnOrders) {
+    Builder options = GroupingOptions.builder();
+    if (tf == WritableTypeFamily.getInstance()) {
+      TupleWritableComparator.configureOrdering(conf, columnOrders);
+      options.sortComparatorClass(TupleWritableComparator.class);
+      return options.build();
+    }
+    if (tf == AvroTypeFamily.getInstance()) {
+      TupleAvroComparator.configureOrdering(conf, columnOrders, ptype);
+      options.sortComparatorClass(TupleAvroComparator.class);
+      return options.build();
+    }
+    throw new RuntimeException("Unrecognized type family: " + tf);
+  }
+  
+  /**
+   * Comparator that sorts Writable keys in the reverse of the order defined by
+   * the {@link WritableComparator} looked up for the job's map output key class.
+   *
+   * <p>The order is reversed by swapping the operands handed to the delegate
+   * rather than negating its result: negation is wrong when the delegate
+   * returns {@code Integer.MIN_VALUE}, because {@code -Integer.MIN_VALUE}
+   * overflows back to {@code Integer.MIN_VALUE}.
+   *
+   * @param <T> type of the keys being compared
+   */
+  static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
+
+    /** Delegate comparator; assigned in {@link #setConf(Configuration)}. */
+    RawComparator<T> comparator;
+
+    /**
+     * Stores the configuration and, when it is non-null, looks up the delegate
+     * comparator for the configured map output key class.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        JobConf jobConf = new JobConf(conf);
+        comparator = WritableComparator.get(
+            jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+      }
+    }
+
+    /** Compares two serialized keys in reverse order. */
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+        int arg5) {
+      // Operands swapped (second key first) to reverse the order safely.
+      return comparator.compare(arg3, arg4, arg5, arg0, arg1, arg2);
+    }
+
+    /** Compares two deserialized keys in reverse order. */
+    @Override
+    public int compare(T o1, T o2) {
+      // Operands swapped to reverse the order without negating the result.
+      return comparator.compare(o2, o1);
+    }
+
+  }
+  
+  /**
+   * Comparator that sorts Avro keys in the reverse of the order defined by the
+   * schema stored in the configuration under {@code "crunch.schema"}.
+   *
+   * <p>The order is reversed by swapping the operands of the underlying Avro
+   * comparison instead of negating its result, which would be wrong for a
+   * result of {@code Integer.MIN_VALUE} (its negation overflows to itself).
+   *
+   * @param <T> type of the keys being compared
+   */
+  static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
+
+    /** Schema used for comparisons; parsed in {@link #setConf(Configuration)}. */
+    Schema schema;
+
+    /**
+     * Stores the configuration and, when it is non-null, parses the schema from
+     * the {@code "crunch.schema"} property.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
+      }
+    }
+
+    /** Compares two deserialized keys in reverse schema order. */
+    @Override
+    public int compare(T o1, T o2) {
+      // Operands swapped to reverse the order without negating the result.
+      return ReflectData.get().compare(o2, o1, schema);
+    }
+
+    /** Compares two serialized keys in reverse schema order. */
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+        int arg5) {
+      // Operands swapped (second key first) to reverse the order safely.
+      return BinaryData.compare(arg3, arg4, arg5, arg0, arg1, arg2, schema);
+    }
+
+  }
+  
+  /**
+   * Comparator for {@link TupleWritable} keys that orders tuples by a configured
+   * list of columns, each ascending or descending.
+   *
+   * <p>The ordering is carried in the configuration under
+   * {@code "crunch.ordering"} as a comma-separated list of
+   * {@code column;ORDER} entries, where {@code column} is 1-based.
+   */
+  static class TupleWritableComparator extends WritableComparator implements Configurable {
+
+    private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
+
+    Configuration conf;
+    ColumnOrder[] columnOrders;
+
+    public TupleWritableComparator() {
+      super(TupleWritable.class, true);
+    }
+
+    /**
+     * Stores a positional ordering: the i-th order applies to the i-th column.
+     *
+     * <p>The orders are written in the same {@code column;ORDER} format that
+     * {@link #setConf(Configuration)} parses (a bare list of order names could
+     * not be read back).
+     *
+     * @param conf configuration that receives the ordering
+     * @param orders sort direction for column 1, column 2, ...
+     */
+    public static void configureOrdering(Configuration conf, Order... orders) {
+      ColumnOrder[] positional = new ColumnOrder[orders.length];
+      for (int i = 0; i < orders.length; i++) {
+        // Columns are 1-based, matching the "column - 1" lookup in compare().
+        positional[i] = ColumnOrder.by(i + 1, orders[i]);
+      }
+      configureOrdering(conf, positional);
+    }
+
+    /**
+     * Stores the given column orders in the configuration as a comma-separated
+     * list of {@code column;ORDER} entries.
+     *
+     * @param conf configuration that receives the ordering
+     * @param columnOrders columns to sort by, in priority order
+     */
+    public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
+      conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(
+        Iterables.transform(Arrays.asList(columnOrders),
+          new Function<ColumnOrder, String>() {
+        @Override
+        public String apply(ColumnOrder o) {
+          return o.column + ";" + o.order.name();
+        }
+      })));
+    }
+
+    /**
+     * Compares two tuples column by column in the configured priority order.
+     *
+     * <p>Columns whose order is neither ascending nor descending are skipped.
+     * A tuple that has a value for a column sorts after one that does not
+     * (before it, for a descending column). Values that are not
+     * {@link WritableComparable} are ordered by hash code.
+     *
+     * @return a negative value, zero or a positive value; zero means the
+     *         configured columns found no difference
+     */
+    @Override
+    public int compare(WritableComparable a, WritableComparable b) {
+      TupleWritable ta = (TupleWritable) a;
+      TupleWritable tb = (TupleWritable) b;
+      for (int i = 0; i < columnOrders.length; i++) {
+        int index = columnOrders[i].column - 1;
+        int order;
+        if (columnOrders[i].order == Order.ASCENDING) {
+          order = 1;
+        } else if (columnOrders[i].order == Order.DESCENDING) {
+          order = -1;
+        } else { // ignore
+          continue;
+        }
+
+        boolean hasA = ta.has(index);
+        boolean hasB = tb.has(index);
+        if (!hasA && !hasB) {
+          continue;
+        }
+        if (hasA != hasB) {
+          return hasA ? order : -order;
+        }
+
+        Writable v1 = ta.get(index);
+        Writable v2 = tb.get(index);
+        if (v1 == v2 || v1 == null || v1.equals(v2)) {
+          continue;
+        }
+
+        int cmp;
+        if (v1 instanceof WritableComparable
+            && v2 instanceof WritableComparable) {
+          cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
+        } else {
+          // Compare the hash codes directly; subtracting them can overflow
+          // and report the wrong sign.
+          int h1 = v1.hashCode();
+          int h2 = v2.hashCode();
+          cmp = h1 < h2 ? -1 : (h1 > h2 ? 1 : 0);
+        }
+        if (cmp != 0) {
+          // Use only the sign of cmp: multiplying by -1 would overflow for
+          // Integer.MIN_VALUE and fail to reverse the order.
+          return cmp < 0 ? -order : order;
+        }
+      }
+      return 0; // ordering using specified columns found no differences
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /**
+     * Stores the configuration and, when it is non-null, parses the column
+     * ordering from the {@code "crunch.ordering"} property.
+     *
+     * @throws IllegalStateException if the property is not set
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      if (conf != null) {
+        String ordering = conf.get(CRUNCH_ORDERING_PROPERTY);
+        if (ordering == null) {
+          throw new IllegalStateException("Missing configuration property: "
+              + CRUNCH_ORDERING_PROPERTY);
+        }
+        String[] columnOrderNames = ordering.split(",");
+        columnOrders = new ColumnOrder[columnOrderNames.length];
+        for (int i = 0; i < columnOrders.length; i++) {
+          String[] split = columnOrderNames[i].split(";");
+          int column = Integer.parseInt(split[0]);
+          Order order = Order.valueOf(split[1]);
+          columnOrders[i] = ColumnOrder.by(column, order);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Comparator for Avro tuples that compares using the record schema stored in
+   * the configuration under {@code "crunch.schema"}; the per-field sort orders
+   * are encoded in that schema by {@link #configureOrdering}.
+   *
+   * @param <T> type of the keys being compared
+   */
+  static class TupleAvroComparator<T> extends Configured implements RawComparator<T> {
+
+    Schema schema;
+
+    /**
+     * Stores the configuration and, when it is non-null, parses the comparison
+     * schema from the {@code "crunch.schema"} property.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf == null) {
+        return;
+      }
+      Schema.Parser parser = new Schema.Parser();
+      schema = parser.parse(conf.get("crunch.schema"));
+    }
+
+    /**
+     * Builds an ordered tuple schema for {@code ptype} and stores its JSON form
+     * in {@code conf} under {@code "crunch.schema"}.
+     *
+     * @param conf configuration that receives the schema
+     * @param columnOrders columns to sort by, with their directions
+     * @param ptype tuple type being sorted
+     */
+    public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders,
+        PType<S> ptype) {
+      conf.set("crunch.schema", createOrderedTupleSchema(ptype, columnOrders).toString());
+    }
+
+    // TODO: move to Avros
+    // TODO: need to re-order columns in map output then switch back in the reduce
+    //       this will require more extensive changes in Crunch
+    /**
+     * Creates a record schema whose leading fields are the columns named in
+     * {@code orders} (carrying the requested sort order) followed by every
+     * remaining column of {@code ptype} with its order set to IGNORE. Each field
+     * is a union of the column's own schema and null.
+     */
+    private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
+      // A random suffix keeps every generated tuple schema name globally unique.
+      String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
+      Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
+      List<Schema.Field> fields = Lists.newArrayList();
+      AvroType<S> parentAvroType = (AvroType<S>) ptype;
+      Schema parentAvroSchema = parentAvroType.getSchema();
+
+      // Pass 1: the explicitly ordered columns, in the order they were requested.
+      BitSet orderedColumns = new BitSet();
+      for (ColumnOrder columnOrder : orders) {
+        int index = columnOrder.column - 1;
+        Schema fieldSchema = nullableSchemaOf(ptype, index);
+        String fieldName = parentAvroSchema.getFields().get(index).name();
+        fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
+            Schema.Field.Order.valueOf(columnOrder.order.name())));
+        orderedColumns.set(index);
+      }
+
+      // Pass 2: every column not covered above, excluded from the sort.
+      for (int i = 0; i < ptype.getSubTypes().size(); i++) {
+        if (!orderedColumns.get(i)) {
+          Schema fieldSchema = nullableSchemaOf(ptype, i);
+          String fieldName = parentAvroSchema.getFields().get(i).name();
+          fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
+              Schema.Field.Order.IGNORE));
+        }
+      }
+      schema.setFields(fields);
+      return schema;
+    }
+
+    /** Returns the union of the schema of sub-type {@code index} and null. */
+    private static Schema nullableSchemaOf(PType<?> ptype, int index) {
+      AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
+      return Schema.createUnion(
+          ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
+    }
+
+    /** Compares two deserialized tuples according to the configured schema. */
+    @Override
+    public int compare(T o1, T o2) {
+      return ReflectData.get().compare(o1, o2, schema);
+    }
+
+    /** Compares two serialized tuples according to the configured schema. */
+    @Override
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
+        int arg5) {
+      return BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
new file mode 100644
index 0000000..4f0d5cc
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of a full outer join.
+ *
+ * <p>Left-side values of the current key are buffered. They are paired with
+ * every right-side value of the same key; left values whose key never receives
+ * a right-side group are emitted with a {@code null} right value, and
+ * right-side values whose key has no buffered left values are emitted with a
+ * {@code null} left value.
+ *
+ * <p>NOTE(review): this assumes groups arrive ordered by key, with the left
+ * group (id 0) of a key before its right group (id 1) - confirm against the
+ * grouping/sorting set up by the caller.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  /** Side (0 = left, 1 = right) of the most recently processed group. */
+  private transient int lastId;
+  /** Key of the most recently processed group. */
+  private transient K lastKey;
+  /** Detached left-side values buffered for {@link #lastKey}. */
+  private transient List<U> leftValues;
+
+  public FullOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
+  /** Resets the per-task state before any group is processed. */
+  @Override
+  public void initialize() {
+    // Starting with lastId = 1 means "nothing pending" for the very first key.
+    lastId = 1;
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /**
+   * Processes one (key, side) group.
+   *
+   * @param key The key for this grouping of values.
+   * @param id The side that this group of values is from (0 -> left, 1 -> right).
+   * @param pairs The group of values associated with this key and id pair.
+   * @param emitter The emitter to send the output to.
+   */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      // The previous key ended on its left group, so it never saw a right
+      // group: emit its left values unmatched. This must not depend on the
+      // side of the NEW key, otherwise a left-only key followed by a
+      // right-only key would silently lose its rows.
+      if (0 == lastId) {
+        emitUnmatchedLeft(emitter);
+      }
+      // NOTE(review): the key is stored without detaching; verify the
+      // framework does not reuse the key object between groups.
+      lastKey = key;
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null) {
+          // Detached copy: the buffered value must outlive this iteration.
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+        }
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        // Make sure that right side gets emitted.
+        if (leftValues.isEmpty()) {
+          leftValues.add(null);
+        }
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+
+    lastId = id;
+  }
+
+  /** Emits the left values of the final key if it never saw a right group. */
+  @Override
+  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (0 == lastId) {
+      emitUnmatchedLeft(emitter);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() { return "fullOuterJoin"; }
+
+  /** Emits every buffered left value for {@link #lastKey} with a null right value. */
+  private void emitUnmatchedLeft(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    for (U u : leftValues) {
+      emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
new file mode 100644
index 0000000..0c46bdf
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of an inner join.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  private transient K lastKey;
+  private transient List<U> leftValues;
+  
+  public InnerJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initialize() {
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      lastKey = key;
+      leftValues.clear();
+    }
+    if (id == 0) { // from left
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null)
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+      }
+    } else { // from right
+      for (Pair<U, V> pair : pairs) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() { return "innerJoin"; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
new file mode 100644
index 0000000..494031c
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+
+/**
+ * Base {@link org.apache.crunch.DoFn} for the final step of a join: it unpacks
+ * each grouped input record and hands it to {@link #join}.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public abstract class JoinFn<K, U, V>
+    extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+
+  /** PType of the left-side values, available to subclasses for copying them. */
+  protected PType<U> leftValueType;
+
+  /**
+   * Creates the function with the PType of the left-side values, which is used
+   * for creating deep copies of those values.
+   *
+   * @param leftValueType
+   *          The PType of the value type of the left side of the join
+   */
+  public JoinFn(PType<U> leftValueType) {
+    this.leftValueType = leftValueType;
+  }
+
+  /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
+  public abstract String getJoinType();
+
+  /**
+   * Performs the actual joining for one group of values.
+   *
+   * @param key The key for this grouping of values.
+   * @param id The side that this group of values is from (0 -> left, 1 -> right).
+   * @param pairs The group of values associated with this key and id pair.
+   * @param emitter The emitter to send the output to.
+   */
+  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter);
+
+  /**
+   * Unpacks the input record into key, side id and values, then delegates to
+   * {@link #join}.
+   *
+   * @param input The input record.
+   * @param emitter The emitter to send the output to.
+   */
+  @Override
+  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    Pair<K, Integer> keyAndSide = input.first();
+    join(keyAndSide.first(), keyAndSide.second(), input.second(), emitter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
new file mode 100644
index 0000000..c282642
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+
+/**
+ * Utilities that are useful in joining multiple data sets via a MapReduce.
+ *
+ */
+public class JoinUtils {
+
+  public static Class<? extends Partitioner> getPartitionerClass(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritablePartitioner.class;
+    } else {
+      return AvroIndexedRecordPartitioner.class;
+    }
+  }
+  
+  public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
+    if (typeFamily == WritableTypeFamily.getInstance()) {
+      return TupleWritableComparator.class;
+    } else {
+      return AvroPairGroupingComparator.class;
+    }
+  }
+  
+  public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
+    @Override
+    public int getPartition(TupleWritable key, Writable value, int numPartitions) {
+      return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+  
+  public static class TupleWritableComparator implements RawComparator<TupleWritable> {
+    
+    private DataInputBuffer buffer = new DataInputBuffer();
+    private TupleWritable key1 = new TupleWritable();
+    private TupleWritable key2 = new TupleWritable();
+    
+    @Override
+    public int compare(TupleWritable o1, TupleWritable o2) {
+      return ((WritableComparable)o1.get(0)).compareTo((WritableComparable)o2.get(0));
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        buffer.reset(b1, s1, l1);
+        key1.readFields(buffer);
+      
+        buffer.reset(b2, s2, l2);
+        key2.readFields(buffer);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      
+      return compare(key1, key2);
+    }
+  }
+  
+  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
+    @Override
+    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
+      IndexedRecord record = (IndexedRecord) key.datum();
+      return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
+    }
+  }
+  
+  public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
+    private Schema schema;
+
+    @Override
+    public void setConf(Configuration conf) {
+      super.setConf(conf);
+      if (conf != null) {
+        Schema mapOutputSchema = AvroJob.getMapOutputSchema(conf);
+        Schema keySchema = org.apache.avro.mapred.Pair.getKeySchema(mapOutputSchema);
+        schema = keySchema.getFields().get(0).schema();
+      }
+    }
+    
+    @Override
+    public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
+      return ReflectData.get().compare(x.datum(), y.datum(), schema);
+    }
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return BinaryData.compare(b1, s1, l1, b2, s2, l2, schema);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
new file mode 100644
index 0000000..6e4d3c6
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.util.List;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Used to perform the last step of a left outer join.
+ *
+ * <p>Left-side values of the current key are buffered. They are paired with
+ * every right-side value of the same key; left values whose key never receives
+ * a right-side group are emitted with a {@code null} right value. Right-side
+ * values with no buffered left values produce no output.
+ *
+ * <p>NOTE(review): this assumes groups arrive ordered by key, with the left
+ * group (id 0) of a key before its right group (id 1) - confirm against the
+ * grouping/sorting set up by the caller.
+ *
+ * @param <K> Type of the keys.
+ * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ */
+public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+
+  /** Side (0 = left, 1 = right) of the most recently processed group. */
+  private transient int lastId;
+  /** Key of the most recently processed group. */
+  private transient K lastKey;
+  /** Detached left-side values buffered for {@link #lastKey}. */
+  private transient List<U> leftValues;
+
+  public LeftOuterJoinFn(PType<U> leftValueType) {
+    super(leftValueType);
+  }
+
+  /** Resets the per-task state before any group is processed. */
+  @Override
+  public void initialize() {
+    // Starting with lastId = 1 means "nothing pending" for the very first key.
+    lastId = 1;
+    lastKey = null;
+    this.leftValues = Lists.newArrayList();
+  }
+
+  /**
+   * Processes one (key, side) group.
+   *
+   * @param key The key for this grouping of values.
+   * @param id The side that this group of values is from (0 -> left, 1 -> right).
+   * @param pairs The group of values associated with this key and id pair.
+   * @param emitter The emitter to send the output to.
+   */
+  @Override
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
+      Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (!key.equals(lastKey)) {
+      // The previous key ended on its left group, so it never saw a right
+      // group: emit its left values unmatched. This must not depend on the
+      // side of the NEW key, otherwise a left-only key followed by a
+      // right-only key would silently lose its rows.
+      if (0 == lastId) {
+        emitUnmatchedLeft(emitter);
+      }
+      // NOTE(review): the key is stored without detaching; verify the
+      // framework does not reuse the key object between groups.
+      lastKey = key;
+      leftValues.clear();
+    }
+    if (id == 0) {
+      for (Pair<U, V> pair : pairs) {
+        if (pair.first() != null) {
+          // Detached copy: the buffered value must outlive this iteration.
+          leftValues.add(leftValueType.getDetachedValue(pair.first()));
+        }
+      }
+    } else {
+      for (Pair<U, V> pair : pairs) {
+        for (U u : leftValues) {
+          emitter.emit(Pair.of(lastKey, Pair.of(u, pair.second())));
+        }
+      }
+    }
+
+    lastId = id;
+  }
+
+  /** Emits the left values of the final key if it never saw a right group. */
+  @Override
+  public void cleanup(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    if (0 == lastId) {
+      emitUnmatchedLeft(emitter);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String getJoinType() { return "leftOuterJoin"; }
+
+  /** Emits every buffered left value for {@link #lastKey} with a null right value. */
+  private void emitUnmatchedLeft(Emitter<Pair<K, Pair<U, V>>> emitter) {
+    for (U u : leftValues) {
+      emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null)));
+    }
+  }
+}


Mime
View raw message