crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-285: Clean up deprecated Aggregator/FilterFn code
Date Tue, 29 Oct 2013 19:12:57 GMT
Updated Branches:
  refs/heads/master 09624fe3c -> cde5f34e7


CRUNCH-285: Clean up deprecated Aggregator/FilterFn code


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/cde5f34e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/cde5f34e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/cde5f34e

Branch: refs/heads/master
Commit: cde5f34e78804dd874521a4399a7110f2016b3b8
Parents: 09624fe
Author: Josh Wills <jwills@apache.org>
Authored: Sun Oct 20 17:03:43 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Oct 29 12:10:39 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/CombineFn.java  | 1164 ------------------
 .../main/java/org/apache/crunch/FilterFn.java   |  181 ---
 .../java/org/apache/crunch/fn/FilterFns.java    |  159 ++-
 .../java/org/apache/crunch/fn/MapKeysFn.java    |   35 -
 .../java/org/apache/crunch/fn/MapValuesFn.java  |   35 -
 .../test/java/org/apache/crunch/AndFnTest.java  |   77 --
 .../java/org/apache/crunch/CombineFnTest.java   |  222 ----
 .../test/java/org/apache/crunch/NotFnTest.java  |   72 --
 .../test/java/org/apache/crunch/OrFnTest.java   |   78 --
 .../java/org/apache/crunch/fn/AndFnTest.java    |   78 ++
 .../java/org/apache/crunch/fn/MapKeysTest.java  |   51 -
 .../org/apache/crunch/fn/MapValuesTest.java     |   50 -
 .../java/org/apache/crunch/fn/NotFnTest.java    |   72 ++
 .../java/org/apache/crunch/fn/OrFnTest.java     |   79 ++
 14 files changed, 385 insertions(+), 1968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/CombineFn.java b/crunch-core/src/main/java/org/apache/crunch/CombineFn.java
index 71e8057..9bf7641 100644
--- a/crunch-core/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/CombineFn.java
@@ -44,1168 +44,4 @@ import com.google.common.collect.Sets;
  * time will throw an {@link IllegalStateException}.
  */
 public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
