incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [3/10] Format all sources according to formatting profile
Date Sat, 14 Jul 2012 18:14:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index e552286..fe65d74 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -24,57 +24,59 @@ import java.util.List;
 import java.util.SortedSet;
 
 import org.apache.crunch.util.Tuples;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
- * A special {@link DoFn} implementation that converts an {@link Iterable}
- * of values into a single value. If a {@code CombineFn} instance is used
- * on a {@link PGroupedTable}, the function will be applied to the output
- * of the map stage before the data is passed to the reducer, which can
- * improve the runtime of certain classes of jobs.
- *
+ * A special {@link DoFn} implementation that converts an {@link Iterable} of
+ * values into a single value. If a {@code CombineFn} instance is used on a
+ * {@link PGroupedTable}, the function will be applied to the output of the map
+ * stage before the data is passed to the reducer, which can improve the runtime
+ * of certain classes of jobs.
+ * 
  */
 public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
-  
+
   public static interface Aggregator<T> extends Serializable {
     /**
-     * Clears the internal state of this Aggregator and prepares it for the values associated
-     * with the next key.
+     * Clears the internal state of this Aggregator and prepares it for the
+     * values associated with the next key.
      */
     void reset();
 
     /**
-     * Incorporate the given value into the aggregate state maintained by this instance.
+     * Incorporate the given value into the aggregate state maintained by this
+     * instance.
      */
     void update(T value);
-    
+
     /**
      * Returns the current aggregated state of this instance.
      */
-    Iterable<T> results();    
+    Iterable<T> results();
   }
-  
+
   /**
    * Interface for constructing new aggregator instances.
    */
   public static interface AggregatorFactory<T> {
     Aggregator<T> create();
   }
-  
+
   /**
-   * A {@code CombineFn} that delegates all of the actual work to an {@code Aggregator}
-   * instance.
+   * A {@code CombineFn} that delegates all of the actual work to an
+   * {@code Aggregator} instance.
    */
   public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
-    
+
     private final Aggregator<V> aggregator;
-    
+
     public AggregatorCombineFn(Aggregator<V> aggregator) {
       this.aggregator = aggregator;
     }
-    
+
     @Override
     public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
       aggregator.reset();
@@ -84,99 +86,99 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
       for (V v : aggregator.results()) {
         emitter.emit(Pair.of(input.first(), v));
       }
-    }    
+    }
   }
-  
-  private static abstract class TupleAggregator<T> implements Aggregator<T> {   
+
+  private static abstract class TupleAggregator<T> implements Aggregator<T> {
     private final List<Aggregator<Object>> aggregators;
-    
-    public TupleAggregator(Aggregator<?>...aggregators) {
+
+    public TupleAggregator(Aggregator<?>... aggregators) {
       this.aggregators = Lists.newArrayList();
       for (Aggregator<?> a : aggregators) {
         this.aggregators.add((Aggregator<Object>) a);
       }
     }
-    
+
     @Override
     public void reset() {
       for (Aggregator<?> a : aggregators) {
         a.reset();
       }
     }
-    
+
     protected void updateTuple(Tuple t) {
       for (int i = 0; i < aggregators.size(); i++) {
         aggregators.get(i).update(t.get(i));
       }
     }
-    
+
     protected Iterable<Object> results(int index) {
       return aggregators.get(index).results();
     }
   }
-  
+
   public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
-    
+
     public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
       super(a1, a2);
     }
-    
+
     @Override
     public void update(Pair<V1, V2> value) {
       updateTuple(value);
     }
-    
+
     @Override
     public Iterable<Pair<V1, V2>> results() {
       return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
     }
   }
-  
+
   public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
-    
+
     public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
       super(a1, a2, a3);
     }
-    
+
     @Override
     public void update(Tuple3<A, B, C> value) {
       updateTuple(value);
     }
-    
+
     @Override
     public Iterable<Tuple3<A, B, C>> results() {
-      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0),
-          (Iterable<B>) results(1), (Iterable<C>) results(2));
+      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0), (Iterable<B>) results(1),
+          (Iterable<C>) results(2));
     }
   }
 
   public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
-    
+
     public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
       super(a1, a2, a3, a4);
     }
-    
+
     @Override
     public void update(Tuple4<A, B, C, D> value) {
       updateTuple(value);
     }
-    
+
     @Override
     public Iterable<Tuple4<A, B, C, D>> results() {
-      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0),
-          (Iterable<B>) results(1), (Iterable<C>) results(2), (Iterable<D>) results(3));
+      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0), (Iterable<B>) results(1),
+          (Iterable<C>) results(2), (Iterable<D>) results(3));
     }
   }
-  
+
   public static class TupleNAggregator extends TupleAggregator<TupleN> {
-    
+
     private final int size;
-    
+
     public TupleNAggregator(Aggregator<?>... aggregators) {
       super(aggregators);
       size = aggregators.length;
     }
-    
+
     @Override
     public void update(TupleN value) {
       updateTuple(value);
@@ -190,32 +192,30 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
       }
       return new Tuples.TupleNIterable(iterables);
     }
-    
+
   }
-  
+
   public static final <K, V> CombineFn<K, V> aggregator(Aggregator<V> aggregator) {
     return new AggregatorCombineFn<K, V>(aggregator);
   }
-  
+
   public static final <K, V> CombineFn<K, V> aggregatorFactory(AggregatorFactory<V> aggregator) {
     return new AggregatorCombineFn<K, V>(aggregator.create());
   }
-  
-  public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(
-      AggregatorFactory<V1> a1, AggregatorFactory<V2> a2) {
+
+  public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(AggregatorFactory<V1> a1,
+      AggregatorFactory<V2> a2) {
     return aggregator(new PairAggregator<V1, V2>(a1.create(), a2.create()));
   }
-  
-  public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(
-      AggregatorFactory<A> a1, AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
+
+  public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(AggregatorFactory<A> a1,
+      AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
     return aggregator(new TripAggregator<A, B, C>(a1.create(), a2.create(), a3.create()));
   }
 
-  public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(
-      AggregatorFactory<A> a1, AggregatorFactory<B> a2, AggregatorFactory<C> a3,
-      AggregatorFactory<D> a4) {
-    return aggregator(new QuadAggregator<A, B, C, D>(a1.create(), a2.create(), a3.create(),
-        a4.create()));
+  public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(AggregatorFactory<A> a1,
+      AggregatorFactory<B> a2, AggregatorFactory<C> a3, AggregatorFactory<D> a4) {
+    return aggregator(new QuadAggregator<A, B, C, D>(a1.create(), a2.create(), a3.create(), a4.create()));
   }
 
   public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
@@ -225,7 +225,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
     return aggregator(new TupleNAggregator(aggs));
   }
-  
+
   public static final <K> CombineFn<K, Long> SUM_LONGS() {
     return aggregatorFactory(SUM_LONGS);
   }
@@ -241,11 +241,11 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
     return aggregatorFactory(SUM_DOUBLES);
   }
- 
+
   public static final <K> CombineFn<K, BigInteger> SUM_BIGINTS() {
     return aggregatorFactory(SUM_BIGINTS);
   }
-  
+
   public static final <K> CombineFn<K, Long> MAX_LONGS() {
     return aggregatorFactory(MAX_LONGS);
   }
@@ -253,7 +253,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static final <K> CombineFn<K, Long> MAX_LONGS(int n) {
     return aggregator(new MaxNAggregator<Long>(n));
   }
-  
+
   public static final <K> CombineFn<K, Integer> MAX_INTS() {
     return aggregatorFactory(MAX_INTS);
   }
@@ -277,15 +277,15 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static final <K> CombineFn<K, Double> MAX_DOUBLES(int n) {
     return aggregator(new MaxNAggregator<Double>(n));
   }
-  
+
   public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS() {
     return aggregatorFactory(MAX_BIGINTS);
   }
-  
+
   public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS(int n) {
     return aggregator(new MaxNAggregator<BigInteger>(n));
   }
-  
+
   public static final <K> CombineFn<K, Long> MIN_LONGS() {
     return aggregatorFactory(MIN_LONGS);
   }
@@ -301,7 +301,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static final <K> CombineFn<K, Integer> MIN_INTS(int n) {
     return aggregator(new MinNAggregator<Integer>(n));
   }
-  
+
   public static final <K> CombineFn<K, Float> MIN_FLOATS() {
     return aggregatorFactory(MIN_FLOATS);
   }
@@ -309,7 +309,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static final <K> CombineFn<K, Float> MIN_FLOATS(int n) {
     return aggregator(new MinNAggregator<Float>(n));
   }
-  
+
   public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
     return aggregatorFactory(MIN_DOUBLES);
   }
