crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [36/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:38 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/resources/urls.txt
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/urls.txt b/crunch-core/src/it/resources/urls.txt
new file mode 100644
index 0000000..827e711
--- /dev/null
+++ b/crunch-core/src/it/resources/urls.txt
@@ -0,0 +1,11 @@
+www.A.com	www.B.com
+www.A.com	www.C.com
+www.A.com	www.D.com
+www.A.com	www.E.com
+www.B.com	www.D.com
+www.B.com	www.E.com
+www.C.com	www.D.com
+www.D.com	www.B.com
+www.E.com	www.A.com
+www.F.com	www.B.com
+www.F.com	www.C.com

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Aggregator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Aggregator.java b/crunch-core/src/main/java/org/apache/crunch/Aggregator.java
new file mode 100644
index 0000000..432452b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Aggregator.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * Aggregate a sequence of values into a possibly smaller sequence of the same type.
+ *
+ * <p>In most cases, an Aggregator will turn multiple values into a single value,
+ * like creating a sum, finding the minimum or maximum, etc. In some cases
+ * (e.g. finding the top K elements), an implementation may return more than
+ * one value. The {@link org.apache.crunch.fn.Aggregators} utility class contains
+ * factory methods for creating all kinds of pre-defined Aggregators that should
+ * cover the most common cases.</p>
+ *
+ * <p>Aggregator implementations should usually be <em>associative</em> and
+ * <em>commutative</em>, which makes their results deterministic. If your aggregation
+ * function isn't commutative, you can still use secondary sort to that effect.</p>
+ *
+ * <p>The lifecycle of an {@link Aggregator} always begins with you instantiating
+ * it and passing it to Crunch. When running your {@link Pipeline}, Crunch serializes
+ * the instance and deserializes it wherever it is needed on the cluster. This is how
+ * Crunch uses a deserialized instance:</p>
+ *
+ * <ol>
+ *   <li>call {@link #initialize(Configuration)} once</li>
+ *   <li>call {@link #reset()}</li>
+ *   <li>call {@link #update(Object)} multiple times until all values of a sequence
+ *       have been aggregated</li>
+ *   <li>call {@link #results()} to retrieve the aggregated result</li>
+ *   <li>go back to step 2 until all sequences have been aggregated</li>
+ * </ol>
+ *
+ * @param <T> The type of the values to aggregate
+ */
+public interface Aggregator<T> extends Serializable {
+
+  /**
+   * Perform any setup of this instance that is required prior to processing
+   * inputs.
+   *
+   * @param conf Hadoop configuration
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Clears the internal state of this Aggregator and prepares it for the
+   * values associated with the next key.
+   *
+   * Depending on what you aggregate, this typically means setting a variable
+   * to zero or clearing a list. Failing to do this will yield wrong results!
+   */
+  void reset();
+
+  /**
+   * Incorporate the given value into the aggregate state maintained by this
+   * instance.
+   *
+   * @param value The value to add to the aggregated state
+   */
+  void update(T value);
+
+  /**
+   * Returns the current aggregated state of this instance.
+   */
+  Iterable<T> results();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/CombineFn.java b/crunch-core/src/main/java/org/apache/crunch/CombineFn.java
new file mode 100644
index 0000000..71e8057
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/CombineFn.java
@@ -0,0 +1,1211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.util.Tuples;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A special {@link DoFn} implementation that converts an {@link Iterable} of
+ * values into a single value. If a {@code CombineFn} instance is used on a
+ * {@link PGroupedTable}, the function will be applied to the output of the map
+ * stage before the data is passed to the reducer, which can improve the runtime
+ * of certain classes of jobs.
+ * <p>
+ * Note that the incoming {@code Iterable} can only be used to create an
+ * {@code Iterator} once. Calling the {@link Iterable#iterator()} method a
+ * second time will throw an {@link IllegalStateException}.
+ */
+public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
+
+  /**
+   * @deprecated Use {@link org.apache.crunch.Aggregator}
+   */
+  public static interface Aggregator<T> extends Serializable {
+    /**
+     * Perform any setup of this instance that is required prior to processing
+     * inputs.
+     */
+    void initialize(Configuration configuration);
+
+    /**
+     * Clears the internal state of this Aggregator and prepares it for the
+     * values associated with the next key.
+     */
+    void reset();
+
+    /**
+     * Incorporate the given value into the aggregate state maintained by this
+     * instance.
+     */
+    void update(T value);
+
+    /**
+     * Returns the current aggregated state of this instance.
+     */
+    Iterable<T> results();
+  }
+
+  /**
+   * Base class for aggregators that do not require any initialization.
+   *
+   * @deprecated Use {@link org.apache.crunch.fn.Aggregators.SimpleAggregator}
+   */
+  public static abstract class SimpleAggregator<T> implements Aggregator<T> {
+    @Override
+    public void initialize(Configuration conf) {
+      // No-op
+    }
+  }
+  
+  /**
+   * Interface for constructing new aggregator instances.
+   *
+   * @deprecated Use {@link PGroupedTable#combineValues(Aggregator)} which doesn't require a factory.
+   */
+  public static interface AggregatorFactory<T> {
+    Aggregator<T> create();
+  }
+
+  /**
+   * A {@code CombineFn} that delegates all of the actual work to an
+   * {@code Aggregator} instance.
+   *
+   * @deprecated Use the {@link Aggregators#toCombineFn(org.apache.crunch.Aggregator)} adapter
+   */
+  public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
+
+    private final Aggregator<V> aggregator;
+
+    public AggregatorCombineFn(Aggregator<V> aggregator) {
+      this.aggregator = aggregator;
+    }
+
+    @Override
+    public void initialize() {
+      aggregator.initialize(getConfiguration());
+    }
+    
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
+      aggregator.reset();
+      for (V v : input.second()) {
+        aggregator.update(v);
+      }
+      for (V v : aggregator.results()) {
+        emitter.emit(Pair.of(input.first(), v));
+      }
+    }
+  }
+
+  private static abstract class TupleAggregator<T> implements Aggregator<T> {
+    private final List<Aggregator<Object>> aggregators;
+
+    public TupleAggregator(Aggregator<?>... aggregators) {
+      this.aggregators = Lists.newArrayList();
+      for (Aggregator<?> a : aggregators) {
+        this.aggregators.add((Aggregator<Object>) a);
+      }
+    }
+
+    @Override
+    public void initialize(Configuration configuration) {
+      for (Aggregator<?> a : aggregators) {
+        a.initialize(configuration);
+      }
+    }
+    
+    @Override
+    public void reset() {
+      for (Aggregator<?> a : aggregators) {
+        a.reset();
+      }
+    }
+
+    protected void updateTuple(Tuple t) {
+      for (int i = 0; i < aggregators.size(); i++) {
+        aggregators.get(i).update(t.get(i));
+      }
+    }
+
+    protected Iterable<Object> results(int index) {
+      return aggregators.get(index).results();
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#pairAggregator(Aggregator, Aggregator)}
+   */
+  public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
+
+    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
+      super(a1, a2);
+    }
+
+    @Override
+    public void update(Pair<V1, V2> value) {
+      updateTuple(value);
+    }
+
+    @Override
+    public Iterable<Pair<V1, V2>> results() {
+      return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#tripAggregator(Aggregator, Aggregator, Aggregator)}
+   */
+  public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
+
+    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
+      super(a1, a2, a3);
+    }
+
+    @Override
+    public void update(Tuple3<A, B, C> value) {
+      updateTuple(value);
+    }
+
+    @Override
+    public Iterable<Tuple3<A, B, C>> results() {
+      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0), (Iterable<B>) results(1),
+          (Iterable<C>) results(2));
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#quadAggregator(Aggregator, Aggregator, Aggregator, Aggregator)}
+   */
+  public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
+
+    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
+      super(a1, a2, a3, a4);
+    }
+
+    @Override
+    public void update(Tuple4<A, B, C, D> value) {
+      updateTuple(value);
+    }
+
+    @Override
+    public Iterable<Tuple4<A, B, C, D>> results() {
+      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0), (Iterable<B>) results(1),
+          (Iterable<C>) results(2), (Iterable<D>) results(3));
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#tupleAggregator(Aggregator...)}
+   */
+  public static class TupleNAggregator extends TupleAggregator<TupleN> {
+
+    private final int size;
+
+    public TupleNAggregator(Aggregator<?>... aggregators) {
+      super(aggregators);
+      size = aggregators.length;
+    }
+
+    @Override
+    public void update(TupleN value) {
+      updateTuple(value);
+    }
+
+    @Override
+    public Iterable<TupleN> results() {
+      Iterable<?>[] iterables = new Iterable[size];
+      for (int i = 0; i < size; i++) {
+        iterables[i] = results(i);
+      }
+      return new Tuples.TupleNIterable(iterables);
+    }
+
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#toCombineFn(Aggregator)}
+   */
+  public static final <K, V> CombineFn<K, V> aggregator(Aggregator<V> aggregator) {
+    return new AggregatorCombineFn<K, V>(aggregator);
+  }
+
+  /**
+   * @deprecated Use {@link PGroupedTable#combineValues(Aggregator)} which doesn't require a factory.
+   */
+  public static final <K, V> CombineFn<K, V> aggregatorFactory(AggregatorFactory<V> aggregator) {
+    return new AggregatorCombineFn<K, V>(aggregator.create());
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#pairAggregator(Aggregator, Aggregator)}
+   */
+  public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(AggregatorFactory<V1> a1,
+      AggregatorFactory<V2> a2) {
+    return aggregator(new PairAggregator<V1, V2>(a1.create(), a2.create()));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#tripAggregator(Aggregator, Aggregator, Aggregator)}
+   */
+  public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(AggregatorFactory<A> a1,
+      AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
+    return aggregator(new TripAggregator<A, B, C>(a1.create(), a2.create(), a3.create()));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#quadAggregator(Aggregator, Aggregator, Aggregator, Aggregator)}
+   */
+  public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(AggregatorFactory<A> a1,
+      AggregatorFactory<B> a2, AggregatorFactory<C> a3, AggregatorFactory<D> a4) {
+    return aggregator(new QuadAggregator<A, B, C, D>(a1.create(), a2.create(), a3.create(), a4.create()));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#tupleAggregator(Aggregator...)}
+   */
+  public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
+    Aggregator<?>[] aggs = new Aggregator[factories.length];
+    for (int i = 0; i < aggs.length; i++) {
+      aggs[i] = factories[i].create();
+    }
+    return aggregator(new TupleNAggregator(aggs));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_LONGS()}
+   */
+  public static final <K> CombineFn<K, Long> SUM_LONGS() {
+    return aggregatorFactory(SUM_LONGS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_INTS()}
+   */
+  public static final <K> CombineFn<K, Integer> SUM_INTS() {
+    return aggregatorFactory(SUM_INTS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
+   */
+  public static final <K> CombineFn<K, Float> SUM_FLOATS() {
+    return aggregatorFactory(SUM_FLOATS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
+   */
+  public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
+    return aggregatorFactory(SUM_DOUBLES);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
+   */
+  public static final <K> CombineFn<K, BigInteger> SUM_BIGINTS() {
+    return aggregatorFactory(SUM_BIGINTS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_LONGS()}
+   */
+  public static final <K> CombineFn<K, Long> MAX_LONGS() {
+    return aggregatorFactory(MAX_LONGS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_LONGS(int)}
+   */
+  public static final <K> CombineFn<K, Long> MAX_LONGS(int n) {
+    return aggregator(new MaxNAggregator<Long>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_INTS()}
+   */
+  public static final <K> CombineFn<K, Integer> MAX_INTS() {
+    return aggregatorFactory(MAX_INTS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_INTS(int)}
+   */
+  public static final <K> CombineFn<K, Integer> MAX_INTS(int n) {
+    return aggregator(new MaxNAggregator<Integer>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
+   */
+  public static final <K> CombineFn<K, Float> MAX_FLOATS() {
+    return aggregatorFactory(MAX_FLOATS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_FLOATS(int)}
+   */
+  public static final <K> CombineFn<K, Float> MAX_FLOATS(int n) {
+    return aggregator(new MaxNAggregator<Float>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
+   */
+  public static final <K> CombineFn<K, Double> MAX_DOUBLES() {
+    return aggregatorFactory(MAX_DOUBLES);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES(int)}
+   */
+  public static final <K> CombineFn<K, Double> MAX_DOUBLES(int n) {
+    return aggregator(new MaxNAggregator<Double>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
+   */
+  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS() {
+    return aggregatorFactory(MAX_BIGINTS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS(int)}
+   */
+  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS(int n) {
+    return aggregator(new MaxNAggregator<BigInteger>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_LONGS()}
+   */
+  public static final <K> CombineFn<K, Long> MIN_LONGS() {
+    return aggregatorFactory(MIN_LONGS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_LONGS(int)}
+   */
+  public static final <K> CombineFn<K, Long> MIN_LONGS(int n) {
+    return aggregator(new MinNAggregator<Long>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_INTS()}
+   */
+  public static final <K> CombineFn<K, Integer> MIN_INTS() {
+    return aggregatorFactory(MIN_INTS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_INTS(int)}
+   */
+  public static final <K> CombineFn<K, Integer> MIN_INTS(int n) {
+    return aggregator(new MinNAggregator<Integer>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
+   */
+  public static final <K> CombineFn<K, Float> MIN_FLOATS() {
+    return aggregatorFactory(MIN_FLOATS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_FLOATS(int)}
+   */
+  public static final <K> CombineFn<K, Float> MIN_FLOATS(int n) {
+    return aggregator(new MinNAggregator<Float>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
+   */
+  public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
+    return aggregatorFactory(MIN_DOUBLES);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES(int)}
+   */
+  public static final <K> CombineFn<K, Double> MIN_DOUBLES(int n) {
+    return aggregator(new MinNAggregator<Double>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
+   */
+  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS() {
+    return aggregatorFactory(MIN_BIGINTS);
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS(int)}
+   */
+  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS(int n) {
+    return aggregator(new MinNAggregator<BigInteger>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#FIRST_N(int)}
+   */
+  public static final <K, V> CombineFn<K, V> FIRST_N(int n) {
+    return aggregator(new FirstNAggregator<V>(n));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#LAST_N(int)}
+   */
+  public static final <K, V> CombineFn<K, V> LAST_N(int n) {
+    return aggregator(new LastNAggregator<V>(n));
+  }
+
+  /**
+   * Concatenates strings, with a separator between consecutive strings. There
+   * is no limit on the length of the concatenated string.
+   * 
+   * @param separator
+   *            the separator which will be appended between each string
+   * @param skipNull
+   *            whether null values should be skipped. A
+   *            NullPointerException is thrown if this is false and a null
+   *            value is encountered.
+   * @return
+   *
+   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean)}
+   */
+  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) {
+      return aggregator(new StringConcatAggregator(separator, skipNull));
+  }
+
+  /**
+   * Concatenates strings, with a separator between consecutive strings. You
+   * can limit the length of the output string and of each input string by
+   * passing values > 0. If a value is <= 0, that limit is not applied.
+   * 
+   * Any string that is too long (or that would make the output too long)
+   * will be silently discarded.
+   * 
+   * @param separator
+   *            the separator which will be appended between each string
+   * @param skipNull
+   *            whether null values should be skipped. A
+   *            NullPointerException is thrown if this is false and a null
+   *            value is encountered.
+   * @param maxOutputLength
+   *            the maximum length of the output string. If it is <= 0,
+   *            there is no limit. The number of characters in the output
+   *            string will be < maxOutputLength.
+   * @param maxInputLength
+   *            the maximum length of the input strings. If it is <= 0,
+   *            there is no limit. An input string is concatenated only if
+   *            its number of characters is < maxInputLength.
+   * @return
+   *
+   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
+   */
+  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
+      return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength));
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_LONGS()}
+   */
+  public static class SumLongs extends SimpleAggregator<Long> {
+    private long sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Long next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_LONGS()}
+   */
+  public static AggregatorFactory<Long> SUM_LONGS = new AggregatorFactory<Long>() {
+    public Aggregator<Long> create() {
+      return new SumLongs();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_INTS()}
+   */
+  public static class SumInts extends SimpleAggregator<Integer> {
+    private int sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Integer next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_INTS()}
+   */
+  public static AggregatorFactory<Integer> SUM_INTS = new AggregatorFactory<Integer>() {
+    public Aggregator<Integer> create() {
+      return new SumInts();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
+   */
+  public static class SumFloats extends SimpleAggregator<Float> {
+    private float sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0f;
+    }
+
+    @Override
+    public void update(Float next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
+   */
+  public static AggregatorFactory<Float> SUM_FLOATS = new AggregatorFactory<Float>() {
+    public Aggregator<Float> create() {
+      return new SumFloats();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
+   */
+  public static class SumDoubles extends SimpleAggregator<Double> {
+    private double sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0f;
+    }
+
+    @Override
+    public void update(Double next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
+   */
+  public static AggregatorFactory<Double> SUM_DOUBLES = new AggregatorFactory<Double>() {
+    public Aggregator<Double> create() {
+      return new SumDoubles();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
+   */
+  public static class SumBigInts extends SimpleAggregator<BigInteger> {
+    private BigInteger sum = BigInteger.ZERO;
+
+    @Override
+    public void reset() {
+      sum = BigInteger.ZERO;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      sum = sum.add(next);
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
+   */
+  public static AggregatorFactory<BigInteger> SUM_BIGINTS = new AggregatorFactory<BigInteger>() {
+    public Aggregator<BigInteger> create() {
+      return new SumBigInts();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_LONGS()}
+   */
+  public static class MaxLongs extends SimpleAggregator<Long> {
+    private Long max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Long next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_LONGS()}
+   */
+  public static AggregatorFactory<Long> MAX_LONGS = new AggregatorFactory<Long>() {
+    public Aggregator<Long> create() {
+      return new MaxLongs();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_INTS()}
+   */
+  public static class MaxInts extends SimpleAggregator<Integer> {
+    private Integer max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Integer next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_INTS()}
+   */
+  public static AggregatorFactory<Integer> MAX_INTS = new AggregatorFactory<Integer>() {
+    public Aggregator<Integer> create() {
+      return new MaxInts();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
+   */
+  public static class MaxFloats extends SimpleAggregator<Float> {
+    private Float max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Float next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
+   */
+  public static AggregatorFactory<Float> MAX_FLOATS = new AggregatorFactory<Float>() {
+    public Aggregator<Float> create() {
+      return new MaxFloats();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
+   */
+  public static class MaxDoubles extends SimpleAggregator<Double> {
+    private Double max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(Double next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
+   */
+  public static AggregatorFactory<Double> MAX_DOUBLES = new AggregatorFactory<Double>() {
+    public Aggregator<Double> create() {
+      return new MaxDoubles();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
+   */
+  public static class MaxBigInts extends SimpleAggregator<BigInteger> {
+    private BigInteger max = null;
+
+    @Override
+    public void reset() {
+      max = null;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      if (max == null || max.compareTo(next) < 0) {
+        max = next;
+      }
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(max);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
+   */
+  public static AggregatorFactory<BigInteger> MAX_BIGINTS = new AggregatorFactory<BigInteger>() {
+    public Aggregator<BigInteger> create() {
+      return new MaxBigInts();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_LONGS()}
+   */
+  public static class MinLongs extends SimpleAggregator<Long> {
+    private Long min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Long next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_LONGS()}
+   */
+  public static AggregatorFactory<Long> MIN_LONGS = new AggregatorFactory<Long>() {
+    public Aggregator<Long> create() {
+      return new MinLongs();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_INTS()}
+   */
+  public static class MinInts extends SimpleAggregator<Integer> {
+    private Integer min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Integer next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_INTS()}
+   */
+  public static AggregatorFactory<Integer> MIN_INTS = new AggregatorFactory<Integer>() {
+    public Aggregator<Integer> create() {
+      return new MinInts();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
+   */
+  public static class MinFloats extends SimpleAggregator<Float> {
+    private Float min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Float next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
+   */
+  public static AggregatorFactory<Float> MIN_FLOATS = new AggregatorFactory<Float>() {
+    public Aggregator<Float> create() {
+      return new MinFloats();
+    }
+  };
+
+  /**
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
+   */
+  public static class MinDoubles extends SimpleAggregator<Double> {
+    private Double min = null;
+
+    @Override
+    public void reset() {
+      min = null;
+    }
+
+    @Override
+    public void update(Double next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(min);
+    }
+  }
+
+  /**
+   * Factory whose {@code create()} returns a new {@link MinDoubles} on every call.
+   *
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
+   */
+  public static AggregatorFactory<Double> MIN_DOUBLES = new AggregatorFactory<Double>() {
+    public Aggregator<Double> create() {
+      final Aggregator<Double> aggregator = new MinDoubles();
+      return aggregator;
+    }
+  };
+
+  /**
+   * Tracks the smallest {@code BigInteger} passed to {@link #update(BigInteger)}
+   * since the last {@link #reset()}.
+   *
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
+   */
+  public static class MinBigInts extends SimpleAggregator<BigInteger> {
+    private BigInteger min = null;
+
+    @Override
+    public void reset() {
+      this.min = null;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      // Nothing to do when a minimum exists and it does not exceed the candidate.
+      if (min != null && min.compareTo(next) <= 0) {
+        return;
+      }
+      min = next;
+    }
+
+    /**
+     * Returns a single-element list holding the current minimum.
+     */
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(this.min);
+    }
+  }
+
+  /**
+   * Factory whose {@code create()} returns a new {@link MinBigInts} on every call.
+   *
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
+   */
+  public static AggregatorFactory<BigInteger> MIN_BIGINTS = new AggregatorFactory<BigInteger>() {
+    public Aggregator<BigInteger> create() {
+      final Aggregator<BigInteger> aggregator = new MinBigInts();
+      return aggregator;
+    }
+  };
+
+  /**
+   * Retains up to {@code arity} of the largest distinct values passed to
+   * {@link #update(Comparable)} since the last {@link #reset()}. Values are
+   * held in a sorted set, so equal values are stored only once.
+   *
+   * <p>
+   * {@link #reset()} must be called before the first {@code update}, because
+   * the backing set is transient and is created there.
+   * </p>
+   *
+   * @deprecated Use {@link Aggregators#MAX_N(int, Class)}
+   */
+  public static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    private transient SortedSet<V> elements;
+
+    /**
+     * @param arity
+     *          The maximum number of values to retain
+     */
+    public MaxNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      // Evict the smallest element only when the value was really inserted and
+      // the set now exceeds its capacity. Evicting before adding lost an
+      // element whenever the incoming value was already retained, and called
+      // first() on an empty set when arity was not positive.
+      if (elements.add(value) && elements.size() > arity) {
+        elements.remove(elements.first());
+      }
+    }
+
+    /**
+     * Returns a snapshot of the retained values in ascending order.
+     */
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  /**
+   * Retains up to {@code arity} of the smallest distinct values passed to
+   * {@link #update(Comparable)} since the last {@link #reset()}. Values are
+   * held in a sorted set, so equal values are stored only once.
+   *
+   * <p>
+   * {@link #reset()} must be called before the first {@code update}, because
+   * the backing set is transient and is created there.
+   * </p>
+   *
+   * @deprecated Use {@link Aggregators#MIN_N(int, Class)}
+   */
+  public static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    private transient SortedSet<V> elements;
+
+    /**
+     * @param arity
+     *          The maximum number of values to retain
+     */
+    public MinNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      // Evict the largest element only when the value was really inserted and
+      // the set now exceeds its capacity. Evicting before adding lost an
+      // element whenever the incoming value was already retained, and called
+      // last() on an empty set when arity was not positive.
+      if (elements.add(value) && elements.size() > arity) {
+        elements.remove(elements.last());
+      }
+    }
+
+    /**
+     * Returns a snapshot of the retained values in ascending order.
+     */
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  /**
+   * Keeps the first {@code arity} values passed to {@link #update(Object)}
+   * after each {@link #reset()} and ignores the rest.
+   *
+   * @deprecated Use {@link Aggregators#FIRST_N(int)}
+   */
+  public static class FirstNAggregator<V> extends SimpleAggregator<V> {
+    private final int arity;
+    private final List<V> elements;
+
+    /**
+     * @param arity
+     *          The maximum number of values to keep
+     */
+    public FirstNAggregator(int arity) {
+      this.elements = Lists.newArrayList();
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      this.elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      // Once the quota is reached, later values are dropped.
+      if (elements.size() >= arity) {
+        return;
+      }
+      elements.add(value);
+    }
+
+    /**
+     * Returns a snapshot of the kept values in arrival order.
+     */
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(this.elements);
+    }
+  }
+
+  /**
+   * Keeps the most recent {@code arity} values passed to
+   * {@link #update(Object)} since the last {@link #reset()}.
+   *
+   * @deprecated Use {@link Aggregators#LAST_N(int)}
+   */
+  public static class LastNAggregator<V> extends SimpleAggregator<V> {
+    private final int arity;
+    private final LinkedList<V> elements;
+
+    /**
+     * @param arity
+     *          The maximum number of values to keep
+     */
+    public LastNAggregator(int arity) {
+      this.elements = Lists.newLinkedList();
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      this.elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      // Append first, then drop the oldest entry if that pushed us one over.
+      elements.addLast(value);
+      final boolean overflowed = elements.size() == arity + 1;
+      if (overflowed) {
+        elements.removeFirst();
+      }
+    }
+
+    /**
+     * Returns a snapshot of the kept values, oldest first.
+     */
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(this.elements);
+    }
+  }
+
+  /**
+   * Collects the strings passed to {@link #update(String)} and joins them with
+   * a separator in {@link #results()}. Inputs longer than
+   * {@code maxInputLength}, and inputs that would push the running length past
+   * {@code maxOutputLength}, are dropped. A limit of zero or less disables the
+   * corresponding check.
+   *
+   * <p>
+   * {@link #reset()} must be called before {@link #results()}, because the
+   * transient joiner is created there.
+   * </p>
+   *
+   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
+   */
+  public static class StringConcatAggregator extends SimpleAggregator<String> {
+    private final String separator;
+    private final boolean skipNulls;
+    private final long maxOutputLength;
+    private final long maxInputLength;
+    private long currentLength;
+    private final LinkedList<String> list = new LinkedList<String>();
+
+    private transient Joiner joiner;
+
+    /**
+     * Creates an aggregator without input or output length limits.
+     *
+     * @param separator
+     *          The text placed between joined values
+     * @param skipNulls
+     *          Whether the joiner is configured to skip null values
+     */
+    public StringConcatAggregator(final String separator, final boolean skipNulls) {
+      // Delegate so that every field, including currentLength, is initialised
+      // the same way regardless of the constructor used.
+      this(separator, skipNulls, 0, 0);
+    }
+
+    /**
+     * @param separator
+     *          The text placed between joined values
+     * @param skipNull
+     *          Whether the joiner is configured to skip null values
+     * @param maxOutputLength
+     *          Limit on the running output length; not enforced unless positive
+     * @param maxInputLength
+     *          Limit on the length of a single input; not enforced unless positive
+     */
+    public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
+      this.separator = separator;
+      this.skipNulls = skipNull;
+      this.maxOutputLength = maxOutputLength;
+      this.maxInputLength = maxInputLength;
+      this.currentLength = -separator.length();
+    }
+
+    @Override
+    public void reset() {
+      if (joiner == null) {
+        joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
+      }
+      // Start one separator below zero: each value is counted together with a
+      // separator, but the joined output has one separator fewer than values.
+      currentLength = -separator.length();
+      list.clear();
+    }
+
+    @Override
+    public void update(final String next) {
+      long length = (next == null) ? 0 : next.length() + separator.length();
+      boolean exceedsOutput = maxOutputLength > 0 && currentLength + length > maxOutputLength;
+      // A null value has no length to check; without the null guard this
+      // threw a NullPointerException whenever maxInputLength was set.
+      boolean exceedsInput = maxInputLength > 0 && next != null && next.length() > maxInputLength;
+      if (exceedsOutput || exceedsInput) {
+        return;
+      }
+      if (maxOutputLength > 0) {
+        currentLength += length;
+      }
+      list.add(next);
+    }
+
+    /**
+     * Returns a single-element list holding the collected values joined by the
+     * separator.
+     */
+    @Override
+    public Iterable<String> results() {
+      return ImmutableList.of(joiner.join(list));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/CrunchRuntimeException.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/CrunchRuntimeException.java b/crunch-core/src/main/java/org/apache/crunch/CrunchRuntimeException.java
new file mode 100644
index 0000000..044f600
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/CrunchRuntimeException.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A {@code RuntimeException} implementation that includes some additional options
+ * for the Crunch execution engine to track reporting status. Clients may
+ * use instances of this class in their own {@code DoFn} implementations.
+ */
+public class CrunchRuntimeException extends RuntimeException {
+
+  /** Set once {@link #markLogged()} has been called; starts out false. */
+  private boolean logged;
+
+  /**
+   * Creates an exception with the given detail message.
+   */
+  public CrunchRuntimeException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates an exception that wraps the given cause.
+   */
+  public CrunchRuntimeException(Exception e) {
+    super(e);
+  }
+
+  /**
+   * Creates an exception with the given detail message and cause.
+   */
+  public CrunchRuntimeException(String msg, Exception e) {
+    super(msg, e);
+  }
+
+  /**
+   * Reports whether {@link #markLogged()} has been called on this exception,
+   * i.e. whether it was written to the debug logs.
+   */
+  public boolean wasLogged() {
+    return this.logged;
+  }
+
+  /**
+   * Records that this exception has been written to the debug logs.
+   */
+  public void markLogged() {
+    logged = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
new file mode 100644
index 0000000..2c6389a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Base class for all data processing functions in Crunch.
+ * 
+ * <p>
+ * Note that all {@code DoFn} instances implement {@link Serializable}, and thus
+ * all of their non-transient member variables must implement
+ * {@code Serializable} as well. If your DoFn depends on non-serializable
+ * classes for data processing, they may be declared as {@code transient} and
+ * initialized in the DoFn's {@code initialize} method.
+ * 
+ */
+public abstract class DoFn<S, T> implements Serializable {
+  // Transient: not part of the serialized form; supplied through setContext.
+  private transient TaskInputOutputContext<?, ?, ?, ?> context;
+
+  /**
+   * Configure this DoFn. Subclasses may override this method to modify the
+   * configuration of the Job that this DoFn instance belongs to.
+   * 
+   * <p>
+   * Called during the job planning phase by the crunch-client.
+   * </p>
+   * 
+   * @param conf
+   *          The Configuration instance for the Job.
+   */
+  public void configure(Configuration conf) {
+  }
+
+  /**
+   * Initialize this DoFn. This initialization will happen before the actual
+   * {@link #process(Object, Emitter)} is triggered. Subclasses may override
+   * this method to do appropriate initialization.
+   * 
+   * <p>
+   * Called during the setup of the job instance this {@code DoFn} is associated
+   * with.
+   * </p>
+   * 
+   */
+  public void initialize() {
+  }
+
+  /**
+   * Processes the records from a {@link PCollection}.
+   * 
+   * <br/>
+   * <br/>
+   * <b>Note:</b> Crunch can reuse a single input record object whose content
+   * changes on each {@link #process(Object, Emitter)} method call. This
+   * functionality is imposed by Hadoop's <a href=
+   * "http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reducer.html"
+   * >Reducer</a> implementation: <i>The framework will reuse the key and value
+   * objects that are passed into the reduce, therefore the application should
+   * clone the objects they want to keep a copy of.</i>
+   * 
+   * @param input
+   *          The input record.
+   * @param emitter
+   *          The emitter to send the output to
+   */
+  public abstract void process(S input, Emitter<T> emitter);
+
+  /**
+   * Called during the cleanup of the MapReduce job this {@code DoFn} is
+   * associated with. Subclasses may override this method to do appropriate
+   * cleanup.
+   * 
+   * @param emitter
+   *          The emitter that was used for output
+   */
+  public void cleanup(Emitter<T> emitter) {
+  }
+
+  /**
+   * Called during setup to pass the {@link TaskInputOutputContext} to this
+   * {@code DoFn} instance.
+   *
+   * @param context
+   *          The context that the protected accessors of this class delegate to
+   */
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    this.context = context;
+  }
+
+  /**
+   * Returns an estimate of how applying this function to a {@link PCollection}
+   * will cause it to change in size. The optimizer uses these estimates to
+   * decide where to break up dependent MR jobs into separate Map and Reduce
+   * phases in order to minimize I/O.
+   * 
+   * <p>
+   * Subclasses of {@code DoFn} that will substantially alter the size of the
+   * resulting {@code PCollection} should override this method.
+   *
+   * @return The estimated scale factor; this base implementation returns 1.2
+   */
+  public float scaleFactor() {
+    return 1.2f;
+  }
+
+  /**
+   * Returns the context last passed to
+   * {@link #setContext(TaskInputOutputContext)}, or {@code null} if none has
+   * been set.
+   */
+  protected TaskInputOutputContext<?, ?, ?, ?> getContext() {
+    return context;
+  }
+
+  /**
+   * Returns the {@link Configuration} of the current context. A context must
+   * have been set before this is called.
+   */
+  protected Configuration getConfiguration() {
+    return context.getConfiguration();
+  }
+
+  /**
+   * Looks up the counter for the given enum value in the current context. A
+   * context must have been set before this is called.
+   */
+  protected Counter getCounter(Enum<?> counterName) {
+    return context.getCounter(counterName);
+  }
+
+  /**
+   * Looks up the counter with the given group and name in the current context.
+   * A context must have been set before this is called.
+   */
+  protected Counter getCounter(String groupName, String counterName) {
+    return context.getCounter(groupName, counterName);
+  }
+
+  /**
+   * Increments the counter for the given enum value by one.
+   */
+  protected void increment(Enum<?> counterName) {
+    increment(counterName, 1);
+  }
+
+  /**
+   * Increments the counter for the given enum value by {@code value}.
+   */
+  protected void increment(Enum<?> counterName, long value) {
+    getCounter(counterName).increment(value);
+  }
+
+  /**
+   * Calls {@code progress()} on the current context. A context must have been
+   * set before this is called.
+   */
+  protected void progress() {
+    context.progress();
+  }
+
+  /**
+   * Returns the {@link TaskAttemptID} reported by the current context. A
+   * context must have been set before this is called.
+   */
+  protected TaskAttemptID getTaskAttemptID() {
+    return context.getTaskAttemptID();
+  }
+
+  /**
+   * Sets the status string on the current context. A context must have been
+   * set before this is called.
+   */
+  protected void setStatus(String status) {
+    context.setStatus(status);
+  }
+
+  /**
+   * Returns the status string reported by the current context. A context must
+   * have been set before this is called.
+   */
+  protected String getStatus() {
+    return context.getStatus();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Emitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Emitter.java b/crunch-core/src/main/java/org/apache/crunch/Emitter.java
new file mode 100644
index 0000000..d104a09
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Emitter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * Interface for writing outputs from a {@link DoFn}.
+ * 
+ */
+public interface Emitter<T> {
+  /**
+   * Write the emitted value to the next stage of the pipeline.
+   * 
+   * @param emitted
+   *          The value to write
+   */
+  void emit(T emitted);
+
+  /**
+   * Flushes any values cached by this emitter. Called during the cleanup stage.
+   * Implementations that do not cache values may leave this method empty.
+   */
+  void flush();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/FilterFn.java b/crunch-core/src/main/java/org/apache/crunch/FilterFn.java
new file mode 100644
index 0000000..440f122
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/FilterFn.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.List;
+
+import org.apache.crunch.fn.FilterFns;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link DoFn} for the common case of filtering the members of a
+ * {@link PCollection} based on a boolean condition.
+ */
+public abstract class FilterFn<T> extends DoFn<T, T> {
+
+  /**
+   * Decides whether a record is kept.
+   *
+   * @param input
+   *          The record under consideration
+   * @return {@code true} to emit the record, {@code false} to drop it
+   */
+  public abstract boolean accept(T input);
+
+  @Override
+  public void process(T input, Emitter<T> emitter) {
+    if (!accept(input)) {
+      return;
+    }
+    emitter.emit(input);
+  }
+
+  /**
+   * Forwards to the no-argument {@link #cleanup()}; the emitter is not used.
+   */
+  @Override
+  public final void cleanup(Emitter<T> emitter) {
+    this.cleanup();
+  }
+
+  /**
+   * Cleanup hook for the MapReduce job this {@code FilterFn} belongs to.
+   * The default implementation does nothing; subclasses may override it.
+   */
+  public void cleanup() {
+  }
+
+  @Override
+  public float scaleFactor() {
+    return 0.5f;
+  }
+
+  /**
+   * Builds a filter that accepts a record only if every given filter accepts it.
+   *
+   * @deprecated Use {@link FilterFns#and(FilterFn...)}
+   */
+  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
+    final FilterFn<S> conjunction = new AndFn<S>(fns);
+    return conjunction;
+  }
+
+  /**
+   * Conjunction of filters: lifecycle calls are forwarded to every wrapped
+   * filter, and a record is accepted only if all of them accept it.
+   *
+   * @deprecated Use {@link FilterFns#and(FilterFn...)}
+   */
+  public static class AndFn<S> extends FilterFn<S> {
+
+    private final List<FilterFn<S>> fns;
+
+    public AndFn(FilterFn<S>... fns) {
+      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (FilterFn<S> filter : fns) {
+        filter.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (FilterFn<S> filter : fns) {
+        filter.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (FilterFn<S> filter : fns) {
+        filter.initialize();
+      }
+    }
+
+    @Override
+    public void cleanup() {
+      for (FilterFn<S> filter : fns) {
+        filter.cleanup();
+      }
+    }
+
+    @Override
+    public boolean accept(S input) {
+      // Stop at the first rejection; later filters are not consulted.
+      for (FilterFn<S> filter : fns) {
+        final boolean passed = filter.accept(input);
+        if (!passed) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Returns the product of the wrapped filters' scale factors.
+     */
+    @Override
+    public float scaleFactor() {
+      float product = 1.0f;
+      for (FilterFn<S> filter : fns) {
+        product = product * filter.scaleFactor();
+      }
+      return product;
+    }
+  }
+
+  /**
+   * Builds a filter that accepts a record if at least one given filter accepts it.
+   *
+   * @deprecated Use {@link FilterFns#or(FilterFn...)}
+   */
+  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
+    final FilterFn<S> disjunction = new OrFn<S>(fns);
+    return disjunction;
+  }
+
+  /**
+   * Disjunction of filters: lifecycle calls are forwarded to every wrapped
+   * filter, and a record is accepted if any of them accepts it.
+   *
+   * @deprecated Use {@link FilterFns#or(FilterFn...)}
+   */
+  public static class OrFn<S> extends FilterFn<S> {
+
+    private final List<FilterFn<S>> fns;
+
+    public OrFn(FilterFn<S>... fns) {
+      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (FilterFn<S> filter : fns) {
+        filter.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (FilterFn<S> filter : fns) {
+        filter.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (FilterFn<S> filter : fns) {
+        filter.initialize();
+      }
+    }
+
+    @Override
+    public void cleanup() {
+      for (FilterFn<S> filter : fns) {
+        filter.cleanup();
+      }
+    }
+
+    @Override
+    public boolean accept(S input) {
+      // Stop at the first acceptance; later filters are not consulted.
+      for (FilterFn<S> filter : fns) {
+        final boolean passed = filter.accept(input);
+        if (passed) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Returns the sum of the wrapped filters' scale factors, capped at 1.0.
+     */
+    @Override
+    public float scaleFactor() {
+      float sum = 0.0f;
+      for (FilterFn<S> filter : fns) {
+        sum = sum + filter.scaleFactor();
+      }
+      return Math.min(sum, 1.0f);
+    }
+  }
+
+  /**
+   * Builds a filter that accepts exactly the records the given filter rejects.
+   *
+   * @deprecated Use {@link FilterFns#not(FilterFn)}
+   */
+  public static <S> FilterFn<S> not(FilterFn<S> fn) {
+    final FilterFn<S> negation = new NotFn<S>(fn);
+    return negation;
+  }
+
+  /**
+   * Negation of a filter: lifecycle calls are forwarded to the wrapped filter
+   * and its {@code accept} decision is inverted.
+   *
+   * @deprecated Use {@link FilterFns#not(FilterFn)}
+   */
+  public static class NotFn<S> extends FilterFn<S> {
+
+    private final FilterFn<S> base;
+
+    public NotFn(FilterFn<S> base) {
+      this.base = base;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      this.base.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      this.base.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      this.base.initialize();
+    }
+
+    @Override
+    public void cleanup() {
+      this.base.cleanup();
+    }
+
+    @Override
+    public boolean accept(S input) {
+      final boolean accepted = base.accept(input);
+      return !accepted;
+    }
+
+    /**
+     * Returns the complement of the wrapped filter's scale factor.
+     */
+    @Override
+    public float scaleFactor() {
+      final float baseFactor = base.scaleFactor();
+      return 1.0f - baseFactor;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
new file mode 100644
index 0000000..4aa1343
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Options that can be passed to a {@code groupByKey} operation in order to
+ * exercise finer control over how the partitioning, grouping, and sorting of
+ * keys is performed.
+ * 
+ */
+public class GroupingOptions {
+
+  private final Class<? extends Partitioner> partitionerClass;
+  private final Class<? extends RawComparator> groupingComparatorClass;
+  private final Class<? extends RawComparator> sortComparatorClass;
+  private final int numReducers;
+  private final Map<String, String> extraConf;
+  private final Set<SourceTarget<?>> sourceTargets;
+
+  private GroupingOptions(Class<? extends Partitioner> partitionerClass,
+      Class<? extends RawComparator> groupingComparatorClass, Class<? extends RawComparator> sortComparatorClass,
+      int numReducers, Map<String, String> extraConf, Set<SourceTarget<?>> sourceTargets) {
+    this.partitionerClass = partitionerClass;
+    this.groupingComparatorClass = groupingComparatorClass;
+    this.sortComparatorClass = sortComparatorClass;
+    this.numReducers = numReducers;
+    this.extraConf = extraConf;
+    this.sourceTargets = sourceTargets;
+  }
+
+  /**
+   * Returns the requested number of reducers, or 0 if the builder was never
+   * given one.
+   */
+  public int getNumReducers() {
+    return numReducers;
+  }
+
+  /**
+   * Returns the sort comparator class, or {@code null} if none was set.
+   */
+  public Class<? extends RawComparator> getSortComparatorClass() {
+    return sortComparatorClass;
+  }
+
+  /**
+   * Returns the grouping comparator class, or {@code null} if none was set.
+   */
+  public Class<? extends RawComparator> getGroupingComparatorClass() {
+    return groupingComparatorClass;
+  }
+
+  /**
+   * Returns the partitioner class, or {@code null} if none was set.
+   */
+  public Class<? extends Partitioner> getPartitionerClass() {
+    return partitionerClass;
+  }
+
+  /**
+   * Returns the source targets registered through the builder.
+   */
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return sourceTargets;
+  }
+
+  /**
+   * Applies these options to the given job. Only options that were set are
+   * applied: classes that are non-null, a reducer count that is positive, and
+   * every extra configuration entry.
+   *
+   * @param job
+   *          The job to configure
+   */
+  public void configure(Job job) {
+    if (partitionerClass != null) {
+      job.setPartitionerClass(partitionerClass);
+    }
+    if (groupingComparatorClass != null) {
+      job.setGroupingComparatorClass(groupingComparatorClass);
+    }
+    if (sortComparatorClass != null) {
+      job.setSortComparatorClass(sortComparatorClass);
+    }
+    if (numReducers > 0) {
+      job.setNumReduceTasks(numReducers);
+    }
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      job.getConfiguration().set(e.getKey(), e.getValue());
+    }
+  }
+
+  /**
+   * Returns true if {@code other} uses the same partitioner, grouping
+   * comparator and sort comparator classes and the same extra configuration.
+   * The number of reducers and the source targets are not compared.
+   */
+  public boolean isCompatibleWith(GroupingOptions other) {
+    if (partitionerClass != other.partitionerClass) {
+      return false;
+    }
+    if (groupingComparatorClass != other.groupingComparatorClass) {
+      return false;
+    }
+    if (sortComparatorClass != other.sortComparatorClass) {
+      return false;
+    }
+    if (!extraConf.equals(other.extraConf)) {
+      return false;
+    }
+    return true;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for creating {@code GroupingOptions} instances.
+   * 
+   */
+  public static class Builder {
+    private Class<? extends Partitioner> partitionerClass;
+    private Class<? extends RawComparator> groupingComparatorClass;
+    private Class<? extends RawComparator> sortComparatorClass;
+    private int numReducers;
+    private Map<String, String> extraConf = Maps.newHashMap();
+    private Set<SourceTarget<?>> sourceTargets = Sets.newHashSet();
+
+    public Builder() {
+    }
+
+    public Builder partitionerClass(Class<? extends Partitioner> partitionerClass) {
+      this.partitionerClass = partitionerClass;
+      return this;
+    }
+
+    public Builder groupingComparatorClass(Class<? extends RawComparator> groupingComparatorClass) {
+      this.groupingComparatorClass = groupingComparatorClass;
+      return this;
+    }
+
+    public Builder sortComparatorClass(Class<? extends RawComparator> sortComparatorClass) {
+      this.sortComparatorClass = sortComparatorClass;
+      return this;
+    }
+
+    /**
+     * @throws IllegalArgumentException
+     *           if {@code numReducers} is not positive
+     */
+    public Builder numReducers(int numReducers) {
+      if (numReducers <= 0) {
+        throw new IllegalArgumentException("Invalid number of reducers: " + numReducers);
+      }
+      this.numReducers = numReducers;
+      return this;
+    }
+
+    public Builder conf(String confKey, String confValue) {
+      this.extraConf.put(confKey, confValue);
+      return this;
+    }
+
+    public Builder sourceTarget(SourceTarget<?> st) {
+      this.sourceTargets.add(st);
+      return this;
+    }
+
+    /**
+     * Creates a {@code GroupingOptions} from the current builder state.
+     */
+    public GroupingOptions build() {
+      // Copy the collections so that later calls on this builder, or a second
+      // build(), cannot change options that have already been handed out.
+      Map<String, String> confCopy = Maps.newHashMap(extraConf);
+      Set<SourceTarget<?>> sourceTargetsCopy = Sets.newHashSet(sourceTargets);
+      return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass,
+          numReducers, confCopy, sourceTargetsCopy);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/MapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/MapFn.java b/crunch-core/src/main/java/org/apache/crunch/MapFn.java
new file mode 100644
index 0000000..dbf172e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/MapFn.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A {@link DoFn} for the common case of emitting exactly one value for each
+ * input record.
+ * 
+ */
+public abstract class MapFn<S, T> extends DoFn<S, T> {
+
+  /**
+   * Converts one input record into one output value.
+   *
+   * @param input
+   *          The record to convert
+   * @return The value emitted for this record
+   */
+  public abstract T map(S input);
+
+  /**
+   * Emits the result of {@link #map(Object)} for the given input.
+   */
+  @Override
+  public void process(S input, Emitter<T> emitter) {
+    final T mapped = map(input);
+    emitter.emit(mapped);
+  }
+
+  @Override
+  public float scaleFactor() {
+    return 1.0f;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
new file mode 100644
index 0000000..6f5abf6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Collection;
+
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * A representation of an immutable, distributed collection of elements that is
+ * the fundamental target of computations in Crunch.
+ *
+ * @param <S> The type of the elements in this collection
+ */
+public interface PCollection<S> {
+  /**
+   * Returns the {@code Pipeline} associated with this PCollection.
+   */
+  Pipeline getPipeline();
+
+  /**
+   * Returns a {@code PCollection} instance that acts as the union of this
+   * {@code PCollection} and the given {@code PCollection}.
+   */
+  PCollection<S> union(PCollection<S> other);
+  
+  /**
+   * Returns a {@code PCollection} instance that acts as the union of this
+   * {@code PCollection} and the input {@code PCollection}s.
+   */
+  PCollection<S> union(PCollection<S>... collections);
+
+  /**
+   * Applies the given doFn to the elements of this {@code PCollection} and
+   * returns a new {@code PCollection} that is the output of this processing.
+   *
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PType} of the resulting {@code PCollection}
+   * @return a new {@code PCollection}
+   */
+  <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
+
+  /**
+   * Applies the given doFn to the elements of this {@code PCollection} and
+   * returns a new {@code PCollection} that is the output of this processing.
+   *
+   * @param name
+   *          An identifier for this processing step, useful for debugging
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PType} of the resulting {@code PCollection}
+   * @return a new {@code PCollection}
+   */
+  <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
+  
+  /**
+   * Applies the given doFn to the elements of this {@code PCollection} and
+   * returns a new {@code PCollection} that is the output of this processing.
+   *
+   * @param name
+   *          An identifier for this processing step, useful for debugging
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PType} of the resulting {@code PCollection}
+   * @param options
+   *          Optional information that is needed for certain pipeline operations
+   * @return a new {@code PCollection}
+   */
+  <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type,
+      ParallelDoOptions options);
+
+  /**
+   * Similar to the other {@code parallelDo} instance, but returns a
+   * {@code PTable} instance instead of a {@code PCollection}.
+   *
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PTableType} of the resulting {@code PTable}
+   * @return a new {@code PTable}
+   */
+  <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
+
+  /**
+   * Similar to the other {@code parallelDo} instance, but returns a
+   * {@code PTable} instance instead of a {@code PCollection}.
+   *
+   * @param name
+   *          An identifier for this processing step
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PTableType} of the resulting {@code PTable}
+   * @return a new {@code PTable}
+   */
+  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
+  
+  /**
+   * Similar to the other {@code parallelDo} instance, but returns a
+   * {@code PTable} instance instead of a {@code PCollection}.
+   *
+   * @param name
+   *          An identifier for this processing step
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PTableType} of the resulting {@code PTable}
+   * @param options
+   *          Optional information that is needed for certain pipeline operations
+   * @return a new {@code PTable}
+   */
+  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
+      ParallelDoOptions options);
+
+  /**
+   * Write the contents of this {@code PCollection} to the given {@code Target},
+   * using the storage format specified by the target.
+   *
+   * @param target
+   *          The target to write to
+   */
+  PCollection<S> write(Target target);
+
+  /**
+   * Write the contents of this {@code PCollection} to the given {@code Target},
+   * using the given {@code Target.WriteMode} to handle existing
+   * targets.
+   * 
+   * @param target
+   *          The target
+   * @param writeMode
+   *          The rule for handling existing outputs at the target location
+   */
+  PCollection<S> write(Target target, Target.WriteMode writeMode);
+  
+  /**
+   * Returns a reference to the data set represented by this PCollection that
+   * may be used by the client to read the data locally.
+   */
+  Iterable<S> materialize();
+
+  /**
+   * @return A {@code PObject} encapsulating an in-memory {@link Collection} containing the values
+   * of this {@code PCollection}.
+   */
+  PObject<Collection<S>> asCollection();
+
+  /**
+   * Returns the {@code PType} of this {@code PCollection}.
+   */
+  PType<S> getPType();
+
+  /**
+   * Returns the {@code PTypeFamily} of this {@code PCollection}.
+   */
+  PTypeFamily getTypeFamily();
+
+  /**
+   * Returns the size of the data represented by this {@code PCollection} in
+   * bytes.
+   */
+  long getSize();
+
+  /**
+   * Returns the number of elements represented by this {@code PCollection}.
+   *
+   * @return A {@code PObject} containing the number of elements in this {@code PCollection}.
+   */
+  PObject<Long> length();
+
+  /**
+   * Returns a shorthand name for this PCollection.
+   */
+  String getName();
+
+  /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PCollection}.
+   *
+   * @param filterFn
+   *          The {@code FilterFn} to apply
+   */
+  PCollection<S> filter(FilterFn<S> filterFn);
+
+  /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PCollection}.
+   *
+   * @param name
+   *          An identifier for this processing step
+   * @param filterFn
+   *          The {@code FilterFn} to apply
+   */
+  PCollection<S> filter(String name, FilterFn<S> filterFn);
+
+  /**
+   * Apply the given map function to each element of this instance in order to
+   * create a {@code PTable}.
+   *
+   * @param extractKeyFn
+   *          The {@code MapFn} to apply
+   * @param keyType
+   *          The {@link PType} of the keys of the resulting {@code PTable}
+   */
+  <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
+
+  /**
+   * Apply the given map function to each element of this instance in order to
+   * create a {@code PTable}.
+   *
+   * @param name
+   *          An identifier for this processing step
+   * @param extractKeyFn
+   *          The {@code MapFn} to apply
+   * @param keyType
+   *          The {@link PType} of the keys of the resulting {@code PTable}
+   */
+  <K> PTable<K, S> by(String name, MapFn<S, K> extractKeyFn, PType<K> keyType);
+
+  /**
+   * Returns a {@code PTable} instance that contains the counts of each unique
+   * element of this PCollection.
+   */
+  PTable<S, Long> count();
+
+  /**
+   * Returns a {@code PObject} of the maximum element of this instance.
+   */
+  PObject<S> max();
+
+  /**
+   * Returns a {@code PObject} of the minimum element of this instance.
+   */
+  PObject<S> min();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
new file mode 100644
index 0000000..d77ffdb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.Aggregator;
+
+/**
+ * The Crunch representation of a grouped {@link PTable}.
+ *
+ * @param <K> The type of the group keys
+ * @param <V> The type of the values collected under each key
+ */
+public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
+
+  /**
+   * Combines the values of this grouping using the given {@code CombineFn}.
+   * 
+   * @param combineFn
+   *          The combiner function
+   * @return A {@code PTable} where each key has a single value
+   */
+  PTable<K, V> combineValues(CombineFn<K, V> combineFn);
+
+  /**
+   * Combine the values in each group using the given {@link Aggregator}.
+   *
+   * @param aggregator
+   *          The function to use
+   * @return A {@link PTable} where each group key maps to an aggregated
+   *         value. Group keys may be repeated if an aggregator returns
+   *         more than one value.
+   */
+  PTable<K, V> combineValues(Aggregator<V> aggregator);
+
+  /**
+   * Convert this grouping back into a multimap.
+   * 
+   * @return an ungrouped version of the data in this {@code PGroupedTable}.
+   */
+  PTable<K, V> ungroup();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PObject.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PObject.java b/crunch-core/src/main/java/org/apache/crunch/PObject.java
new file mode 100644
index 0000000..897a01f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PObject.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A {@code PObject} represents a singleton object value that results from a distributed
+ * computation. The computation that produces the value is deferred until
+ * {@link #getValue()} is called.
+ *
+ * @param <T> The type of value encapsulated by this {@code PObject}.
+ */
+public interface PObject<T> {
+  /**
+   * Gets the value associated with this {@code PObject}. Calling this method will trigger
+   * whatever computation is necessary to obtain the value and block until that computation
+   * succeeds.
+   *
+   * @return The value associated with this {@code PObject}.
+   */
+  T getValue();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
new file mode 100644
index 0000000..8df9853
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+/**
+ * A sub-interface of {@code PCollection} that represents an immutable,
+ * distributed multi-map of keys and values.
+ *
+ * @param <K> The type of the keys
+ * @param <V> The type of the values
+ */
+public interface PTable<K, V> extends PCollection<Pair<K, V>> {
+
+  /**
+   * Returns a {@code PTable} instance that acts as the union of this
+   * {@code PTable} and the given {@code PTable}.
+   */
+  PTable<K, V> union(PTable<K, V> other);
+  
+  /**
+   * Returns a {@code PTable} instance that acts as the union of this
+   * {@code PTable} and the input {@code PTable}s.
+   */
+  PTable<K, V> union(PTable<K, V>... others);
+
+  /**
+   * Performs a grouping operation on the keys of this table.
+   *
+   * @return a {@code PGroupedTable} instance that represents the grouping
+   */
+  PGroupedTable<K, V> groupByKey();
+
+  /**
+   * Performs a grouping operation on the keys of this table, using the given
+   * number of partitions.
+   *
+   * @param numPartitions
+   *          The number of partitions for the data.
+   * @return a {@code PGroupedTable} instance that represents this grouping
+   */
+  PGroupedTable<K, V> groupByKey(int numPartitions);
+
+  /**
+   * Performs a grouping operation on the keys of this table, using the
+   * additional {@code GroupingOptions} to control how the grouping is executed.
+   *
+   * @param options
+   *          The grouping options to use
+   * @return a {@code PGroupedTable} instance that represents the grouping
+   */
+  PGroupedTable<K, V> groupByKey(GroupingOptions options);
+
+  /**
+   * Writes this {@code PTable} to the given {@code Target}.
+   */
+  PTable<K, V> write(Target target);
+
+  /**
+   * Writes this {@code PTable} to the given {@code Target}, using the
+   * given {@code Target.WriteMode} to handle existing targets.
+   */
+  PTable<K, V> write(Target target, Target.WriteMode writeMode);
+
+  /**
+   * Returns the {@code PTableType} of this {@code PTable}.
+   */
+  PTableType<K, V> getPTableType();
+
+  /**
+   * Returns the {@code PType} of the key.
+   */
+  PType<K> getKeyType();
+
+  /**
+   * Returns the {@code PType} of the value.
+   */
+  PType<V> getValueType();
+
+  /**
+   * Aggregate all of the values with the same key into a single key-value pair
+   * in the returned PTable.
+   */
+  PTable<K, Collection<V>> collectValues();
+
+  /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PTable}.
+   *
+   * @param filterFn
+   *          The {@code FilterFn} to apply
+   */
+  PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn);
+  
+  /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PTable}.
+   *
+   * @param name
+   *          An identifier for this processing step
+   * @param filterFn
+   *          The {@code FilterFn} to apply
+   */
+  PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn);
+  
+  /**
+   * Returns a PTable made up of the pairs in this PTable with the largest value
+   * field.
+   *
+   * @param count
+   *          The number of pairs to return
+   */
+  PTable<K, V> top(int count);
+
+  /**
+   * Returns a PTable made up of the pairs in this PTable with the smallest
+   * value field.
+   *
+   * @param count
+   *          The number of pairs to return
+   */
+  PTable<K, V> bottom(int count);
+
+  /**
+   * Perform an inner join on this table and the one passed in as an argument on
+   * their common keys.
+   */
+  <U> PTable<K, Pair<V, U>> join(PTable<K, U> other);
+
+  /**
+   * Co-group operation with the given table on common keys.
+   */
+  <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other);
+
+  /**
+   * Returns a {@link PCollection} made up of the keys in this PTable.
+   */
+  PCollection<K> keys();
+
+  /**
+   * Returns a {@link PCollection} made up of the values in this PTable.
+   */
+  PCollection<V> values();
+
+  /**
+   * Returns a {@code Map<K, V>} made up of the keys and values in this PTable.
+   * <p>
+   * <b>Note:</b> The contents of the returned map may not be exactly the same
+   * as this PTable, as a PTable is a multi-map (i.e. can contain multiple
+   * values for a single key).
+   */
+  Map<K, V> materializeToMap();
+
+  /**
+   * Returns a {@link PObject} encapsulating a {@link Map} made up of the keys and values in this
+   * {@code PTable}.
+   * <p><b>Note:</b> The contents of the returned map may not be exactly the same as this PTable,
+   * as a PTable is a multi-map (i.e. can contain multiple values for a single key).
+   * </p>
+   *
+   * @return The {@code PObject} encapsulating a {@code Map} made up of the keys and values in
+   * this {@code PTable}.
+   */
+  PObject<Map<K, V>> asMap();
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/Pair.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pair.java b/crunch-core/src/main/java/org/apache/crunch/Pair.java
new file mode 100644
index 0000000..fd058b6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/Pair.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * A convenience class for two-element {@link Tuple}s.
+ * <p>
+ * Pairs are ordered by their first element and then by their second element;
+ * see {@link #compareTo(Pair)}.
+ *
+ * @param <K> the type of the first element
+ * @param <V> the type of the second element
+ */
+public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
+
+  private final K first;
+  private final V second;
+
+  /**
+   * Static factory equivalent to {@code new Pair<T, U>(first, second)}.
+   */
+  public static <T, U> Pair<T, U> of(T first, U second) {
+    return new Pair<T, U>(first, second);
+  }
+
+  public Pair(K first, V second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  /** Returns the first element of this pair. */
+  public K first() {
+    return first;
+  }
+
+  /** Returns the second element of this pair. */
+  public V second() {
+    return second;
+  }
+
+  /**
+   * Returns the element at the given position: {@code 0} for the first
+   * element, {@code 1} for the second.
+   *
+   * @throws ArrayIndexOutOfBoundsException for any other index
+   */
+  public Object get(int index) {
+    switch (index) {
+    case 0:
+      return first;
+    case 1:
+      return second;
+    default:
+      throw new ArrayIndexOutOfBoundsException();
+    }
+  }
+
+  /** Returns the number of elements in this tuple, which is always 2. */
+  public int size() {
+    return 2;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(first).append(second).toHashCode();
+  }
+
+  /**
+   * Two pairs are equal when they are of the same class and both their first
+   * and their second elements are equal (or both {@code null}).
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    Pair<?, ?> other = (Pair<?, ?>) obj;
+    return (first == other.first || (first != null && first.equals(other.first)))
+        && (second == other.second || (second != null && second.equals(other.second)));
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("[");
+    sb.append(first).append(",").append(second).append("]");
+    return sb.toString();
+  }
+
+  /**
+   * Orders two elements. Identical references compare as equal; otherwise a
+   * {@link Comparable} left-hand side is asked to compare itself to the
+   * right-hand side, and anything else falls back to comparing hash codes,
+   * with {@code null} treated as hash code 0.
+   */
+  @SuppressWarnings("unchecked")
+  private int cmp(Object lhs, Object rhs) {
+    if (lhs == rhs) {
+      return 0;
+    } else if (lhs instanceof Comparable) {
+      return ((Comparable) lhs).compareTo(rhs);
+    }
+    // Compare the hash codes explicitly rather than subtracting them: the
+    // difference of two ints can overflow and report the wrong sign.
+    int lhsHash = (lhs == null) ? 0 : lhs.hashCode();
+    int rhsHash = (rhs == null) ? 0 : rhs.hashCode();
+    return lhsHash < rhsHash ? -1 : (lhsHash > rhsHash ? 1 : 0);
+  }
+
+  /**
+   * Compares this pair to another by first element, using the second element
+   * only to break ties.
+   */
+  @Override
+  public int compareTo(Pair<K, V> o) {
+    int diff = cmp(first, o.first);
+    if (diff == 0) {
+      diff = cmp(second, o.second);
+    }
+    return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
new file mode 100644
index 0000000..2407b3a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/ParallelDoOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Container class that includes optional information about a {@code parallelDo} operation
+ * applied to a {@code PCollection}. Primarily used within the Crunch framework
+ * itself for certain types of advanced processing operations, such as in-memory joins
+ * that require reading a file from the filesystem into a {@code DoFn}.
+ * <p>
+ * Instances are created through {@link #builder()}.
+ */
+public class ParallelDoOptions {
+  private final Set<SourceTarget<?>> sourceTargets;
+  
+  private ParallelDoOptions(Set<SourceTarget<?>> sourceTargets) {
+    this.sourceTargets = sourceTargets;
+  }
+  
+  /**
+   * Returns the source targets that were registered on the {@link Builder}
+   * when these options were built.
+   */
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return sourceTargets;
+  }
+  
+  /**
+   * Returns a new, empty {@link Builder}.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+  
+  /**
+   * Accumulates the settings for a {@link ParallelDoOptions} instance.
+   */
+  public static class Builder {
+    private final Set<SourceTarget<?>> sourceTargets;
+    
+    public Builder() {
+      this.sourceTargets = Sets.newHashSet();
+    }
+    
+    /**
+     * Adds the given source targets to the ones already registered.
+     *
+     * @return this builder, to allow call chaining
+     */
+    public Builder sourceTargets(SourceTarget<?>... sourceTargets) {
+      Collections.addAll(this.sourceTargets, sourceTargets);
+      return this;
+    }
+    
+    /**
+     * Creates the options from the source targets registered so far.
+     */
+    public ParallelDoOptions build() {
+      // Hand over a copy so that further calls on this builder cannot alter
+      // options that have already been built.
+      return new ParallelDoOptions(Sets.newHashSet(sourceTargets));
+    }
+  }
+}


Mime
View raw message