-
-  /**
-   * @deprecated Use {@link org.apache.crunch.Aggregator}
-   */
-  public static interface Aggregator<T> extends Serializable {
-    /**
-     * Perform any setup of this instance that is required prior to processing
-     * inputs.
-     */
-    void initialize(Configuration configuration);
-
-    /**
-     * Clears the internal state of this Aggregator and prepares it for the
-     * values associated with the next key.
-     */
-    void reset();
-
-    /**
-     * Incorporate the given value into the aggregate state maintained by this
-     * instance.
-     */
-    void update(T value);
-
-    /**
-     * Returns the current aggregated state of this instance.
-     */
-    Iterable<T> results();
-  }
-
-  /**
-   * Base class for aggregators that do not require any initialization.
-   *
-   * @deprecated Use {@link org.apache.crunch.fn.Aggregators.SimpleAggregator}
-   */
-  public static abstract class SimpleAggregator<T> implements Aggregator<T> {
-    @Override
-    public void initialize(Configuration conf) {
-      // No-op
-    }
-  }
-  
-  /**
-   * Interface for constructing new aggregator instances.
-   *
-   * @deprecated Use {@link PGroupedTable#combineValues(Aggregator)} which doesn't require a factory.
-   */
-  public static interface AggregatorFactory<T> {
-    Aggregator<T> create();
-  }
-
-  /**
-   * A {@code CombineFn} that delegates all of the actual work to an
-   * {@code Aggregator} instance.
-   *
-   * @deprecated Use the {@link Aggregators#toCombineFn(org.apache.crunch.Aggregator)} adapter
-   */
-  public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
-
-    private final Aggregator<V> aggregator;
-
-    public AggregatorCombineFn(Aggregator<V> aggregator) {
-      this.aggregator = aggregator;
-    }
-
-    @Override
-    public void initialize() {
-      aggregator.initialize(getConfiguration());
-    }
-    
-    @Override
-    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
-      aggregator.reset();
-      for (V v : input.second()) {
-        aggregator.update(v);
-      }
-      for (V v : aggregator.results()) {
-        emitter.emit(Pair.of(input.first(), v));
-      }
-    }
-  }
-
-  private static abstract class TupleAggregator<T> implements Aggregator<T> {
-    private final List<Aggregator<Object>> aggregators;
-
-    public TupleAggregator(Aggregator<?>... aggregators) {
-      this.aggregators = Lists.newArrayList();
-      for (Aggregator<?> a : aggregators) {
-        this.aggregators.add((Aggregator<Object>) a);
-      }
-    }
-
-    @Override
-    public void initialize(Configuration configuration) {
-      for (Aggregator<?> a : aggregators) {
-        a.initialize(configuration);
-      }
-    }
-    
-    @Override
-    public void reset() {
-      for (Aggregator<?> a : aggregators) {
-        a.reset();
-      }
-    }
-
-    protected void updateTuple(Tuple t) {
-      for (int i = 0; i < aggregators.size(); i++) {
-        aggregators.get(i).update(t.get(i));
-      }
-    }
-
-    protected Iterable<Object> results(int index) {
-      return aggregators.get(index).results();
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#pairAggregator(Aggregator, Aggregator)}
-   */
-  public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
-
-    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
-      super(a1, a2);
-    }
-
-    @Override
-    public void update(Pair<V1, V2> value) {
-      updateTuple(value);
-    }
-
-    @Override
-    public Iterable<Pair<V1, V2>> results() {
-      return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#tripAggregator(Aggregator, Aggregator, Aggregator)}
-   */
-  public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
-
-    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
-      super(a1, a2, a3);
-    }
-
-    @Override
-    public void update(Tuple3<A, B, C> value) {
-      updateTuple(value);
-    }
-
-    @Override
-    public Iterable<Tuple3<A, B, C>> results() {
-      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0), (Iterable<B>) results(1),
-          (Iterable<C>) results(2));
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#quadAggregator(Aggregator, Aggregator, Aggregator, Aggregator)}
-   */
-  public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
-
-    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
-      super(a1, a2, a3, a4);
-    }
-
-    @Override
-    public void update(Tuple4<A, B, C, D> value) {
-      updateTuple(value);
-    }
-
-    @Override
-    public Iterable<Tuple4<A, B, C, D>> results() {
-      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0), (Iterable<B>) results(1),
-          (Iterable<C>) results(2), (Iterable<D>) results(3));
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#tupleAggregator(Aggregator...)}
-   */
-  public static class TupleNAggregator extends TupleAggregator<TupleN> {
-
-    private final int size;
-
-    public TupleNAggregator(Aggregator<?>... aggregators) {
-      super(aggregators);
-      size = aggregators.length;
-    }
-
-    @Override
-    public void update(TupleN value) {
-      updateTuple(value);
-    }
-
-    @Override
-    public Iterable<TupleN> results() {
-      Iterable<?>[] iterables = new Iterable[size];
-      for (int i = 0; i < size; i++) {
-        iterables[i] = results(i);
-      }
-      return new Tuples.TupleNIterable(iterables);
-    }
-
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#toCombineFn(Aggregator)}
-   */
-  public static final <K, V> CombineFn<K, V> aggregator(Aggregator<V> aggregator) {
-    return new AggregatorCombineFn<K, V>(aggregator);
-  }
-
-  /**
-   * @deprecated Use {@link PGroupedTable#combineValues(Aggregator)} which doesn't require a factory.
-   */
-  public static final <K, V> CombineFn<K, V> aggregatorFactory(AggregatorFactory<V> aggregator) {
-    return new AggregatorCombineFn<K, V>(aggregator.create());
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#pairAggregator(Aggregator, Aggregator)}
-   */
-  public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(AggregatorFactory<V1> a1,
-      AggregatorFactory<V2> a2) {
-    return aggregator(new PairAggregator<V1, V2>(a1.create(), a2.create()));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#tripAggregator(Aggregator, Aggregator, Aggregator)}
-   */
-  public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(AggregatorFactory<A> a1,
-      AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
-    return aggregator(new TripAggregator<A, B, C>(a1.create(), a2.create(), a3.create()));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#quadAggregator(Aggregator, Aggregator, Aggregator, Aggregator)}
-   */
-  public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(AggregatorFactory<A> a1,
-      AggregatorFactory<B> a2, AggregatorFactory<C> a3, AggregatorFactory<D> a4) {
-    return aggregator(new QuadAggregator<A, B, C, D>(a1.create(), a2.create(), a3.create(), a4.create()));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#tupleAggregator(Aggregator...)}
-   */
-  public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
-    Aggregator<?>[] aggs = new Aggregator[factories.length];
-    for (int i = 0; i < aggs.length; i++) {
-      aggs[i] = factories[i].create();
-    }
-    return aggregator(new TupleNAggregator(aggs));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_LONGS()}
-   */
-  public static final <K> CombineFn<K, Long> SUM_LONGS() {
-    return aggregatorFactory(SUM_LONGS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_INTS()}
-   */
-  public static final <K> CombineFn<K, Integer> SUM_INTS() {
-    return aggregatorFactory(SUM_INTS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
-   */
-  public static final <K> CombineFn<K, Float> SUM_FLOATS() {
-    return aggregatorFactory(SUM_FLOATS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
-   */
-  public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
-    return aggregatorFactory(SUM_DOUBLES);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
-   */
-  public static final <K> CombineFn<K, BigInteger> SUM_BIGINTS() {
-    return aggregatorFactory(SUM_BIGINTS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_LONGS()}
-   */
-  public static final <K> CombineFn<K, Long> MAX_LONGS() {
-    return aggregatorFactory(MAX_LONGS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_LONGS(int)}
-   */
-  public static final <K> CombineFn<K, Long> MAX_LONGS(int n) {
-    return aggregator(new MaxNAggregator<Long>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_INTS()}
-   */
-  public static final <K> CombineFn<K, Integer> MAX_INTS() {
-    return aggregatorFactory(MAX_INTS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_INTS(int)}
-   */
-  public static final <K> CombineFn<K, Integer> MAX_INTS(int n) {
-    return aggregator(new MaxNAggregator<Integer>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
-   */
-  public static final <K> CombineFn<K, Float> MAX_FLOATS() {
-    return aggregatorFactory(MAX_FLOATS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_FLOATS(int)}
-   */
-  public static final <K> CombineFn<K, Float> MAX_FLOATS(int n) {
-    return aggregator(new MaxNAggregator<Float>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
-   */
-  public static final <K> CombineFn<K, Double> MAX_DOUBLES() {
-    return aggregatorFactory(MAX_DOUBLES);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES(int)}
-   */
-  public static final <K> CombineFn<K, Double> MAX_DOUBLES(int n) {
-    return aggregator(new MaxNAggregator<Double>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
-   */
-  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS() {
-    return aggregatorFactory(MAX_BIGINTS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS(int)}
-   */
-  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS(int n) {
-    return aggregator(new MaxNAggregator<BigInteger>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_LONGS()}
-   */
-  public static final <K> CombineFn<K, Long> MIN_LONGS() {
-    return aggregatorFactory(MIN_LONGS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_LONGS(int)}
-   */
-  public static final <K> CombineFn<K, Long> MIN_LONGS(int n) {
-    return aggregator(new MinNAggregator<Long>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_INTS()}
-   */
-  public static final <K> CombineFn<K, Integer> MIN_INTS() {
-    return aggregatorFactory(MIN_INTS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_INTS(int)}
-   */
-  public static final <K> CombineFn<K, Integer> MIN_INTS(int n) {
-    return aggregator(new MinNAggregator<Integer>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
-   */
-  public static final <K> CombineFn<K, Float> MIN_FLOATS() {
-    return aggregatorFactory(MIN_FLOATS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_FLOATS(int)}
-   */
-  public static final <K> CombineFn<K, Float> MIN_FLOATS(int n) {
-    return aggregator(new MinNAggregator<Float>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
-   */
-  public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
-    return aggregatorFactory(MIN_DOUBLES);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES(int)}
-   */
-  public static final <K> CombineFn<K, Double> MIN_DOUBLES(int n) {
-    return aggregator(new MinNAggregator<Double>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
-   */
-  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS() {
-    return aggregatorFactory(MIN_BIGINTS);
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS(int)}
-   */
-  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS(int n) {
-    return aggregator(new MinNAggregator<BigInteger>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#FIRST_N(int)}
-   */
-  public static final <K, V> CombineFn<K, V> FIRST_N(int n) {
-    return aggregator(new FirstNAggregator<V>(n));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#LAST_N(int)}
-   */
-  public static final <K, V> CombineFn<K, V> LAST_N(int n) {
-    return aggregator(new LastNAggregator<V>(n));
-  }
-
-  /**
-   * Used to concatenate strings, with a separator between each strings. There
-   * is no limits of length for the concatenated string.
-   * 
-   * @param separator
-   *            the separator which will be appended between each string
-   * @param skipNull
-   *            define if we should skip null values. Throw
-   *            NullPointerException if set to false and there is a null
-   *            value.
-   * @return
-   *
-   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean)}
-   */
-  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) {
-      return aggregator(new StringConcatAggregator(separator, skipNull));
-  }
-
-  /**
-   * Used to concatenate strings, with a separator between each strings. You
-   * can specify the maximum length of the output string and of the input
-   * strings, if they are > 0. If a value is <= 0, there is no limits.
-   * 
-   * Any too large string (or any string which would made the output too
-   * large) will be silently discarded.
-   * 
-   * @param separator
-   *            the separator which will be appended between each string
-   * @param skipNull
-   *            define if we should skip null values. Throw
-   *            NullPointerException if set to false and there is a null
-   *            value.
-   * @param maxOutputLength
-   *            the maximum length of the output string. If it's set <= 0,
-   *            there is no limits. The number of characters of the output
-   *            string will be < maxOutputLength.
-   * @param maxInputLength
-   *            the maximum length of the input strings. If it's set <= 0,
-   *            there is no limits. The number of characters of the int string
-   *            will be < maxInputLength to be concatenated.
-   * @return
-   *
-   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
-   */
-  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
-      return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength));
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_LONGS()}
-   */
-  public static class SumLongs extends SimpleAggregator<Long> {
-    private long sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0;
-    }
-
-    @Override
-    public void update(Long next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_LONGS()}
-   */
-  public static AggregatorFactory<Long> SUM_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() {
-      return new SumLongs();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_INTS()}
-   */
-  public static class SumInts extends SimpleAggregator<Integer> {
-    private int sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0;
-    }
-
-    @Override
-    public void update(Integer next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_INTS()}
-   */
-  public static AggregatorFactory<Integer> SUM_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() {
-      return new SumInts();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
-   */
-  public static class SumFloats extends SimpleAggregator<Float> {
-    private float sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0f;
-    }
-
-    @Override
-    public void update(Float next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
-   */
-  public static AggregatorFactory<Float> SUM_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() {
-      return new SumFloats();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
-   */
-  public static class SumDoubles extends SimpleAggregator<Double> {
-    private double sum = 0;
-
-    @Override
-    public void reset() {
-      sum = 0f;
-    }
-
-    @Override
-    public void update(Double next) {
-      sum += next;
-    }
-
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
-   */
-  public static AggregatorFactory<Double> SUM_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() {
-      return new SumDoubles();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
-   */
-  public static class SumBigInts extends SimpleAggregator<BigInteger> {
-    private BigInteger sum = BigInteger.ZERO;
-
-    @Override
-    public void reset() {
-      sum = BigInteger.ZERO;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      sum = sum.add(next);
-    }
-
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(sum);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
-   */
-  public static AggregatorFactory<BigInteger> SUM_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() {
-      return new SumBigInts();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_LONGS()}
-   */
-  public static class MaxLongs extends SimpleAggregator<Long> {
-    private Long max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Long next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_LONGS()}
-   */
-  public static AggregatorFactory<Long> MAX_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() {
-      return new MaxLongs();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_INTS()}
-   */
-  public static class MaxInts extends SimpleAggregator<Integer> {
-    private Integer max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Integer next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_INTS()}
-   */
-  public static AggregatorFactory<Integer> MAX_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() {
-      return new MaxInts();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
-   */
-  public static class MaxFloats extends SimpleAggregator<Float> {
-    private Float max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Float next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
-   */
-  public static AggregatorFactory<Float> MAX_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() {
-      return new MaxFloats();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
-   */
-  public static class MaxDoubles extends SimpleAggregator<Double> {
-    private Double max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(Double next) {
-      if (max == null || max < next) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
-   */
-  public static AggregatorFactory<Double> MAX_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() {
-      return new MaxDoubles();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
-   */
-  public static class MaxBigInts extends SimpleAggregator<BigInteger> {
-    private BigInteger max = null;
-
-    @Override
-    public void reset() {
-      max = null;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      if (max == null || max.compareTo(next) < 0) {
-        max = next;
-      }
-    }
-
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(max);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
-   */
-  public static AggregatorFactory<BigInteger> MAX_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() {
-      return new MaxBigInts();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_LONGS()}
-   */
-  public static class MinLongs extends SimpleAggregator<Long> {
-    private Long min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Long next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Long> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_LONGS()}
-   */
-  public static AggregatorFactory<Long> MIN_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() {
-      return new MinLongs();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_INTS()}
-   */
-  public static class MinInts extends SimpleAggregator<Integer> {
-    private Integer min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Integer next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Integer> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_INTS()}
-   */
-  public static AggregatorFactory<Integer> MIN_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() {
-      return new MinInts();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
-   */
-  public static class MinFloats extends SimpleAggregator<Float> {
-    private Float min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Float next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Float> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
-   */
-  public static AggregatorFactory<Float> MIN_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() {
-      return new MinFloats();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
-   */
-  public static class MinDoubles extends SimpleAggregator<Double> {
-    private Double min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(Double next) {
-      if (min == null || min > next) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<Double> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
-   */
-  public static AggregatorFactory<Double> MIN_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() {
-      return new MinDoubles();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
-   */
-  public static class MinBigInts extends SimpleAggregator<BigInteger> {
-    private BigInteger min = null;
-
-    @Override
-    public void reset() {
-      min = null;
-    }
-
-    @Override
-    public void update(BigInteger next) {
-      if (min == null || min.compareTo(next) > 0) {
-        min = next;
-      }
-    }
-
-    @Override
-    public Iterable<BigInteger> results() {
-      return ImmutableList.of(min);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
-   */
-  public static AggregatorFactory<BigInteger> MIN_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() {
-      return new MinBigInts();
-    }
-  };
-
-  /**
-   * @deprecated Use {@link Aggregators#MAX_N(int, Class)}
-   */
-  public static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-
-    public MaxNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (value.compareTo(elements.first()) > 0) {
-        elements.remove(elements.first());
-        elements.add(value);
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#MIN_N(int, Class)}
-   */
-  public static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
-    private final int arity;
-    private transient SortedSet<V> elements;
-
-    public MinNAggregator(int arity) {
-      this.arity = arity;
-    }
-
-    @Override
-    public void reset() {
-      if (elements == null) {
-        elements = Sets.newTreeSet();
-      } else {
-        elements.clear();
-      }
-    }
-
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      } else if (value.compareTo(elements.last()) < 0) {
-        elements.remove(elements.last());
-        elements.add(value);
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#FIRST_N(int)}
-   */
-  public static class FirstNAggregator<V> extends SimpleAggregator<V> {
-    private final int arity;
-    private final List<V> elements;
-
-    public FirstNAggregator(int arity) {
-      this.arity = arity;
-      this.elements = Lists.newArrayList();
-    }
-
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-
-    @Override
-    public void update(V value) {
-      if (elements.size() < arity) {
-        elements.add(value);
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#LAST_N(int)}
-   */
-  public static class LastNAggregator<V> extends SimpleAggregator<V> {
-    private final int arity;
-    private final LinkedList<V> elements;
-
-    public LastNAggregator(int arity) {
-      this.arity = arity;
-      this.elements = Lists.newLinkedList();
-    }
-
-    @Override
-    public void reset() {
-      elements.clear();
-    }
-
-    @Override
-    public void update(V value) {
-      elements.add(value);
-      if (elements.size() == arity + 1) {
-        elements.removeFirst();
-      }
-    }
-
-    @Override
-    public Iterable<V> results() {
-      return ImmutableList.copyOf(elements);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
-   */
-  public static class StringConcatAggregator extends SimpleAggregator<String> {
-    private final String separator;
-    private final boolean skipNulls;
-    private final long maxOutputLength;
-    private final long maxInputLength;
-    private long currentLength;
-    private final LinkedList<String> list = new LinkedList<String>();
-
-    private transient Joiner joiner;
-    
-    public StringConcatAggregator(final String separator, final boolean skipNulls) {
-      this.separator = separator;
-      this.skipNulls = skipNulls;
-      this.maxInputLength = 0;
-      this.maxOutputLength = 0;
-    }
-
-    public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
-      this.separator = separator;
-      this.skipNulls = skipNull;
-      this.maxOutputLength = maxOutputLength;
-      this.maxInputLength = maxInputLength;
-      this.currentLength = -separator.length();
-    }
-
-    @Override
-    public void reset() {
-      if (joiner == null) {
-        joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
-      }
-      currentLength = -separator.length();
-      list.clear();
-    }
-
-    @Override
-    public void update(final String next) {
-      long length = (next == null) ? 0 : next.length() + separator.length();
-      if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) {
-        return;
-      }
-      if (maxOutputLength > 0) {
-        currentLength += length;
-      }
-      list.add(next);
-    }
-
-    @Override
-    public Iterable<String> results() {
-      return ImmutableList.of(joiner.join(list));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/FilterFn.java b/crunch-core/src/main/java/org/apache/crunch/FilterFn.java
index 440f122..79412f6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/FilterFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/FilterFn.java
@@ -60,185 +60,4 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
   public float scaleFactor() {
     return 0.5f;
   }
-
-  /**
-   * @deprecated Use {@link FilterFns#and(FilterFn...)}
-   */
-  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
-    return new AndFn<S>(fns);
-  }
-
-  /**
-   * @deprecated Use {@link FilterFns#and(FilterFn...)}
-   */
-  public static class AndFn<S> extends FilterFn<S> {
-
-    private final List<FilterFn<S>> fns;
-
-    public AndFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      for (FilterFn<S> fn : fns) {
-        fn.configure(conf);
-      }
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      for (FilterFn<S> fn : fns) {
-        fn.setContext(context);
-      }
-    }
-    
-    @Override
-    public void initialize() {
-      for (FilterFn<S> fn : fns) {
-        fn.initialize();
-      }
-    }
-
-    @Override
-    public void cleanup() {
-      for (FilterFn<S> fn : fns) {
-        fn.cleanup();
-      }
-    }
-
-    @Override
-    public boolean accept(S input) {
-      for (FilterFn<S> fn : fns) {
-        if (!fn.accept(input)) {
-          return false;
-        }
-      }
-      return true;
-    }
-    
-    @Override
-    public float scaleFactor() {
-      float scaleFactor = 1.0f;
-      for (FilterFn<S> fn : fns) {
-        scaleFactor *= fn.scaleFactor();
-      }
-      return scaleFactor;
-    }
-  }
-
-  /**
-   * @deprecated Use {@link FilterFns#or(FilterFn...)}
-   */
-  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
-    return new OrFn<S>(fns);
-  }
-
-  /**
-   * @deprecated Use {@link FilterFns#or(FilterFn...)}
-   */
-  public static class OrFn<S> extends FilterFn<S> {
-
-    private final List<FilterFn<S>> fns;
-
-    public OrFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-      for (FilterFn<S> fn : fns) {
-        fn.configure(conf);
-      }
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      for (FilterFn<S> fn : fns) {
-        fn.setContext(context);
-      }
-    }
-    
-    @Override
-    public void initialize() {
-      for (FilterFn<S> fn : fns) {
-        fn.initialize();
-      }
-    }
-    
-    @Override
-    public void cleanup() {
-      for (FilterFn<S> fn : fns) {
-        fn.cleanup();
-      }
-    }
-
-    @Override
-    public boolean accept(S input) {
-      for (FilterFn<S> fn : fns) {
-        if (fn.accept(input)) {
-          return true;
-        }
-      }
-      return false;
-    }
-    
-    @Override
-    public float scaleFactor() {
-      float scaleFactor = 0.0f;
-      for (FilterFn<S> fn : fns) {
-        scaleFactor += fn.scaleFactor();
-      }
-      return Math.min(1.0f, scaleFactor);
-    }
-  }
-
-  /**
-   * @deprecated Use {@link FilterFns#not(FilterFn)}
-   */
-  public static <S> FilterFn<S> not(FilterFn<S> fn) {
-    return new NotFn<S>(fn);
-  }
-
-  /**
-   * @deprecated Use {@link FilterFns#not(FilterFn)}
-   */
-  public static class NotFn<S> extends FilterFn<S> {
-
-    private final FilterFn<S> base;
-
-    public NotFn(FilterFn<S> base) {
-      this.base = base;
-    }
-    
-    @Override
-    public void configure(Configuration conf) {
-        base.configure(conf);
-    }
-
-    @Override
-    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
-      base.setContext(context);
-    }
-    
-    @Override
-    public void initialize() {
-      base.initialize();
-    }
-    
-    @Override
-    public void cleanup() {
-      base.cleanup();
-    }
-    
-    @Override
-    public boolean accept(S input) {
-      return !base.accept(input);
-    }
-
-    @Override
-    public float scaleFactor() {
-      return 1.0f - base.scaleFactor();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java b/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
index 8dc4268..323613b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/FilterFns.java
@@ -17,10 +17,12 @@
  */
 package org.apache.crunch.fn;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.crunch.FilterFn;
-import org.apache.crunch.FilterFn.AndFn;
-import org.apache.crunch.FilterFn.NotFn;
-import org.apache.crunch.FilterFn.OrFn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.util.List;
 
 
 /**
@@ -97,6 +99,157 @@ public final class FilterFns {
     return not(new AcceptAllFn<S>());
   }
 
+  private static class AndFn<S> extends FilterFn<S> {
+
+    private final List<FilterFn<S>> fns;
+
+    public AndFn(FilterFn<S>... fns) {
+      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (FilterFn<S> fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (FilterFn<S> fn : fns) {
+        fn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (FilterFn<S> fn : fns) {
+        fn.initialize();
+      }
+    }
+
+    @Override
+    public void cleanup() {
+      for (FilterFn<S> fn : fns) {
+        fn.cleanup();
+      }
+    }
+
+    @Override
+    public boolean accept(S input) {
+      for (FilterFn<S> fn : fns) {
+        if (!fn.accept(input)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public float scaleFactor() {
+      float scaleFactor = 1.0f;
+      for (FilterFn<S> fn : fns) {
+        scaleFactor *= fn.scaleFactor();
+      }
+      return scaleFactor;
+    }
+  }
+
+  private static class OrFn<S> extends FilterFn<S> {
+
+    private final List<FilterFn<S>> fns;
+
+    public OrFn(FilterFn<S>... fns) {
+      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      for (FilterFn<S> fn : fns) {
+        fn.configure(conf);
+      }
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (FilterFn<S> fn : fns) {
+        fn.setContext(context);
+      }
+    }
+
+    @Override
+    public void initialize() {
+      for (FilterFn<S> fn : fns) {
+        fn.initialize();
+      }
+    }
+
+    @Override
+    public void cleanup() {
+      for (FilterFn<S> fn : fns) {
+        fn.cleanup();
+      }
+    }
+
+    @Override
+    public boolean accept(S input) {
+      for (FilterFn<S> fn : fns) {
+        if (fn.accept(input)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public float scaleFactor() {
+      float scaleFactor = 0.0f;
+      for (FilterFn<S> fn : fns) {
+        scaleFactor += fn.scaleFactor();
+      }
+      return Math.min(1.0f, scaleFactor);
+    }
+  }
+
+  private static class NotFn<S> extends FilterFn<S> {
+
+    private final FilterFn<S> base;
+
+    public NotFn(FilterFn<S> base) {
+      this.base = base;
+    }
+
+    @Override
+    public void configure(Configuration conf) {
+      base.configure(conf);
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      base.setContext(context);
+    }
+
+    @Override
+    public void initialize() {
+      base.initialize();
+    }
+
+    @Override
+    public void cleanup() {
+      base.cleanup();
+    }
+
+    @Override
+    public boolean accept(S input) {
+      return !base.accept(input);
+    }
+
+    @Override
+    public float scaleFactor() {
+      return 1.0f - base.scaleFactor();
+    }
+  }
+
   private static class AcceptAllFn<S> extends FilterFn<S> {
     @Override
     public boolean accept(S input) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
deleted file mode 100644
index 1dd8130..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/fn/MapKeysFn.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-
-/**
- * @deprecated Use {@link org.apache.crunch.PTable#mapKeys(org.apache.crunch.MapFn, org.apache.crunch.types.PType)}
- */
-public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
-
-  @Override
-  public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
-    emitter.emit(Pair.of(map(input.first()), input.second()));
-  }
-
-  public abstract K2 map(K1 k1);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
deleted file mode 100644
index 9b171f4..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/fn/MapValuesFn.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.Pair;
-
-/**
- * @deprecated Use {@link org.apache.crunch.PTable#mapValues(org.apache.crunch.MapFn, org.apache.crunch.types.PType)}
- */
-public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
-
-  @Override
-  public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
-    emitter.emit(Pair.of(input.first(), map(input.second())));
-  }
-
-  public abstract V2 map(V1 v);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java b/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java
deleted file mode 100644
index 4b00874..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/AndFnTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.crunch.FilterFn.AndFn;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AndFnTest {
-
-  private FilterFn<Integer> fnA;
-  private FilterFn<Integer> fnB;
-  private AndFn<Integer> andFn;
-
-  @Before
-  public void setUp() {
-    fnA = mock(FilterFn.class);
-    fnB = mock(FilterFn.class);
-    andFn = new AndFn(fnA, fnB);
-  }
-
-  @Test
-  public void testSetContext() {
-    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
-    andFn.setContext(context);
-
-    verify(fnA).setContext(context);
-    verify(fnB).setContext(context);
-  }
-
-  @Test
-  public void testAccept_False() {
-    when(fnA.accept(1)).thenReturn(true);
-    when(fnB.accept(1)).thenReturn(false);
-
-    assertFalse(andFn.accept(1));
-  }
-
-  @Test
-  public void testAccept_True() {
-    when(fnA.accept(1)).thenReturn(true);
-    when(fnB.accept(1)).thenReturn(true);
-
-    assertTrue(andFn.accept(1));
-  }
-
-  @Test
-  public void testCleanup() {
-    andFn.cleanup(mock(Emitter.class));
-
-    verify(fnA).cleanup();
-    verify(fnB).cleanup();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java
deleted file mode 100644
index 39548e2..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/CombineFnTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.apache.crunch.CombineFn.MAX_BIGINTS;
-import static org.apache.crunch.CombineFn.MAX_DOUBLES;
-import static org.apache.crunch.CombineFn.MAX_FLOATS;
-import static org.apache.crunch.CombineFn.MAX_INTS;
-import static org.apache.crunch.CombineFn.MAX_LONGS;
-import static org.apache.crunch.CombineFn.MIN_BIGINTS;
-import static org.apache.crunch.CombineFn.MIN_DOUBLES;
-import static org.apache.crunch.CombineFn.MIN_FLOATS;
-import static org.apache.crunch.CombineFn.MIN_INTS;
-import static org.apache.crunch.CombineFn.MIN_LONGS;
-import static org.apache.crunch.CombineFn.SUM_BIGINTS;
-import static org.apache.crunch.CombineFn.SUM_DOUBLES;
-import static org.apache.crunch.CombineFn.SUM_FLOATS;
-import static org.apache.crunch.CombineFn.SUM_INTS;
-import static org.apache.crunch.CombineFn.SUM_LONGS;
-import static org.junit.Assert.assertEquals;
-
-import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.crunch.CombineFn.Aggregator;
-import org.apache.crunch.CombineFn.AggregatorFactory;
-import org.apache.crunch.CombineFn.FirstNAggregator;
-import org.apache.crunch.CombineFn.LastNAggregator;
-import org.apache.crunch.CombineFn.MaxNAggregator;
-import org.apache.crunch.CombineFn.MinNAggregator;
-import org.apache.crunch.CombineFn.PairAggregator;
-import org.apache.crunch.CombineFn.QuadAggregator;
-import org.apache.crunch.CombineFn.StringConcatAggregator;
-import org.apache.crunch.CombineFn.TripAggregator;
-import org.apache.crunch.CombineFn.TupleNAggregator;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-public class CombineFnTest {
-
-  private <T> Iterable<T> applyAggregator(AggregatorFactory<T> a, Iterable<T> values) {
-    return applyAggregator(a.create(), values);
-  }
-
-  private <T> Iterable<T> applyAggregator(Aggregator<T> a, Iterable<T> values) {
-    a.reset();
-    for (T value : values) {
-      a.update(value);
-    }
-    return a.results();
-  }
-
-  @Test
-  public void testSums() {
-    assertEquals(ImmutableList.of(1775L), applyAggregator(SUM_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-
-    assertEquals(ImmutableList.of(1765L), applyAggregator(SUM_LONGS, ImmutableList.of(29L, 7L, 1729L)));
-
-    assertEquals(ImmutableList.of(1775), applyAggregator(SUM_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(1775.0f), applyAggregator(SUM_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(1775.0), applyAggregator(SUM_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-
-    assertEquals(
-        ImmutableList.of(new BigInteger("1775")),
-        applyAggregator(SUM_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-
-  @Test
-  public void testMax() {
-    assertEquals(ImmutableList.of(1729L), applyAggregator(MAX_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-
-    assertEquals(ImmutableList.of(1729), applyAggregator(MAX_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(1729.0f), applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(1729.0), applyAggregator(MAX_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-
-    assertEquals(ImmutableList.of(1745.0f), applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 1745f, 17f, 1729f)));
-
-    assertEquals(
-        ImmutableList.of(new BigInteger("1729")),
-        applyAggregator(MAX_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-
-  @Test
-  public void testMin() {
-    assertEquals(ImmutableList.of(17L), applyAggregator(MIN_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-
-    assertEquals(ImmutableList.of(17), applyAggregator(MIN_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(17.0f), applyAggregator(MIN_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(17.0), applyAggregator(MIN_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-
-    assertEquals(ImmutableList.of(29), applyAggregator(MIN_INTS, ImmutableList.of(29, 170, 1729)));
-
-    assertEquals(
-        ImmutableList.of(new BigInteger("17")),
-        applyAggregator(MIN_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-
-  @Test
-  public void testMaxN() {
-    assertEquals(ImmutableList.of(98, 1009),
-        applyAggregator(new MaxNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testMinN() {
-    assertEquals(ImmutableList.of(17, 29),
-        applyAggregator(new MinNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testFirstN() {
-    assertEquals(ImmutableList.of(17, 34),
-        applyAggregator(new FirstNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testLastN() {
-    assertEquals(ImmutableList.of(29, 1009),
-        applyAggregator(new LastNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testPairs() {
-    List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
-    Aggregator<Pair<Long, Double>> a = new PairAggregator<Long, Double>(SUM_LONGS.create(), MIN_DOUBLES.create());
-    assertEquals(Pair.of(1729L, -3.14), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-
-  @Test
-  public void testPairsTwoLongs() {
-    List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
-    Aggregator<Pair<Long, Long>> a = new PairAggregator<Long, Long>(SUM_LONGS.create(), SUM_LONGS.create());
-    assertEquals(Pair.of(1729L, 20L), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-
-  @Test
-  public void testTrips() {
-    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(Tuple3.of(17.29f, 12.2, 0.1),
-        Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
-    Aggregator<Tuple3<Float, Double, Double>> a = new TripAggregator<Float, Double, Double>(MAX_FLOATS.create(),
-        MAX_DOUBLES.create(), MIN_DOUBLES.create());
-    assertEquals(Tuple3.of(17.29f, 14.5, -0.98), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-
-  @Test
-  public void testQuads() {
-    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(Tuple4.of(17.29f, 12.2, 0.1, 1),
-        Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3));
-    Aggregator<Tuple4<Float, Double, Double, Integer>> a = new QuadAggregator<Float, Double, Double, Integer>(
-        MAX_FLOATS.create(), MAX_DOUBLES.create(), MIN_DOUBLES.create(), SUM_INTS.create());
-    assertEquals(Tuple4.of(17.29f, 14.5, -0.98, 6), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-
-  @Test
-  public void testTupleN() {
-    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L), new TupleN(4, 17.0, 1, 9.7, 12L));
-    Aggregator<TupleN> a = new TupleNAggregator(MIN_INTS.create(), SUM_DOUBLES.create(), MAX_INTS.create(),
-        MIN_DOUBLES.create(), MAX_LONGS.create());
-    assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-
-  @Test
-  public void testConcatenation() {
-    String[] arrayNull = new String[] { null, "" };
-    assertEquals(ImmutableList.of("foofoobarbar"), applyAggregator(
-        new StringConcatAggregator("", true), ImmutableList.of("foo", "foobar", "bar")));
-    assertEquals(ImmutableList.of("foo/foobar/bar"), applyAggregator(
-        new StringConcatAggregator("/", false), ImmutableList.of("foo", "foobar", "bar")));
-    assertEquals(ImmutableList.of("  "), applyAggregator(
-        new StringConcatAggregator(" ", true), ImmutableList.of(" ", "")));
-    assertEquals(ImmutableList.of(""), applyAggregator(
-        new StringConcatAggregator(" ", true), Arrays.asList(arrayNull)));
-    assertEquals(ImmutableList.of("foo bar"), applyAggregator(
-        new StringConcatAggregator(" ", true, 20, 3), ImmutableList.of("foo", "foobar", "bar")));
-    assertEquals(ImmutableList.of("foo foobar"), applyAggregator(
-        new StringConcatAggregator(" ", true, 10, 6), ImmutableList.of("foo", "foobar", "bar")));
-    assertEquals(ImmutableList.of("foo bar"), applyAggregator(
-        new StringConcatAggregator(" ", true, 9, 6), ImmutableList.of("foo", "foobar", "bar")));
-  }
-
-  @Test
-  public void testConcatenationReset() {
-    StringConcatAggregator a = new StringConcatAggregator(" ", true, 10, 6);
-
-    assertEquals(ImmutableList.of("foo foobar"), applyAggregator(a, ImmutableList.of("foo", "foobar", "bar")));
-    assertEquals(ImmutableList.of("foo foobar"), applyAggregator(a, ImmutableList.of("foo", "foobar", "bar")));
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testConcatenationNullException() {
-    String[] arrayNull = new String[] { null, "" };
-    assertEquals(ImmutableList.of(""), applyAggregator(
-        new StringConcatAggregator(" ", false), Arrays.asList(arrayNull)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java b/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java
deleted file mode 100644
index 8af17a2..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/NotFnTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.*;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.crunch.FilterFn.NotFn;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.junit.Before;
-import org.junit.Test;
-
-public class NotFnTest {
-  
-  private FilterFn<Integer> base;
-  private NotFn<Integer> notFn;
-  
-  @Before
-  public void setUp() {
-    base = mock(FilterFn.class);
-    notFn = new NotFn(base);
-  }
-
-  @Test
-  public void testSetContext() {
-    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
-    
-    notFn.setContext(context);
-    
-    verify(base).setContext(context);
-  }
-
-  @Test
-  public void testAccept_True() {
-    when(base.accept(1)).thenReturn(true);
-    
-    assertFalse(notFn.accept(1));
-  }
-  
-  @Test
-  public void testAccept_False() {
-    when(base.accept(1)).thenReturn(false);
-    
-    assertTrue(notFn.accept(1));
-  }
-
-  @Test
-  public void testCleanupEmitterOfT() {
-    notFn.cleanup(mock(Emitter.class));
-    
-    verify(base).cleanup();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java b/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java
deleted file mode 100644
index fde2376..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/OrFnTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.crunch.FilterFn.OrFn;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.junit.Before;
-import org.junit.Test;
-
-public class OrFnTest {
-
-  private FilterFn<Integer> fnA;
-  private FilterFn<Integer> fnB;
-  private OrFn<Integer> orFn;
-
-  @Before
-  public void setUp() {
-    fnA = mock(FilterFn.class);
-    fnB = mock(FilterFn.class);
-    orFn = new OrFn(fnA, fnB);
-  }
-
-  @Test
-  public void testSetContext() {
-    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
-
-    orFn.setContext(context);
-
-    verify(fnA).setContext(context);
-    verify(fnB).setContext(context);
-  }
-
-  @Test
-  public void testAccept_True() {
-    when(fnA.accept(1)).thenReturn(false);
-    when(fnB.accept(1)).thenReturn(true);
-
-    assertTrue(orFn.accept(1));
-  }
-
-  @Test
-  public void testAccept_False() {
-    when(fnA.accept(1)).thenReturn(false);
-    when(fnB.accept(1)).thenReturn(false);
-
-    assertFalse(orFn.accept(1));
-  }
-
-  @Test
-  public void testCleanupEmitterOfT() {
-    orFn.cleanup(mock(Emitter.class));
-
-    verify(fnA).cleanup();
-    verify(fnB).cleanup();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/fn/AndFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AndFnTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AndFnTest.java
new file mode 100644
index 0000000..fdac6fc
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/AndFnTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AndFnTest {
+
+  private FilterFn<Integer> fnA;
+  private FilterFn<Integer> fnB;
+  private FilterFn<Integer> andFn;
+
+  @Before
+  public void setUp() {
+    fnA = mock(FilterFn.class);
+    fnB = mock(FilterFn.class);
+    andFn = FilterFns.and(fnA, fnB);
+  }
+
+  @Test
+  public void testSetContext() {
+    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
+    andFn.setContext(context);
+
+    verify(fnA).setContext(context);
+    verify(fnB).setContext(context);
+  }
+
+  @Test
+  public void testAccept_False() {
+    when(fnA.accept(1)).thenReturn(true);
+    when(fnB.accept(1)).thenReturn(false);
+
+    assertFalse(andFn.accept(1));
+  }
+
+  @Test
+  public void testAccept_True() {
+    when(fnA.accept(1)).thenReturn(true);
+    when(fnB.accept(1)).thenReturn(true);
+
+    assertTrue(andFn.accept(1));
+  }
+
+  @Test
+  public void testCleanup() {
+    andFn.cleanup(mock(Emitter.class));
+
+    verify(fnA).cleanup();
+    verify(fnB).cleanup();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
deleted file mode 100644
index 6b73700..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/fn/MapKeysTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.crunch.Pair;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class MapKeysTest {
-
-  protected static final MapKeysFn<String, Integer, Integer> one = new MapKeysFn<String, Integer, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 1;
-    }
-  };
-
-  protected static final MapKeysFn<String, Integer, Integer> two = new MapKeysFn<String, Integer, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 2;
-    }
-  };
-
-  @Test
-  public void test() {
-    StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();
-    one.process(Pair.of("k", Integer.MAX_VALUE), emitter);
-    assertEquals(Pair.of(1, Integer.MAX_VALUE), emitter.getLast());
-    two.process(Pair.of("k", Integer.MAX_VALUE), emitter);
-    assertEquals(Pair.of(2, Integer.MAX_VALUE), emitter.getLast());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
deleted file mode 100644
index 097b008..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/fn/MapValuesTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.crunch.Pair;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class MapValuesTest {
-
-  static final MapValuesFn<String, String, Integer> one = new MapValuesFn<String, String, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 1;
-    }
-  };
-
-  static final MapValuesFn<String, String, Integer> two = new MapValuesFn<String, String, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 2;
-    }
-  };
-
-  @Test
-  public void test() {
-    StoreLastEmitter<Pair<String, Integer>> emitter = StoreLastEmitter.create();
-    one.process(Pair.of("k", "v"), emitter);
-    assertEquals(Pair.of("k", 1), emitter.getLast());
-    two.process(Pair.of("k", "v"), emitter);
-    assertEquals(Pair.of("k", 2), emitter.getLast());
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/fn/NotFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/NotFnTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/NotFnTest.java
new file mode 100644
index 0000000..952fd04
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/NotFnTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NotFnTest {
+  
+  private FilterFn<Integer> base;
+  private FilterFn<Integer> notFn;
+  
+  @Before
+  public void setUp() {
+    base = mock(FilterFn.class);
+    notFn = FilterFns.not(base);
+  }
+
+  @Test
+  public void testSetContext() {
+    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
+    
+    notFn.setContext(context);
+    
+    verify(base).setContext(context);
+  }
+
+  @Test
+  public void testAccept_True() {
+    when(base.accept(1)).thenReturn(true);
+    
+    assertFalse(notFn.accept(1));
+  }
+  
+  @Test
+  public void testAccept_False() {
+    when(base.accept(1)).thenReturn(false);
+    
+    assertTrue(notFn.accept(1));
+  }
+
+  @Test
+  public void testCleanupEmitterOfT() {
+    notFn.cleanup(mock(Emitter.class));
+    
+    verify(base).cleanup();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/cde5f34e/crunch-core/src/test/java/org/apache/crunch/fn/OrFnTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/OrFnTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/OrFnTest.java
new file mode 100644
index 0000000..1a2f462
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/OrFnTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OrFnTest {
+
+  private FilterFn<Integer> fnA;
+  private FilterFn<Integer> fnB;
+  private FilterFn<Integer> orFn;
+
+  @Before
+  public void setUp() {
+    fnA = mock(FilterFn.class);
+    fnB = mock(FilterFn.class);
+    orFn = FilterFns.or(fnA, fnB);
+  }
+
+  @Test
+  public void testSetContext() {
+    TaskInputOutputContext<?, ?, ?, ?> context = mock(TaskInputOutputContext.class);
+
+    orFn.setContext(context);
+
+    verify(fnA).setContext(context);
+    verify(fnB).setContext(context);
+  }
+
+  @Test
+  public void testAccept_True() {
+    when(fnA.accept(1)).thenReturn(false);
+    when(fnB.accept(1)).thenReturn(true);
+
+    assertTrue(orFn.accept(1));
+  }
+
+  @Test
+  public void testAccept_False() {
+    when(fnA.accept(1)).thenReturn(false);
+    when(fnB.accept(1)).thenReturn(false);
+
+    assertFalse(orFn.accept(1));
+  }
+
+  @Test
+  public void testCleanupEmitterOfT() {
+    orFn.cleanup(mock(Emitter.class));
+
+    verify(fnA).cleanup();
+    verify(fnB).cleanup();
+  }
+
+}


Mime
View raw message