@@ -317,15 +317,15 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static final <K> CombineFn<K, Double> MIN_DOUBLES(int n) {
     return aggregator(new MinNAggregator<Double>(n));
   }
-  
+
   public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS() {
     return aggregatorFactory(MIN_BIGINTS);
   }
-  
+
   public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS(int n) {
     return aggregator(new MinNAggregator<BigInteger>(n));
   }
-  
+
   public static final <K, V> CombineFn<K, V> FIRST_N(int n) {
     return aggregator(new FirstNAggregator<V>(n));
   }
@@ -333,10 +333,10 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static final <K, V> CombineFn<K, V> LAST_N(int n) {
     return aggregator(new LastNAggregator<V>(n));
   }
-  
-  public static class SumLongs implements Aggregator<Long> {    
+
+  public static class SumLongs implements Aggregator<Long> {
     private long sum = 0;
-    
+
     @Override
     public void reset() {
       sum = 0;
@@ -346,19 +346,22 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     public void update(Long next) {
       sum += next;
     }
-    
+
     @Override
     public Iterable<Long> results() {
       return ImmutableList.of(sum);
     }
   }
+
   public static AggregatorFactory<Long> SUM_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() { return new SumLongs(); }
+    public Aggregator<Long> create() {
+      return new SumLongs();
+    }
   };
-  
+
   public static class SumInts implements Aggregator<Integer> {
     private int sum = 0;
-    
+
     @Override
     public void reset() {
       sum = 0;
@@ -368,19 +371,22 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     public void update(Integer next) {
       sum += next;
     }
-    
+
     @Override
     public Iterable<Integer> results() {
       return ImmutableList.of(sum);
     }
   }
+
   public static AggregatorFactory<Integer> SUM_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() { return new SumInts(); }
+    public Aggregator<Integer> create() {
+      return new SumInts();
+    }
   };
-  
-  public static class SumFloats implements Aggregator<Float> {    
+
+  public static class SumFloats implements Aggregator<Float> {
     private float sum = 0;
-    
+
     @Override
     public void reset() {
       sum = 0f;
@@ -390,19 +396,22 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     public void update(Float next) {
       sum += next;
     }
-    
+
     @Override
     public Iterable<Float> results() {
       return ImmutableList.of(sum);
     }
   }
+
   public static AggregatorFactory<Float> SUM_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() { return new SumFloats(); }
+    public Aggregator<Float> create() {
+      return new SumFloats();
+    }
   };
-  
-  public static class SumDoubles implements Aggregator<Double> {    
+
+  public static class SumDoubles implements Aggregator<Double> {
     private double sum = 0;
-    
+
     @Override
     public void reset() {
       sum = 0f;
@@ -412,19 +421,22 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     public void update(Double next) {
       sum += next;
     }
-    
+
     @Override
     public Iterable<Double> results() {
       return ImmutableList.of(sum);
     }
   }
+
   public static AggregatorFactory<Double> SUM_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() { return new SumDoubles(); }
+    public Aggregator<Double> create() {
+      return new SumDoubles();
+    }
   };
-  
-  public static class SumBigInts implements Aggregator<BigInteger> {    
+
+  public static class SumBigInts implements Aggregator<BigInteger> {
     private BigInteger sum = BigInteger.ZERO;
-    
+
     @Override
     public void reset() {
       sum = BigInteger.ZERO;
@@ -434,256 +446,289 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     public void update(BigInteger next) {
       sum = sum.add(next);
     }
-    
+
     @Override
     public Iterable<BigInteger> results() {
       return ImmutableList.of(sum);
     }
   }
+
   public static AggregatorFactory<BigInteger> SUM_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() { return new SumBigInts(); }
+    public Aggregator<BigInteger> create() {
+      return new SumBigInts();
+    }
   };
-  
+
   public static class MaxLongs implements Aggregator<Long> {
     private Long max = null;
-    
+
     @Override
     public void reset() {
       max = null;
     }
-    
+
     @Override
     public void update(Long next) {
       if (max == null || max < next) {
         max = next;
       }
     }
-    
+
     @Override
     public Iterable<Long> results() {
       return ImmutableList.of(max);
     }
   }
+
   public static AggregatorFactory<Long> MAX_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() { return new MaxLongs(); }
+    public Aggregator<Long> create() {
+      return new MaxLongs();
+    }
   };
-  
+
   public static class MaxInts implements Aggregator<Integer> {
     private Integer max = null;
-    
+
     @Override
     public void reset() {
       max = null;
     }
-    
+
     @Override
     public void update(Integer next) {
       if (max == null || max < next) {
         max = next;
       }
     }
-    
+
     @Override
     public Iterable<Integer> results() {
       return ImmutableList.of(max);
     }
   }
+
   public static AggregatorFactory<Integer> MAX_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() { return new MaxInts(); }
+    public Aggregator<Integer> create() {
+      return new MaxInts();
+    }
   };
-  
+
   public static class MaxFloats implements Aggregator<Float> {
     private Float max = null;
-    
+
     @Override
     public void reset() {
       max = null;
     }
-    
+
     @Override
     public void update(Float next) {
       if (max == null || max < next) {
         max = next;
       }
     }
-    
+
     @Override
     public Iterable<Float> results() {
       return ImmutableList.of(max);
     }
   }
+
   public static AggregatorFactory<Float> MAX_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() { return new MaxFloats(); }
+    public Aggregator<Float> create() {
+      return new MaxFloats();
+    }
   };
-  
+
   public static class MaxDoubles implements Aggregator<Double> {
     private Double max = null;
-    
+
     @Override
     public void reset() {
       max = null;
     }
-    
+
     @Override
     public void update(Double next) {
       if (max == null || max < next) {
         max = next;
       }
     }
-    
+
     @Override
     public Iterable<Double> results() {
       return ImmutableList.of(max);
     }
   }
+
   public static AggregatorFactory<Double> MAX_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() { return new MaxDoubles(); }
+    public Aggregator<Double> create() {
+      return new MaxDoubles();
+    }
   };
-  
+
   public static class MaxBigInts implements Aggregator<BigInteger> {
     private BigInteger max = null;
-    
+
     @Override
     public void reset() {
       max = null;
     }
-    
+
     @Override
     public void update(BigInteger next) {
       if (max == null || max.compareTo(next) < 0) {
         max = next;
       }
     }
-    
+
     @Override
     public Iterable<BigInteger> results() {
       return ImmutableList.of(max);
     }
   }
+
   public static AggregatorFactory<BigInteger> MAX_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() { return new MaxBigInts(); }
+    public Aggregator<BigInteger> create() {
+      return new MaxBigInts();
+    }
   };
-  
+
   public static class MinLongs implements Aggregator<Long> {
     private Long min = null;
-    
+
     @Override
     public void reset() {
       min = null;
     }
-    
+
     @Override
     public void update(Long next) {
       if (min == null || min > next) {
         min = next;
       }
     }
-    
+
     @Override
     public Iterable<Long> results() {
       return ImmutableList.of(min);
     }
   }
+
   public static AggregatorFactory<Long> MIN_LONGS = new AggregatorFactory<Long>() {
-    public Aggregator<Long> create() { return new MinLongs(); }
+    public Aggregator<Long> create() {
+      return new MinLongs();
+    }
   };
-  
-  public static class MinInts implements Aggregator<Integer> {    
+
+  public static class MinInts implements Aggregator<Integer> {
     private Integer min = null;
-    
+
     @Override
     public void reset() {
       min = null;
     }
-    
+
     @Override
     public void update(Integer next) {
       if (min == null || min > next) {
         min = next;
       }
     }
-    
+
     @Override
     public Iterable<Integer> results() {
       return ImmutableList.of(min);
     }
   }
+
   public static AggregatorFactory<Integer> MIN_INTS = new AggregatorFactory<Integer>() {
-    public Aggregator<Integer> create() { return new MinInts(); }
+    public Aggregator<Integer> create() {
+      return new MinInts();
+    }
   };
-  
+
   public static class MinFloats implements Aggregator<Float> {
     private Float min = null;
-    
+
     @Override
     public void reset() {
       min = null;
     }
-    
+
     @Override
     public void update(Float next) {
       if (min == null || min > next) {
         min = next;
       }
     }
-    
+
     @Override
     public Iterable<Float> results() {
       return ImmutableList.of(min);
     }
   }
+
   public static AggregatorFactory<Float> MIN_FLOATS = new AggregatorFactory<Float>() {
-    public Aggregator<Float> create() { return new MinFloats(); }
+    public Aggregator<Float> create() {
+      return new MinFloats();
+    }
   };
-  
+
   public static class MinDoubles implements Aggregator<Double> {
     private Double min = null;
-    
+
     @Override
     public void reset() {
       min = null;
     }
-    
+
     @Override
     public void update(Double next) {
       if (min == null || min > next) {
         min = next;
       }
     }
-    
+
     @Override
     public Iterable<Double> results() {
       return ImmutableList.of(min);
     }
   }
+
   public static AggregatorFactory<Double> MIN_DOUBLES = new AggregatorFactory<Double>() {
-    public Aggregator<Double> create() { return new MinDoubles(); }
+    public Aggregator<Double> create() {
+      return new MinDoubles();
+    }
   };
 
   public static class MinBigInts implements Aggregator<BigInteger> {
     private BigInteger min = null;
-    
+
     @Override
     public void reset() {
       min = null;
     }
-    
+
     @Override
     public void update(BigInteger next) {
       if (min == null || min.compareTo(next) > 0) {
         min = next;
       }
     }
-    
+
     @Override
     public Iterable<BigInteger> results() {
       return ImmutableList.of(min);
     }
   }
+
   public static AggregatorFactory<BigInteger> MIN_BIGINTS = new AggregatorFactory<BigInteger>() {
-    public Aggregator<BigInteger> create() { return new MinBigInts(); }
+    public Aggregator<BigInteger> create() {
+      return new MinBigInts();
+    }
   };
-  
+
   public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> {
     private final int arity;
     private transient SortedSet<V> elements;
@@ -700,7 +745,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
         elements.clear();
       }
     }
-    
+
     @Override
     public void update(V value) {
       if (elements.size() < arity) {
@@ -710,17 +755,17 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
         elements.add(value);
       }
     }
-    
+
     @Override
     public Iterable<V> results() {
       return ImmutableList.copyOf(elements);
     }
   }
-  
+
   public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> {
     private final int arity;
     private transient SortedSet<V> elements;
-    
+
     public MinNAggregator(int arity) {
       this.arity = arity;
     }
@@ -733,7 +778,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
         elements.clear();
       }
     }
-    
+
     @Override
     public void update(V value) {
       if (elements.size() < arity) {
@@ -743,17 +788,17 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
         elements.add(value);
       }
     }
-    
+
     @Override
     public Iterable<V> results() {
       return ImmutableList.copyOf(elements);
     }
   }
-  
+
   public static class FirstNAggregator<V> implements Aggregator<V> {
     private final int arity;
     private final List<V> elements;
-    
+
     public FirstNAggregator(int arity) {
       this.arity = arity;
       this.elements = Lists.newArrayList();
@@ -763,14 +808,14 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     public void reset() {
       elements.clear();
     }
-    
+
     @Override
     public void update(V value) {
       if (elements.size() < arity) {
         elements.add(value);
       }
     }
-    
+
     @Override
     public Iterable<V> results() {
       return ImmutableList.copyOf(elements);
@@ -780,7 +825,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   public static class LastNAggregator<V> implements Aggregator<V> {
     private final int arity;
     private final LinkedList<V> elements;
-    
+
     public LastNAggregator(int arity) {
       this.arity = arity;
       this.elements = Lists.newLinkedList();
@@ -790,7 +835,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     public void reset() {
       elements.clear();
     }
-    
+
     @Override
     public void update(V value) {
       elements.add(value);
@@ -798,7 +843,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
         elements.removeFirst();
       }
     }
-    
+
     @Override
     public Iterable<V> results() {
       return ImmutableList.copyOf(elements);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java
index 30fa70c..3d1e785 100644
--- a/crunch/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch/src/main/java/org/apache/crunch/DoFn.java
@@ -19,38 +19,39 @@ package org.apache.crunch;
 
 import java.io.Serializable;
 
+import org.apache.crunch.test.TestCounters;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
-import org.apache.crunch.test.TestCounters;
-
 /**
  * Base class for all data processing functions in Crunch.
  * 
- * <p>Note that all {@code DoFn} instances implement {@link Serializable},
- * and thus all of their non-transient member variables must implement
+ * <p>
+ * Note that all {@code DoFn} instances implement {@link Serializable}, and thus
+ * all of their non-transient member variables must implement
  * {@code Serializable} as well. If your DoFn depends on non-serializable
- * classes for data processing, they may be declared as {@code transient}
- * and initialized in the DoFn's {@code initialize} method.
- *
+ * classes for data processing, they may be declared as {@code transient} and
+ * initialized in the DoFn's {@code initialize} method.
+ * 
  */
 public abstract class DoFn<S, T> implements Serializable {
   private transient TaskInputOutputContext<?, ?, ?, ?> context;
   private transient Configuration testConf;
   private transient String internalStatus;
-  
+
   /**
-   * Called during the job planning phase. Subclasses may override
-   * this method in order to modify the configuration of the Job
-   * that this DoFn instance belongs to.
+   * Called during the job planning phase. Subclasses may override this method
+   * in order to modify the configuration of the Job that this DoFn instance
+   * belongs to.
    * 
-   * @param conf The Configuration instance for the Job.
+   * @param conf
+   *          The Configuration instance for the Job.
    */
-  public void configure(Configuration conf) {  
+  public void configure(Configuration conf) {
   }
-  
+
   /**
    * Processes the records from a {@link PCollection}.
    * 
@@ -58,9 +59,11 @@ public abstract class DoFn<S, T> implements Serializable {
    * <br/>
    * <b>Note:</b> Crunch can reuse a single input record object whose content
    * changes on each {@link #process(Object, Emitter)} method call. This
-   * functionality is imposed by Hadoop's <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reducer.html">Reducer</a> implementation: 
-   * <i>The framework will reuse the key and value objects that are passed into the reduce, therefore the application
-   * should clone the objects they want to keep a copy of.</i>
+   * functionality is imposed by Hadoop's <a href=
+   * "http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reducer.html"
+   * >Reducer</a> implementation: <i>The framework will reuse the key and value
+   * objects that are passed into the reduce, therefore the application should
+   * clone the objects they want to keep a copy of.</i>
    * 
    * @param input
    *          The input record.
@@ -70,26 +73,27 @@ public abstract class DoFn<S, T> implements Serializable {
   public abstract void process(S input, Emitter<T> emitter);
 
   /**
-   * Called during the setup of the MapReduce job this {@code DoFn}
-   * is associated with. Subclasses may override this method to
-   * do appropriate initialization.
+   * Called during the setup of the MapReduce job this {@code DoFn} is
+   * associated with. Subclasses may override this method to do appropriate
+   * initialization.
    */
   public void initialize() {
   }
 
   /**
-   * Called during the cleanup of the MapReduce job this {@code DoFn}
-   * is associated with. Subclasses may override this method to do
-   * appropriate cleanup.
+   * Called during the cleanup of the MapReduce job this {@code DoFn} is
+   * associated with. Subclasses may override this method to do appropriate
+   * cleanup.
    * 
-   * @param emitter The emitter that was used for output
+   * @param emitter
+   *          The emitter that was used for output
    */
   public void cleanup(Emitter<T> emitter) {
   }
 
   /**
-   * Called during setup to pass the {@link TaskInputOutputContext} to
-   * this {@code DoFn} instance.
+   * Called during setup to pass the {@link TaskInputOutputContext} to this
+   * {@code DoFn} instance.
    */
   public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
     this.context = context;
@@ -98,12 +102,14 @@ public abstract class DoFn<S, T> implements Serializable {
 
   /**
    * Sets a {@code Configuration} instance to be used during unit tests.
-   * @param conf The Configuration instance.
+   * 
+   * @param conf
+   *          The Configuration instance.
    */
   public void setConfigurationForTest(Configuration conf) {
     this.testConf = conf;
   }
-  
+
   /**
    * Returns an estimate of how applying this function to a {@link PCollection}
    * will cause it to change in size. The optimizer uses these estimates to
@@ -117,11 +123,11 @@ public abstract class DoFn<S, T> implements Serializable {
   public float scaleFactor() {
     return 1.2f;
   }
-  
+
   protected TaskInputOutputContext<?, ?, ?, ?> getContext() {
     return context;
   }
-  
+
   protected Configuration getConfiguration() {
     if (context != null) {
       return context.getConfiguration();
@@ -130,35 +136,35 @@ public abstract class DoFn<S, T> implements Serializable {
     }
     return null;
   }
-  
+
   protected Counter getCounter(Enum<?> counterName) {
     if (context == null) {
       return TestCounters.getCounter(counterName);
     }
     return context.getCounter(counterName);
   }
-  
+
   protected Counter getCounter(String groupName, String counterName) {
     if (context == null) {
       return TestCounters.getCounter(groupName, counterName);
     }
     return context.getCounter(groupName, counterName);
   }
-  
+
   protected void increment(Enum<?> counterName) {
     increment(counterName, 1);
   }
-  
+
   protected void increment(Enum<?> counterName, long value) {
     getCounter(counterName).increment(value);
   }
-  
+
   protected void progress() {
     if (context != null) {
       context.progress();
     }
   }
-  
+
   protected TaskAttemptID getTaskAttemptID() {
     if (context != null) {
       return context.getTaskAttemptID();
@@ -166,19 +172,19 @@ public abstract class DoFn<S, T> implements Serializable {
       return new TaskAttemptID();
     }
   }
-  
+
   protected void setStatus(String status) {
     if (context != null) {
       context.setStatus(status);
     }
     this.internalStatus = status;
   }
-  
+
   protected String getStatus() {
     if (context != null) {
       return context.getStatus();
     }
     return internalStatus;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Emitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Emitter.java b/crunch/src/main/java/org/apache/crunch/Emitter.java
index b9d4ff2..d104a09 100644
--- a/crunch/src/main/java/org/apache/crunch/Emitter.java
+++ b/crunch/src/main/java/org/apache/crunch/Emitter.java
@@ -19,19 +19,19 @@ package org.apache.crunch;
 
 /**
  * Interface for writing outputs from a {@link DoFn}.
- *
+ * 
  */
 public interface Emitter<T> {
   /**
    * Write the emitted value to the next stage of the pipeline.
    * 
-   * @param emitted The value to write
+   * @param emitted
+   *          The value to write
    */
   void emit(T emitted);
 
   /**
-   * Flushes any values cached by this emitter. Called during the
-   * cleanup stage.
+   * Flushes any values cached by this emitter. Called during the cleanup stage.
    */
   void flush();
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java
index d471710..a9612ac 100644
--- a/crunch/src/main/java/org/apache/crunch/FilterFn.java
+++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java
@@ -22,9 +22,9 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 
 /**
- * A {@link DoFn} for the common case of filtering the members of
- * a {@link PCollection} based on a boolean condition.
- *
+ * A {@link DoFn} for the common case of filtering the members of a
+ * {@link PCollection} based on a boolean condition.
+ * 
  */
 public abstract class FilterFn<T> extends DoFn<T, T> {
 
@@ -44,19 +44,19 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
   public float scaleFactor() {
     return 0.5f;
   }
-  
-  public static <S> FilterFn<S> and(FilterFn<S>...fns) {
+
+  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
     return new AndFn<S>(fns);
   }
-  
+
   public static class AndFn<S> extends FilterFn<S> {
-    
+
     private final List<FilterFn<S>> fns;
-    
+
     public AndFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
+      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
     }
-    
+
     @Override
     public boolean accept(S input) {
       for (FilterFn<S> fn : fns) {
@@ -66,7 +66,7 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
       }
       return true;
     }
-    
+
     @Override
     public float scaleFactor() {
       float scaleFactor = 1.0f;
@@ -74,21 +74,21 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
         scaleFactor *= fn.scaleFactor();
       }
       return scaleFactor;
-    }    
+    }
   }
-  
-  public static <S> FilterFn<S> or(FilterFn<S>...fns) {
+
+  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
     return new OrFn<S>(fns);
   }
-  
+
   public static class OrFn<S> extends FilterFn<S> {
-    
+
     private final List<FilterFn<S>> fns;
-    
+
     public OrFn(FilterFn<S>... fns) {
-      this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
+      this.fns = ImmutableList.<FilterFn<S>> copyOf(fns);
     }
-    
+
     @Override
     public boolean accept(S input) {
       for (FilterFn<S> fn : fns) {
@@ -98,7 +98,7 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
       }
       return false;
     }
-    
+
     @Override
     public float scaleFactor() {
       float scaleFactor = 0.0f;
@@ -106,26 +106,26 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
         scaleFactor += fn.scaleFactor();
       }
       return Math.min(1.0f, scaleFactor);
-    }    
+    }
   }
-  
+
   public static <S> FilterFn<S> not(FilterFn<S> fn) {
     return new NotFn<S>(fn);
   }
-  
+
   public static class NotFn<S> extends FilterFn<S> {
-    
+
     private final FilterFn<S> base;
-    
+
     public NotFn(FilterFn<S> base) {
       this.base = base;
     }
-    
+
     @Override
     public boolean accept(S input) {
       return !base.accept(input);
     }
-    
+
     @Override
     public float scaleFactor() {
       return 1.0f - base.scaleFactor();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
index d6cd12e..e58b666 100644
--- a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
+++ b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -22,10 +22,10 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 
 /**
- * Options that can be passed to a {@code groupByKey} operation in order to exercise
- * finer control over how the partitioning, grouping, and sorting of keys is
- * performed.
- *
+ * Options that can be passed to a {@code groupByKey} operation in order to
+ * exercise finer control over how the partitioning, grouping, and sorting of
+ * keys is performed.
+ * 
  */
 public class GroupingOptions {
 
@@ -35,8 +35,8 @@ public class GroupingOptions {
   private final int numReducers;
 
   private GroupingOptions(Class<? extends Partitioner> partitionerClass,
-      Class<? extends RawComparator> groupingComparatorClass,
-      Class<? extends RawComparator> sortComparatorClass, int numReducers) {
+      Class<? extends RawComparator> groupingComparatorClass, Class<? extends RawComparator> sortComparatorClass,
+      int numReducers) {
     this.partitionerClass = partitionerClass;
     this.groupingComparatorClass = groupingComparatorClass;
     this.sortComparatorClass = sortComparatorClass;
@@ -46,11 +46,11 @@ public class GroupingOptions {
   public int getNumReducers() {
     return numReducers;
   }
-  
+
   public Class<? extends RawComparator> getSortComparatorClass() {
     return sortComparatorClass;
   }
-  
+
   public void configure(Job job) {
     if (partitionerClass != null) {
       job.setPartitionerClass(partitionerClass);
@@ -85,7 +85,7 @@ public class GroupingOptions {
 
   /**
    * Builder class for creating {@code GroupingOptions} instances.
-   *
+   * 
    */
   public static class Builder {
     private Class<? extends Partitioner> partitionerClass;
@@ -96,20 +96,17 @@ public class GroupingOptions {
     public Builder() {
     }
 
-    public Builder partitionerClass(
-        Class<? extends Partitioner> partitionerClass) {
+    public Builder partitionerClass(Class<? extends Partitioner> partitionerClass) {
       this.partitionerClass = partitionerClass;
       return this;
     }
 
-    public Builder groupingComparatorClass(
-        Class<? extends RawComparator> groupingComparatorClass) {
+    public Builder groupingComparatorClass(Class<? extends RawComparator> groupingComparatorClass) {
       this.groupingComparatorClass = groupingComparatorClass;
       return this;
     }
 
-    public Builder sortComparatorClass(
-        Class<? extends RawComparator> sortComparatorClass) {
+    public Builder sortComparatorClass(Class<? extends RawComparator> sortComparatorClass) {
       this.sortComparatorClass = sortComparatorClass;
       return this;
     }
@@ -123,8 +120,7 @@ public class GroupingOptions {
     }
 
     public GroupingOptions build() {
-      return new GroupingOptions(partitionerClass, groupingComparatorClass,
-          sortComparatorClass, numReducers);
+      return new GroupingOptions(partitionerClass, groupingComparatorClass, sortComparatorClass, numReducers);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/MapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/MapFn.java b/crunch/src/main/java/org/apache/crunch/MapFn.java
index b1c5520..dbf172e 100644
--- a/crunch/src/main/java/org/apache/crunch/MapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/MapFn.java
@@ -18,12 +18,12 @@
 package org.apache.crunch;
 
 /**
- * A {@link DoFn} for the common case of emitting exactly one value
- * for each input record.
- *
+ * A {@link DoFn} for the common case of emitting exactly one value for each
+ * input record.
+ * 
  */
 public abstract class MapFn<S, T> extends DoFn<S, T> {
-  
+
   /**
    * Maps the given input into an instance of the output type.
    */

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index cd25bdd..4a3f043 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -22,19 +22,19 @@ import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 
 /**
- * A representation of an immutable, distributed collection of elements
- * that is the fundamental target of computations in Crunch.
- *
+ * A representation of an immutable, distributed collection of elements that is
+ * the fundamental target of computations in Crunch.
+ * 
  */
 public interface PCollection<S> {
   /**
    * Returns the {@code Pipeline} associated with this PCollection.
    */
   Pipeline getPipeline();
-  
+
   /**
-   * Returns a {@code PCollection} instance that acts as the union
-   * of this {@code PCollection} and the input {@code PCollection}s.
+   * Returns a {@code PCollection} instance that acts as the union of this
+   * {@code PCollection} and the input {@code PCollection}s.
    */
   PCollection<S> union(PCollection<S>... collections);
 
@@ -42,19 +42,24 @@ public interface PCollection<S> {
    * Applies the given doFn to the elements of this {@code PCollection} and
    * returns a new {@code PCollection} that is the output of this processing.
    * 
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PType} of the resulting {@code PCollection}
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PType} of the resulting {@code PCollection}
    * @return a new {@code PCollection}
    */
   <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T> type);
-  
+
   /**
    * Applies the given doFn to the elements of this {@code PCollection} and
    * returns a new {@code PCollection} that is the output of this processing.
    * 
-   * @param name An identifier for this processing step, useful for debugging
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PType} of the resulting {@code PCollection}
+   * @param name
+   *          An identifier for this processing step, useful for debugging
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PType} of the resulting {@code PCollection}
    * @return a new {@code PCollection}
    */
   <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T> type);
@@ -63,38 +68,43 @@ public interface PCollection<S> {
    * Similar to the other {@code parallelDo} instance, but returns a
    * {@code PTable} instance instead of a {@code PCollection}.
    * 
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PTableType} of the resulting {@code PTable}
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PTableType} of the resulting {@code PTable}
    * @return a new {@code PTable}
    */
   <K, V> PTable<K, V> parallelDo(DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
-  
+
   /**
    * Similar to the other {@code parallelDo} instance, but returns a
    * {@code PTable} instance instead of a {@code PCollection}.
    * 
-   * @param name An identifier for this processing step
-   * @param doFn The {@code DoFn} to apply
-   * @param type The {@link PTableType} of the resulting {@code PTable}
+   * @param name
+   *          An identifier for this processing step
+   * @param doFn
+   *          The {@code DoFn} to apply
+   * @param type
+   *          The {@link PTableType} of the resulting {@code PTable}
    * @return a new {@code PTable}
    */
-  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn,
-      PTableType<K, V> type);
+  <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type);
 
   /**
    * Write the contents of this {@code PCollection} to the given {@code Target},
    * using the storage format specified by the target.
    * 
-   * @param target The target to write to
+   * @param target
+   *          The target to write to
    */
   PCollection<S> write(Target target);
-  
+
   /**
    * Returns a reference to the data set represented by this PCollection that
    * may be used by the client to read the data locally.
    */
   Iterable<S> materialize();
-  
+
   /**
    * Returns the {@code PType} of this {@code PCollection}.
    */
@@ -106,7 +116,8 @@ public interface PCollection<S> {
   PTypeFamily getTypeFamily();
 
   /**
-   * Returns the size of the data represented by this {@code PCollection} in bytes.
+   * Returns the size of the data represented by this {@code PCollection} in
+   * bytes.
    */
   long getSize();
 
@@ -114,67 +125,71 @@ public interface PCollection<S> {
    * Returns a shorthand name for this PCollection.
    */
   String getName();
-  
+
   /**
-   * Apply the given filter function to this instance and return the
-   * resulting {@code PCollection}.
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PCollection}.
    */
   PCollection<S> filter(FilterFn<S> filterFn);
-  
+
   /**
-   * Apply the given filter function to this instance and return the
-   * resulting {@code PCollection}.
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PCollection}.
    * 
-   * @param name An identifier for this processing step
-   * @param filterFn The {@code FilterFn} to apply
+   * @param name
+   *          An identifier for this processing step
+   * @param filterFn
+   *          The {@code FilterFn} to apply
    */
   PCollection<S> filter(String name, FilterFn<S> filterFn);
-  
+
   /**
-   * Apply the given map function to each element of this instance in order
-   * to create a {@code PTable}.
+   * Apply the given map function to each element of this instance in order to
+   * create a {@code PTable}.
    */
   <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
- 
+
   /**
-   * Apply the given map function to each element of this instance in order
-   * to create a {@code PTable}.
-   *   
-   * @param name An identifier for this processing step
-   * @param extractKeyFn The {@code MapFn} to apply
+   * Apply the given map function to each element of this instance in order to
+   * create a {@code PTable}.
+   * 
+   * @param name
+   *          An identifier for this processing step
+   * @param extractKeyFn
+   *          The {@code MapFn} to apply
    */
   <K> PTable<K, S> by(String name, MapFn<S, K> extractKeyFn, PType<K> keyType);
-  
+
   /**
-   * Returns a {@code PCollection} instance that contains all of the elements
-   * of this instance in sorted order.
+   * Returns a {@code PCollection} instance that contains all of the elements of
+   * this instance in sorted order.
    */
   PCollection<S> sort(boolean ascending);
-  
+
   /**
    * Returns a {@code PTable} instance that contains the counts of each unique
    * element of this PCollection.
    */
   PTable<S, Long> count();
-  
+
   /**
    * Returns a {@code PCollection} made up of only the maximum element of this
    * instance.
    */
   PCollection<S> max();
-  
+
   /**
    * Returns a {@code PCollection} made up of only the minimum element of this
    * instance.
    */
   PCollection<S> min();
-  
+
   /**
    * Randomly sample items from this PCollection instance with the given
    * probability of an item being accepted.
    */
   PCollection<S> sample(double acceptanceProbability);
-  
+
   /**
    * Randomly sample items from this PCollection instance with the given
    * probability of an item being accepted and using the given seed.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
index 96918a5..e727b70 100644
--- a/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -19,13 +19,14 @@ package org.apache.crunch;
 
 /**
  * The Crunch representation of a grouped {@link PTable}.
- *
+ * 
  */
 public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
   /**
    * Combines the values of this grouping using the given {@code CombineFn}.
    * 
-   * @param combineFn The combiner function
+   * @param combineFn
+   *          The combiner function
    * @return A {@code PTable} where each key has a single value
    */
   PTable<K, V> combineValues(CombineFn<K, V> combineFn);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java
index b673f6d..cd10e46 100644
--- a/crunch/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch/src/main/java/org/apache/crunch/PTable.java
@@ -26,17 +26,18 @@ import org.apache.crunch.types.PType;
 /**
  * A sub-interface of {@code PCollection} that represents an immutable,
  * distributed multi-map of keys and values.
- *
+ * 
  */
 public interface PTable<K, V> extends PCollection<Pair<K, V>> {
   /**
-   * Returns a {@code PTable} instance that acts as the union
-   * of this {@code PTable} and the input {@code PTable}s.
+   * Returns a {@code PTable} instance that acts as the union of this
+   * {@code PTable} and the input {@code PTable}s.
    */
   PTable<K, V> union(PTable<K, V>... others);
 
   /**
    * Performs a grouping operation on the keys of this table.
+   * 
    * @return a {@code PGroupedTable} instance that represents the grouping
    */
   PGroupedTable<K, V> groupByKey();
@@ -45,17 +46,18 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
    * Performs a grouping operation on the keys of this table, using the given
    * number of partitions.
    * 
-   * @param numPartitions The number of partitions for the data.
+   * @param numPartitions
+   *          The number of partitions for the data.
    * @return a {@code PGroupedTable} instance that represents this grouping
    */
   PGroupedTable<K, V> groupByKey(int numPartitions);
-  
+
   /**
    * Performs a grouping operation on the keys of this table, using the
-   * additional {@code GroupingOptions} to control how the grouping is
-   * executed.
+   * additional {@code GroupingOptions} to control how the grouping is executed.
    * 
-   * @param options The grouping options to use
+   * @param options
+   *          The grouping options to use
    * @return a {@code PGroupedTable} instance that represents the grouping
    */
   PGroupedTable<K, V> groupByKey(GroupingOptions options);
@@ -64,12 +66,12 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
    * Writes this {@code PTable} to the given {@code Target}.
    */
   PTable<K, V> write(Target target);
-  
+
   /**
    * Returns the {@code PTableType} of this {@code PTable}.
    */
   PTableType<K, V> getPTableType();
-  
+
   /**
    * Returns the {@code PType} of the key.
    */
@@ -79,33 +81,37 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
    * Returns the {@code PType} of the value.
    */
   PType<V> getValueType();
-  
+
   /**
-   * Aggregate all of the values with the same key into a single
-   * key-value pair in the returned PTable.
+   * Aggregate all of the values with the same key into a single key-value pair
+   * in the returned PTable.
    */
   PTable<K, Collection<V>> collectValues();
-  
+
   /**
-   * Returns a PTable made up of the pairs in this PTable with the
-   * largest value field.
-   * @param count The number of pairs to return
+   * Returns a PTable made up of the pairs in this PTable with the largest value
+   * field.
+   * 
+   * @param count
+   *          The number of pairs to return
    */
   PTable<K, V> top(int count);
-  
+
   /**
-   * Returns a PTable made up of the pairs in this PTable with the
-   * smallest value field.
-   * @param count The number of pairs to return
+   * Returns a PTable made up of the pairs in this PTable with the smallest
+   * value field.
+   * 
+   * @param count
+   *          The number of pairs to return
    */
   PTable<K, V> bottom(int count);
-  
+
   /**
-   * Perform an inner join on this table and the one passed in as
-   * an argument on their common keys.
+   * Perform an inner join on this table and the one passed in as an argument on
+   * their common keys.
    */
   <U> PTable<K, Pair<V, U>> join(PTable<K, U> other);
-  
+
   /**
    * Co-group operation with the given table on common keys.
    */
@@ -113,14 +119,14 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Returns a {@link PCollection} made up of the keys in this PTable.
-   */  
+   */
   PCollection<K> keys();
-  
+
   /**
    * Returns a {@link PCollection} made up of the values in this PTable.
    */
   PCollection<V> values();
-  
+
   /**
    * Returns a {@code Map<K, V>} made up of the keys and values in this PTable.
    * <p>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Pair.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pair.java b/crunch/src/main/java/org/apache/crunch/Pair.java
index 9d319ab..fd058b6 100644
--- a/crunch/src/main/java/org/apache/crunch/Pair.java
+++ b/crunch/src/main/java/org/apache/crunch/Pair.java
@@ -58,7 +58,7 @@ public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
   public int size() {
     return 2;
   }
-  
+
   @Override
   public int hashCode() {
     HashCodeBuilder hcb = new HashCodeBuilder();
@@ -74,15 +74,15 @@ public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
     if (getClass() != obj.getClass())
       return false;
     Pair<?, ?> other = (Pair<?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second)));
+    return (first == other.first || (first != null && first.equals(other.first)))
+        && (second == other.second || (second != null && second.equals(other.second)));
   }
 
   @Override
   public String toString() {
-	StringBuilder sb = new StringBuilder("[");
-	sb.append(first).append(",").append(second).append("]");
-	return sb.toString();
+    StringBuilder sb = new StringBuilder("[");
+    sb.append(first).append(",").append(second).append("]");
+    return sb.toString();
   }
 
   private int cmp(Object lhs, Object rhs) {
@@ -93,8 +93,8 @@ public class Pair<K, V> implements Tuple, Comparable<Pair<K, V>> {
     }
     return (lhs == null ? 0 : lhs.hashCode()) - (rhs == null ? 0 : rhs.hashCode());
   }
-  
-  @Override  
+
+  @Override
   public int compareTo(Pair<K, V> o) {
     int diff = cmp(first, o.first);
     if (diff == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
index fd60695..bcf8727 100644
--- a/crunch/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java
@@ -24,68 +24,74 @@ import org.apache.hadoop.conf.Configuration;
  * 
  */
 public interface Pipeline {
-  
+
   /**
    * Set the {@code Configuration} to use with this pipeline.
    */
   void setConfiguration(Configuration conf);
-  
+
   /**
    * Returns the name of this pipeline.
+   * 
    * @return Name of the pipeline
    */
   String getName();
-  
+
   /**
    * Returns the {@code Configuration} instance associated with this pipeline.
    */
   Configuration getConfiguration();
-  
+
   /**
    * Converts the given {@code Source} into a {@code PCollection} that is
    * available to jobs run using this {@code Pipeline} instance.
    * 
-   * @param source The source of data
+   * @param source
+   *          The source of data
    * @return A PCollection that references the given source
    */
   <T> PCollection<T> read(Source<T> source);
 
   /**
-   * A version of the read method for {@code TableSource} instances that
-   * map to {@code PTable}s.
-   * @param tableSource The source of the data
+   * A version of the read method for {@code TableSource} instances that map to
+   * {@code PTable}s.
+   * 
+   * @param tableSource
+   *          The source of the data
    * @return A PTable that references the given source
    */
   <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
-  
+
   /**
-   * Write the given collection to the given target on the next
-   * pipeline run.
+   * Write the given collection to the given target on the next pipeline run.
    * 
-   * @param collection The collection
-   * @param target The output target
+   * @param collection
+   *          The collection
+   * @param target
+   *          The output target
    */
   void write(PCollection<?> collection, Target target);
 
   /**
-   * Create the given PCollection and read the data it contains
-   * into the returned Collection instance for client use.
-   *
-   * @param pcollection The PCollection to materialize
+   * Create the given PCollection and read the data it contains into the
+   * returned Collection instance for client use.
+   * 
+   * @param pcollection
+   *          The PCollection to materialize
    * @return the data from the PCollection as a read-only Collection
    */
   <T> Iterable<T> materialize(PCollection<T> pcollection);
-  
+
   /**
-   * Constructs and executes a series of MapReduce jobs in order
-   * to write data to the output targets.
+   * Constructs and executes a series of MapReduce jobs in order to write data
+   * to the output targets.
    */
   PipelineResult run();
 
   /**
-   * Run any remaining jobs required to generate outputs and then
-   * clean up any intermediate data files that were created in
-   * this run or previous calls to {@code run}.
+   * Run any remaining jobs required to generate outputs and then clean up any
+   * intermediate data files that were created in this run or previous calls to
+   * {@code run}.
    */
   PipelineResult done();
 
@@ -98,7 +104,7 @@ public interface Pipeline {
    * A convenience method for writing a text file.
    */
   <T> void writeTextFile(PCollection<T> collection, String pathName);
-  
+
   /**
    * Turn on debug logging for jobs that are run from this pipeline.
    */

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineResult.java b/crunch/src/main/java/org/apache/crunch/PipelineResult.java
index 6f97fe8..90b1067 100644
--- a/crunch/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch/src/main/java/org/apache/crunch/PipelineResult.java
@@ -25,50 +25,51 @@ import org.apache.hadoop.mapreduce.Counters;
 import com.google.common.collect.ImmutableList;
 
 /**
- * Container for the results of a call to {@code run} or {@code done} on the Pipeline interface that includes
- * details and statistics about the component stages of the data pipeline.
+ * Container for the results of a call to {@code run} or {@code done} on the
+ * Pipeline interface that includes details and statistics about the component
+ * stages of the data pipeline.
  */
 public class PipelineResult {
 
   public static class StageResult {
-    
+
     private final String stageName;
     private final Counters counters;
-    
+
     public StageResult(String stageName, Counters counters) {
       this.stageName = stageName;
       this.counters = counters;
     }
-    
+
     public String getStageName() {
       return stageName;
     }
-    
+
     public Counters getCounters() {
       return counters;
     }
-    
+
     public Counter findCounter(Enum<?> key) {
       return counters.findCounter(key);
     }
-    
+
     public long getCounterValue(Enum<?> key) {
       return findCounter(key).getValue();
     }
   }
-  
-  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult>of());
-  
+
+  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult> of());
+
   private final List<StageResult> stageResults;
-  
+
   public PipelineResult(List<StageResult> stageResults) {
     this.stageResults = ImmutableList.copyOf(stageResults);
   }
-  
+
   public boolean succeeded() {
     return !stageResults.isEmpty();
   }
-  
+
   public List<StageResult> getStageResults() {
     return stageResults;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Source.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Source.java b/crunch/src/main/java/org/apache/crunch/Source.java
index a77a656..f54d135 100644
--- a/crunch/src/main/java/org/apache/crunch/Source.java
+++ b/crunch/src/main/java/org/apache/crunch/Source.java
@@ -19,17 +19,16 @@ package org.apache.crunch;
 
 import java.io.IOException;
 
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 
-import org.apache.crunch.types.PType;
-
 /**
- * A {@code Source} represents an input data set that is an input to one
- * or more MapReduce jobs.
- *
+ * A {@code Source} represents an input data set that is an input to one or more
+ * MapReduce jobs.
+ * 
  */
-public interface Source<T> { 
+public interface Source<T> {
   /**
    * Returns the {@code PType} for this source.
    */
@@ -38,8 +37,10 @@ public interface Source<T> {
   /**
    * Configure the given job to use this source as an input.
    * 
-   * @param job The job to configure
-   * @param inputId For a multi-input job, an identifier for this input to the job
+   * @param job
+   *          The job to configure
+   * @param inputId
+   *          For a multi-input job, an identifier for this input to the job
    * @throws IOException
    */
   void configureSource(Job job, int inputId) throws IOException;
@@ -47,5 +48,5 @@ public interface Source<T> {
   /**
    * Returns the number of bytes in this {@code Source}.
    */
-  long getSize(Configuration configuration);  
+  long getSize(Configuration configuration);
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/SourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/SourceTarget.java b/crunch/src/main/java/org/apache/crunch/SourceTarget.java
index 3d06392..5d0cddc 100644
--- a/crunch/src/main/java/org/apache/crunch/SourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/SourceTarget.java
@@ -17,11 +17,10 @@
  */
 package org.apache.crunch;
 
-
 /**
- * An interface for classes that implement both the {@code Source} and
- * the {@code Target} interfaces.
- *
+ * An interface for classes that implement both the {@code Source} and the
+ * {@code Target} interfaces.
+ * 
  */
 public interface SourceTarget<T> extends Source<T>, Target {
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/TableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TableSource.java b/crunch/src/main/java/org/apache/crunch/TableSource.java
index 9ebade6..ff27346 100644
--- a/crunch/src/main/java/org/apache/crunch/TableSource.java
+++ b/crunch/src/main/java/org/apache/crunch/TableSource.java
@@ -21,7 +21,7 @@ import org.apache.crunch.types.PTableType;
 
 /**
  * The interface {@code Source} implementations that return a {@link PTable}.
- *
+ * 
  */
 public interface TableSource<K, V> extends Source<Pair<K, V>> {
   PTableType<K, V> getTableType();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Target.java b/crunch/src/main/java/org/apache/crunch/Target.java
index 75de874..ea6fd9d 100644
--- a/crunch/src/main/java/org/apache/crunch/Target.java
+++ b/crunch/src/main/java/org/apache/crunch/Target.java
@@ -22,10 +22,10 @@ import org.apache.crunch.types.PType;
 
 /**
  * A {@code Target} represents the output destination of a Crunch job.
- *
+ * 
  */
 public interface Target {
   boolean accept(OutputHandler handler, PType<?> ptype);
-  
+
   <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Tuple.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple.java b/crunch/src/main/java/org/apache/crunch/Tuple.java
index 38ce188..4e602ff 100644
--- a/crunch/src/main/java/org/apache/crunch/Tuple.java
+++ b/crunch/src/main/java/org/apache/crunch/Tuple.java
@@ -18,9 +18,9 @@
 package org.apache.crunch;
 
 /**
- * A fixed-size collection of Objects, used in Crunch for representing
- * joins between {@code PCollection}s.
- *
+ * A fixed-size collection of Objects, used in Crunch for representing joins
+ * between {@code PCollection}s.
+ * 
  */
 public interface Tuple {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple3.java b/crunch/src/main/java/org/apache/crunch/Tuple3.java
index d06f17c..4372811 100644
--- a/crunch/src/main/java/org/apache/crunch/Tuple3.java
+++ b/crunch/src/main/java/org/apache/crunch/Tuple3.java
@@ -31,7 +31,7 @@ public class Tuple3<V1, V2, V3> implements Tuple {
   public static <A, B, C> Tuple3<A, B, C> of(A a, B b, C c) {
     return new Tuple3<A, B, C>(a, b, c);
   }
-  
+
   public Tuple3(V1 first, V2 second, V3 third) {
     this.first = first;
     this.second = second;
@@ -66,7 +66,7 @@ public class Tuple3<V1, V2, V3> implements Tuple {
   public int size() {
     return 3;
   }
-  
+
   @Override
   public int hashCode() {
     HashCodeBuilder hcb = new HashCodeBuilder();
@@ -82,15 +82,15 @@ public class Tuple3<V1, V2, V3> implements Tuple {
     if (getClass() != obj.getClass())
       return false;
     Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second))) &&
-    	(third == other.third || (third != null && third.equals(other.third)));
+    return (first == other.first || (first != null && first.equals(other.first)))
+        && (second == other.second || (second != null && second.equals(other.second)))
+        && (third == other.third || (third != null && third.equals(other.third)));
   }
 
   @Override
   public String toString() {
-	StringBuilder sb = new StringBuilder("Tuple3[");
-	sb.append(first).append(",").append(second).append(",").append(third);
-	return sb.append("]").toString();
+    StringBuilder sb = new StringBuilder("Tuple3[");
+    sb.append(first).append(",").append(second).append(",").append(third);
+    return sb.append("]").toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Tuple4.java b/crunch/src/main/java/org/apache/crunch/Tuple4.java
index a30cbb1..f161371 100644
--- a/crunch/src/main/java/org/apache/crunch/Tuple4.java
+++ b/crunch/src/main/java/org/apache/crunch/Tuple4.java
@@ -32,7 +32,7 @@ public class Tuple4<V1, V2, V3, V4> implements Tuple {
   public static <A, B, C, D> Tuple4<A, B, C, D> of(A a, B b, C c, D d) {
     return new Tuple4<A, B, C, D>(a, b, c, d);
   }
-  
+
   public Tuple4(V1 first, V2 second, V3 third, V4 fourth) {
     this.first = first;
     this.second = second;
@@ -74,12 +74,11 @@ public class Tuple4<V1, V2, V3, V4> implements Tuple {
   public int size() {
     return 4;
   }
-  
+
   @Override
   public int hashCode() {
     HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(first).append(second).append(third)
-    	.append(fourth).toHashCode();
+    return hcb.append(first).append(second).append(third).append(fourth).toHashCode();
   }
 
   @Override
@@ -91,16 +90,16 @@ public class Tuple4<V1, V2, V3, V4> implements Tuple {
     if (getClass() != obj.getClass())
       return false;
     Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj;
-    return (first == other.first || (first != null && first.equals(other.first))) &&
-    	(second == other.second || (second != null && second.equals(other.second))) &&
-    	(third == other.third || (third != null && third.equals(other.third))) &&
-    	(fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
+    return (first == other.first || (first != null && first.equals(other.first)))
+        && (second == other.second || (second != null && second.equals(other.second)))
+        && (third == other.third || (third != null && third.equals(other.third)))
+        && (fourth == other.fourth || (fourth != null && fourth.equals(other.fourth)));
   }
 
   @Override
   public String toString() {
-	StringBuilder sb = new StringBuilder("Tuple4[");
-	sb.append(first).append(",").append(second).append(",").append(third);
-	return sb.append(",").append(fourth).append("]").toString();
+    StringBuilder sb = new StringBuilder("Tuple4[");
+    sb.append(first).append(",").append(second).append(",").append(third);
+    return sb.append(",").append(fourth).append("]").toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/TupleN.java b/crunch/src/main/java/org/apache/crunch/TupleN.java
index 8e7524c..8c77994 100644
--- a/crunch/src/main/java/org/apache/crunch/TupleN.java
+++ b/crunch/src/main/java/org/apache/crunch/TupleN.java
@@ -40,16 +40,16 @@ public class TupleN implements Tuple {
   public int size() {
     return values.length;
   }
-  
+
   @Override
   public int hashCode() {
-  	HashCodeBuilder hcb = new HashCodeBuilder();
-  	for (Object v : values) {
-  	  hcb.append(v);
-  	}
-  	return hcb.toHashCode();
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    for (Object v : values) {
+      hcb.append(v);
+    }
+    return hcb.toHashCode();
   }
-  
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj)

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
index f30801d..a41daf4 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
@@ -17,40 +17,39 @@
  */
 package org.apache.crunch.fn;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.Emitter;
 import org.apache.crunch.MapFn;
+import org.apache.hadoop.conf.Configuration;
 
 public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
-  
+
   private final MapFn<R, S> first;
   private final MapFn<S, T> second;
-  
+
   public CompositeMapFn(MapFn<R, S> first, MapFn<S, T> second) {
     this.first = first;
     this.second = second;
   }
-  
+
   @Override
   public void initialize() {
     first.setContext(getContext());
     second.setContext(getContext());
   }
-  
+
   public MapFn<R, S> getFirst() {
     return first;
   }
-  
+
   public MapFn<S, T> getSecond() {
     return second;
   }
-  
+
   @Override
   public T map(R input) {
     return second.map(first.map(input));
   }
-  
+
   @Override
   public void cleanup(Emitter<T> emitter) {
     first.cleanup(null);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
index f5056c2..99ce277 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
@@ -21,22 +21,22 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 
 /**
- * Wrapper function for converting a {@code MapFn} into a key-value pair that
- * is used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
+ * Wrapper function for converting a {@code MapFn} into a key-value pair that is
+ * used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
  */
 public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
-  
+
   private final MapFn<V, K> mapFn;
-  
+
   public ExtractKeyFn(MapFn<V, K> mapFn) {
     this.mapFn = mapFn;
   }
-  
+
   @Override
   public void initialize() {
     this.mapFn.setContext(getContext());
   }
-  
+
   @Override
   public Pair<K, V> map(V input) {
     return Pair.of(mapFn.map(input), input);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java b/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
index 3d0abc5..0eadb06 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/IdentityFn.java
@@ -20,7 +20,7 @@ package org.apache.crunch.fn;
 import org.apache.crunch.MapFn;
 
 public class IdentityFn<T> extends MapFn<T, T> {
-  
+
   private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>();
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
index 45ddf52..cbaf24d 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/MapKeysFn.java
@@ -22,7 +22,7 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 
 public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> {
-  
+
   @Override
   public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) {
     emitter.emit(Pair.of(map(input.first()), input.second()));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java b/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
index 3c7065e..b90f5ff 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/MapValuesFn.java
@@ -22,7 +22,7 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 
 public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> {
-  
+
   @Override
   public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) {
     emitter.emit(Pair.of(input.first(), map(input.second())));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
index 2cfd17b..63634ef 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
@@ -17,14 +17,13 @@
  */
 package org.apache.crunch.fn;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.Emitter;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
+import org.apache.hadoop.conf.Configuration;
 
 public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
-  
+
   private MapFn<K, S> keys;
   private MapFn<V, T> values;
 
@@ -38,7 +37,7 @@ public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
     keys.configure(conf);
     values.configure(conf);
   }
-  
+
   @Override
   public void initialize() {
     keys.setContext(getContext());
@@ -49,7 +48,7 @@ public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
   public Pair<S, T> map(Pair<K, V> input) {
     return Pair.of(keys.map(input.first()), values.map(input.second()));
   }
-  
+
   @Override
   public void cleanup(Emitter<Pair<S, T>> emitter) {
     keys.cleanup(null);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 6305fcb..767524c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -22,12 +22,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
@@ -43,35 +37,40 @@ import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 public class MemPipeline implements Pipeline {
 
   private static final Log LOG = LogFactory.getLog(MemPipeline.class);
-  
+
   private static final MemPipeline INSTANCE = new MemPipeline();
-  
+
   public static Pipeline getInstance() {
     return INSTANCE;
   }
-  
-  public static <T> PCollection<T> collectionOf(T...ts) {
-    return new MemCollection<T>(ImmutableList.copyOf(ts));  
+
+  public static <T> PCollection<T> collectionOf(T... ts) {
+    return new MemCollection<T>(ImmutableList.copyOf(ts));
   }
-  
+
   public static <T> PCollection<T> collectionOf(Iterable<T> collect) {
     return new MemCollection<T>(collect);
   }
-  
+
   public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, T... ts) {
-    return new MemCollection<T>(ImmutableList.copyOf(ts), ptype, null);  
+    return new MemCollection<T>(ImmutableList.copyOf(ts), ptype, null);
   }
-  
+
   public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, Iterable<T> collect) {
-    return new MemCollection<T>(collect, ptype, null);  
+    return new MemCollection<T>(collect, ptype, null);
   }
-  
+
   public static <S, T> PTable<S, T> tableOf(S s, T t, Object... more) {
     List<Pair<S, T>> pairs = Lists.newArrayList();
     pairs.add(Pair.of(s, t));
@@ -80,7 +79,7 @@ public class MemPipeline implements Pipeline {
     }
     return new MemTable<S, T>(pairs);
   }
-  
+
   public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, S s, T t, Object... more) {
     List<Pair<S, T>> pairs = Lists.newArrayList();
     pairs.add(Pair.of(s, t));
@@ -89,20 +88,20 @@ public class MemPipeline implements Pipeline {
     }
     return new MemTable<S, T>(pairs, ptype, null);
   }
-  
+
   public static <S, T> PTable<S, T> tableOf(Iterable<Pair<S, T>> pairs) {
     return new MemTable<S, T>(pairs);
   }
-  
+
   public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, Iterable<Pair<S, T>> pairs) {
     return new MemTable<S, T>(pairs, ptype, null);
   }
-  
+
   private Configuration conf = new Configuration();
 
   private MemPipeline() {
   }
-  
+
   @Override
   public void setConfiguration(Configuration conf) {
     this.conf = conf;
@@ -199,11 +198,11 @@ public class MemPipeline implements Pipeline {
 
   @Override
   public void enableDebug() {
-	LOG.info("Note: in-memory pipelines do not have debug logging");
+    LOG.info("Note: in-memory pipelines do not have debug logging");
   }
-  
+
   @Override
   public String getName() {
-	  return "Memory Pipeline";
+    return "Memory Pipeline";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 7291e50..76c0c7e 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -36,30 +36,30 @@ import org.apache.crunch.test.InMemoryEmitter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-
 public class MemCollection<S> implements PCollection<S> {
 
   private final Collection<S> collect;
   private final PType<S> ptype;
   private String name;
-  
+
   public MemCollection(Iterable<S> collect) {
     this(collect, null, null);
   }
-  
+
   public MemCollection(Iterable<S> collect, PType<S> ptype) {
     this(collect, ptype, null);
   }
-  
+
   public MemCollection(Iterable<S> collect, PType<S> ptype, String name) {
     this.collect = ImmutableList.copyOf(collect);
     this.ptype = ptype;
     this.name = name;
   }
-  
+
   @Override
   public Pipeline getPipeline() {
     return MemPipeline.getInstance();
@@ -67,7 +67,7 @@ public class MemCollection<S> implements PCollection<S> {
 
   @Override
   public PCollection<S> union(PCollection<S>... collections) {
-    Collection<S> output = Lists.newArrayList();    
+    Collection<S> output = Lists.newArrayList();
     for (PCollection<S> pcollect : collections) {
       for (S s : pcollect.materialize()) {
         output.add(s);
@@ -99,8 +99,7 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
-  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn,
-      PTableType<K, V> type) {
+  public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type) {
     InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
     doFn.initialize();
     for (S s : collect) {
@@ -124,7 +123,7 @@ public class MemCollection<S> implements PCollection<S> {
   public Collection<S> getCollection() {
     return collect;
   }
-  
+
   @Override
   public PType<S> getPType() {
     return ptype;
@@ -147,7 +146,7 @@ public class MemCollection<S> implements PCollection<S> {
   public String getName() {
     return name;
   }
-  
+
   @Override
   public String toString() {
     return collect.toString();
@@ -155,44 +154,44 @@ public class MemCollection<S> implements PCollection<S> {
 
   @Override
   public PTable<S, Long> count() {
-	return Aggregate.count(this);
+    return Aggregate.count(this);
   }
 
   @Override
   public PCollection<S> sample(double acceptanceProbability) {
-	return Sample.sample(this, acceptanceProbability);
+    return Sample.sample(this, acceptanceProbability);
   }
 
   @Override
   public PCollection<S> sample(double acceptanceProbability, long seed) {
-	return Sample.sample(this, seed, acceptanceProbability);
+    return Sample.sample(this, seed, acceptanceProbability);
   }
 
   @Override
   public PCollection<S> max() {
-	return Aggregate.max(this);
+    return Aggregate.max(this);
   }
 
   @Override
   public PCollection<S> min() {
-	return Aggregate.min(this);
+    return Aggregate.min(this);
   }
 
   @Override
   public PCollection<S> sort(boolean ascending) {
-	return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
+    return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
   }
 
   @Override
   public PCollection<S> filter(FilterFn<S> filterFn) {
     return parallelDo(filterFn, getPType());
   }
-  
+
   @Override
   public PCollection<S> filter(String name, FilterFn<S> filterFn) {
     return parallelDo(name, filterFn, getPType());
   }
-  
+
   @Override
   public <K> PTable<K, S> by(MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));


Mime
View raw message