incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [7/10] Format all sources according to formatting profile
Date Sat, 14 Jul 2012 18:14:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sort.java b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
index 5a3f8e9..a8c5760 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sort.java
@@ -26,15 +26,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.reflect.ReflectData;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
@@ -52,6 +43,15 @@ import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -62,35 +62,38 @@ import com.google.common.collect.Lists;
  * Utilities for sorting {@code PCollection} instances.
  */
 public class Sort {
-  
+
   public enum Order {
-    ASCENDING, DESCENDING, IGNORE
+    ASCENDING,
+    DESCENDING,
+    IGNORE
   }
-  
+
   /**
    * To sort by column 2 ascending then column 1 descending, you would use:
    * <code>
    * sortPairs(coll, by(2, ASCENDING), by(1, DESCENDING))
-   * </code>
-   * Column numbering is 1-based.
+   * </code> Column numbering is 1-based.
    */
   public static class ColumnOrder {
     int column;
     Order order;
+
     public ColumnOrder(int column, Order order) {
       this.column = column;
       this.order = order;
     }
+
     public static ColumnOrder by(int column, Order order) {
       return new ColumnOrder(column, order);
     }
-    
+
     @Override
     public String toString() {
-      return"ColumnOrder: column:" + column + ", Order: " + order;
+      return "ColumnOrder: column:" + column + ", Order: " + order;
     }
   }
-  
+
   /**
    * Sorts the {@link PCollection} using the natural ordering of its elements.
    * 
@@ -99,10 +102,10 @@ public class Sort {
   public static <T> PCollection<T> sort(PCollection<T> collection) {
     return sort(collection, Order.ASCENDING);
   }
-  
+
   /**
-   * Sorts the {@link PCollection} using the natural ordering of its elements
-   * in the order specified.
+   * Sorts the {@link PCollection} using the natural ordering of its elements in
+   * the order specified.
    * 
    * @return a {@link PCollection} representing the sorted collection.
    */
@@ -110,16 +113,13 @@ public class Sort {
     PTypeFamily tf = collection.getTypeFamily();
     PTableType<T, Void> type = tf.tableOf(collection.getPType(), tf.nulls());
     Configuration conf = collection.getPipeline().getConfiguration();
-    GroupingOptions options = buildGroupingOptions(conf, tf,
-        collection.getPType(), order);
-    PTable<T, Void> pt =
-      collection.parallelDo("sort-pre", new DoFn<T, Pair<T, Void>>() {
-        @Override
-        public void process(T input,
-            Emitter<Pair<T, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
+    GroupingOptions options = buildGroupingOptions(conf, tf, collection.getPType(), order);
+    PTable<T, Void> pt = collection.parallelDo("sort-pre", new DoFn<T, Pair<T, Void>>() {
+      @Override
+      public void process(T input, Emitter<Pair<T, Void>> emitter) {
+        emitter.emit(Pair.of(input, (Void) null));
+      }
+    }, type);
     PTable<T, Void> sortedPt = pt.groupByKey(options).ungroup();
     return sortedPt.parallelDo("sort-post", new DoFn<Pair<T, Void>, T>() {
       @Override
@@ -128,7 +128,6 @@ public class Sort {
       }
     }, collection.getPType());
   }
-  
 
   /**
    * Sorts the {@link PTable} using the natural ordering of its keys.
@@ -140,8 +139,8 @@ public class Sort {
   }
 
   /**
-   * Sorts the {@link PTable} using the natural ordering of its keys
-   * in the order specified.
+   * Sorts the {@link PTable} using the natural ordering of its keys in the
+   * order specified.
    * 
    * @return a {@link PTable} representing the sorted collection.
    */
@@ -151,37 +150,34 @@ public class Sort {
     GroupingOptions options = buildGroupingOptions(conf, tf, table.getKeyType(), key);
     return table.groupByKey(options).ungroup();
   }
-  
+
   /**
    * Sorts the {@link PCollection} of {@link Pair}s using the specified column
    * ordering.
    * 
    * @return a {@link PCollection} representing the sorted collection.
    */
-  public static <U, V> PCollection<Pair<U, V>> sortPairs(
-      PCollection<Pair<U, V>> collection, ColumnOrder... columnOrders) {
-    // put U and V into a pair/tuple in the key so we can do grouping and sorting
+  public static <U, V> PCollection<Pair<U, V>> sortPairs(PCollection<Pair<U, V>> collection,
+      ColumnOrder... columnOrders) {
+    // put U and V into a pair/tuple in the key so we can do grouping and
+    // sorting
     PTypeFamily tf = collection.getTypeFamily();
     PType<Pair<U, V>> pType = collection.getPType();
     @SuppressWarnings("unchecked")
-    PTableType<Pair<U, V>, Void> type = tf.tableOf(
-        tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)),
+    PTableType<Pair<U, V>, Void> type = tf.tableOf(tf.pairs(pType.getSubTypes().get(0), pType.getSubTypes().get(1)),
         tf.nulls());
-    PTable<Pair<U, V>, Void> pt =
-      collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
-        @Override
-        public void process(Pair<U, V> input,
-            Emitter<Pair<Pair<U, V>, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
+    PTable<Pair<U, V>, Void> pt = collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
+      @Override
+      public void process(Pair<U, V> input, Emitter<Pair<Pair<U, V>, Void>> emitter) {
+        emitter.emit(Pair.of(input, (Void) null));
+      }
+    }, type);
     Configuration conf = collection.getPipeline().getConfiguration();
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>,Void>, Pair<U, V>>() {
+    return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>, Void>, Pair<U, V>>() {
       @Override
-      public void process(Pair<Pair<U, V>, Void> input,
-          Emitter<Pair<U, V>> emitter) {
+      public void process(Pair<Pair<U, V>, Void> input, Emitter<Pair<U, V>> emitter) {
         emitter.emit(input.first());
       }
     }, collection.getPType());
@@ -193,29 +189,26 @@ public class Sort {
    * 
    * @return a {@link PCollection} representing the sorted collection.
    */
-  public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(
-      PCollection<Tuple3<V1, V2, V3>> collection, ColumnOrder... columnOrders) {
+  public static <V1, V2, V3> PCollection<Tuple3<V1, V2, V3>> sortTriples(PCollection<Tuple3<V1, V2, V3>> collection,
+      ColumnOrder... columnOrders) {
     PTypeFamily tf = collection.getTypeFamily();
     PType<Tuple3<V1, V2, V3>> pType = collection.getPType();
     @SuppressWarnings("unchecked")
     PTableType<Tuple3<V1, V2, V3>, Void> type = tf.tableOf(
-        tf.triples(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2)),
-        tf.nulls());
-    PTable<Tuple3<V1, V2, V3>, Void> pt =
-      collection.parallelDo(new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
-        @Override
-        public void process(Tuple3<V1, V2, V3> input,
-            Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
+        tf.triples(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2)), tf.nulls());
+    PTable<Tuple3<V1, V2, V3>, Void> pt = collection.parallelDo(
+        new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
+          @Override
+          public void process(Tuple3<V1, V2, V3> input, Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) {
+            emitter.emit(Pair.of(input, (Void) null));
+          }
+        }, type);
     Configuration conf = collection.getPipeline().getConfiguration();
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<Tuple3<V1, V2, V3>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>,Void>, Tuple3<V1, V2, V3>>() {
+    return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>, Void>, Tuple3<V1, V2, V3>>() {
       @Override
-      public void process(Pair<Tuple3<V1, V2, V3>, Void> input,
-          Emitter<Tuple3<V1, V2, V3>> emitter) {
+      public void process(Pair<Tuple3<V1, V2, V3>, Void> input, Emitter<Tuple3<V1, V2, V3>> emitter) {
         emitter.emit(input.first());
       }
     }, collection.getPType());
@@ -232,24 +225,21 @@ public class Sort {
     PTypeFamily tf = collection.getTypeFamily();
     PType<Tuple4<V1, V2, V3, V4>> pType = collection.getPType();
     @SuppressWarnings("unchecked")
-    PTableType<Tuple4<V1, V2, V3, V4>, Void> type = tf.tableOf(
-        tf.quads(pType.getSubTypes().get(0), pType.getSubTypes().get(1), pType.getSubTypes().get(2),  pType.getSubTypes().get(3)),
-        tf.nulls());
-    PTable<Tuple4<V1, V2, V3, V4>, Void> pt =
-      collection.parallelDo(new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
-        @Override
-        public void process(Tuple4<V1, V2, V3, V4> input,
-            Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
+    PTableType<Tuple4<V1, V2, V3, V4>, Void> type = tf.tableOf(tf.quads(pType.getSubTypes().get(0), pType.getSubTypes()
+        .get(1), pType.getSubTypes().get(2), pType.getSubTypes().get(3)), tf.nulls());
+    PTable<Tuple4<V1, V2, V3, V4>, Void> pt = collection.parallelDo(
+        new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
+          @Override
+          public void process(Tuple4<V1, V2, V3, V4> input, Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) {
+            emitter.emit(Pair.of(input, (Void) null));
+          }
+        }, type);
     Configuration conf = collection.getPipeline().getConfiguration();
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<Tuple4<V1, V2, V3, V4>, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>,Void>, Tuple4<V1, V2, V3, V4>>() {
+    return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>, Void>, Tuple4<V1, V2, V3, V4>>() {
       @Override
-      public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input,
-          Emitter<Tuple4<V1, V2, V3, V4>> emitter) {
+      public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input, Emitter<Tuple4<V1, V2, V3, V4>> emitter) {
         emitter.emit(input.first());
       }
     }, collection.getPType());
@@ -261,36 +251,30 @@ public class Sort {
    * 
    * @return a {@link PCollection} representing the sorted collection.
    */
-  public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection,
-      ColumnOrder... columnOrders) {
+  public static PCollection<TupleN> sortTuples(PCollection<TupleN> collection, ColumnOrder... columnOrders) {
     PTypeFamily tf = collection.getTypeFamily();
     PType<TupleN> pType = collection.getPType();
-    PTableType<TupleN, Void> type = tf.tableOf(
-        tf.tuples(pType.getSubTypes().toArray(new PType[0])),
-        tf.nulls());
-    PTable<TupleN, Void> pt =
-      collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() {
-        @Override
-        public void process(TupleN input,
-            Emitter<Pair<TupleN, Void>> emitter) {
-          emitter.emit(Pair.of(input, (Void) null));
-        }
-      }, type);
+    PTableType<TupleN, Void> type = tf.tableOf(tf.tuples(pType.getSubTypes().toArray(new PType[0])), tf.nulls());
+    PTable<TupleN, Void> pt = collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() {
+      @Override
+      public void process(TupleN input, Emitter<Pair<TupleN, Void>> emitter) {
+        emitter.emit(Pair.of(input, (Void) null));
+      }
+    }, type);
     Configuration conf = collection.getPipeline().getConfiguration();
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<TupleN, Void> sortedPt = pt.groupByKey(options).ungroup();
-    return sortedPt.parallelDo(new DoFn<Pair<TupleN,Void>, TupleN>() {
+    return sortedPt.parallelDo(new DoFn<Pair<TupleN, Void>, TupleN>() {
       @Override
-      public void process(Pair<TupleN, Void> input,
-          Emitter<TupleN> emitter) {
+      public void process(Pair<TupleN, Void> input, Emitter<TupleN> emitter) {
         emitter.emit(input.first());
       }
     }, collection.getPType());
   }
-  
+
   // TODO: move to type family?
-  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
-      PTypeFamily tf, PType<T> ptype, Order order) {
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> ptype,
+      Order order) {
     Builder builder = GroupingOptions.builder();
     if (order == Order.DESCENDING) {
       if (tf == WritableTypeFamily.getInstance()) {
@@ -306,9 +290,9 @@ public class Sort {
     }
     return builder.build();
   }
-  
-  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
-      PTypeFamily tf, PType<T> ptype, ColumnOrder[] columnOrders) {
+
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf, PTypeFamily tf, PType<T> ptype,
+      ColumnOrder[] columnOrders) {
     Builder builder = GroupingOptions.builder();
     if (tf == WritableTypeFamily.getInstance()) {
       TupleWritableComparator.configureOrdering(conf, columnOrders);
@@ -321,25 +305,23 @@ public class Sort {
     }
     return builder.build();
   }
-  
+
   static class ReverseWritableComparator<T> extends Configured implements RawComparator<T> {
-    
+
     RawComparator<T> comparator;
-    
+
     @SuppressWarnings("unchecked")
     @Override
     public void setConf(Configuration conf) {
       super.setConf(conf);
       if (conf != null) {
         JobConf jobConf = new JobConf(conf);
-        comparator = WritableComparator.get(
-            jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
+        comparator = WritableComparator.get(jobConf.getMapOutputKeyClass().asSubclass(WritableComparable.class));
       }
     }
 
     @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
-        int arg5) {
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
       return -comparator.compare(arg0, arg1, arg2, arg3, arg4, arg5);
     }
 
@@ -349,11 +331,11 @@ public class Sort {
     }
 
   }
-  
+
   static class ReverseAvroComparator<T> extends Configured implements RawComparator<T> {
 
     Schema schema;
-    
+
     @Override
     public void setConf(Configuration conf) {
       super.setConf(conf);
@@ -368,47 +350,43 @@ public class Sort {
     }
 
     @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
-        int arg5) {
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
       return -BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
     }
-    
+
   }
-  
+
   static class TupleWritableComparator extends WritableComparator implements Configurable {
-    
+
     private static final String CRUNCH_ORDERING_PROPERTY = "crunch.ordering";
-    
+
     Configuration conf;
     ColumnOrder[] columnOrders;
-    
+
     public TupleWritableComparator() {
       super(TupleWritable.class, true);
     }
-    
+
     public static void configureOrdering(Configuration conf, Order... orders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(
-        Iterables.transform(Arrays.asList(orders),
-          new Function<Order, String>() {
-        @Override
-        public String apply(Order o) {
-          return o.name();
-        }
-      })));
+      conf.set(CRUNCH_ORDERING_PROPERTY,
+          Joiner.on(",").join(Iterables.transform(Arrays.asList(orders), new Function<Order, String>() {
+            @Override
+            public String apply(Order o) {
+              return o.name();
+            }
+          })));
     }
 
-    
     public static void configureOrdering(Configuration conf, ColumnOrder... columnOrders) {
-      conf.set(CRUNCH_ORDERING_PROPERTY, Joiner.on(",").join(
-        Iterables.transform(Arrays.asList(columnOrders),
-          new Function<ColumnOrder, String>() {
-        @Override
-        public String apply(ColumnOrder o) {
-          return o.column + ";" + o.order.name();
-        }
-      })));
+      conf.set(CRUNCH_ORDERING_PROPERTY,
+          Joiner.on(",").join(Iterables.transform(Arrays.asList(columnOrders), new Function<ColumnOrder, String>() {
+            @Override
+            public String apply(ColumnOrder o) {
+              return o.column + ";" + o.order.name();
+            }
+          })));
     }
-    
+
     @Override
     public int compare(WritableComparable a, WritableComparable b) {
       TupleWritable ta = (TupleWritable) a;
@@ -418,7 +396,7 @@ public class Sort {
         int order = 1;
         if (columnOrders[i].order == Order.ASCENDING) {
           order = 1;
-        } else  if (columnOrders[i].order == Order.DESCENDING) {
+        } else if (columnOrders[i].order == Order.DESCENDING) {
           order = -1;
         } else { // ignore
           continue;
@@ -433,10 +411,8 @@ public class Sort {
           Writable v1 = ta.get(index);
           Writable v2 = tb.get(index);
           if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
-            if (v1 instanceof WritableComparable
-                && v2 instanceof WritableComparable) {
-              int cmp = ((WritableComparable) v1)
-                  .compareTo((WritableComparable) v2);
+            if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
+              int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
               if (cmp != 0) {
                 return order * cmp;
               }
@@ -469,16 +445,16 @@ public class Sort {
           int column = Integer.parseInt(split[0]);
           Order order = Order.valueOf(split[1]);
           columnOrders[i] = ColumnOrder.by(column, order);
-          
+
         }
       }
     }
   }
-  
+
   static class TupleAvroComparator<T> extends Configured implements RawComparator<T> {
 
     Schema schema;
-    
+
     @Override
     public void setConf(Configuration conf) {
       super.setConf(conf);
@@ -487,15 +463,15 @@ public class Sort {
       }
     }
 
-    public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders,
-        PType<S> ptype) {
+    public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders, PType<S> ptype) {
       Schema orderedSchema = createOrderedTupleSchema(ptype, columnOrders);
       conf.set("crunch.schema", orderedSchema.toString());
     }
-    
+
     // TODO: move to Avros
-    // TODO: need to re-order columns in map output then switch back in the reduce
-    //       this will require more extensive changes in Crunch
+    // TODO: need to re-order columns in map output then switch back in the
+    // reduce
+    // this will require more extensive changes in Crunch
     private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
       // Guarantee each tuple schema has a globally unique name
       String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
@@ -503,17 +479,16 @@ public class Sort {
       List<Schema.Field> fields = Lists.newArrayList();
       AvroType<S> parentAvroType = (AvroType<S>) ptype;
       Schema parentAvroSchema = parentAvroType.getSchema();
-      
+
       BitSet orderedColumns = new BitSet();
       // First add any fields specified by ColumnOrder
       for (ColumnOrder columnOrder : orders) {
         int index = columnOrder.column - 1;
         AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
-        Schema fieldSchema = Schema.createUnion(
-            ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
+        Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
         String fieldName = parentAvroSchema.getFields().get(index).name();
-        fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
-            Schema.Field.Order.valueOf(columnOrder.order.name())));
+        fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.valueOf(columnOrder.order
+            .name())));
         orderedColumns.set(index);
       }
       // Then add remaining fields from the ptypes, with no sort order
@@ -522,11 +497,9 @@ public class Sort {
           continue;
         }
         AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(i);
-        Schema fieldSchema = Schema.createUnion(
-            ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
+        Schema fieldSchema = Schema.createUnion(ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
         String fieldName = parentAvroSchema.getFields().get(i).name();
-        fields.add(new Schema.Field(fieldName, fieldSchema, "", null,
-            Schema.Field.Order.IGNORE));
+        fields.add(new Schema.Field(fieldName, fieldSchema, "", null, Schema.Field.Order.IGNORE));
       }
       schema.setFields(fields);
       return schema;
@@ -538,10 +511,9 @@ public class Sort {
     }
 
     @Override
-    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4,
-        int arg5) {
+    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
       return BinaryData.compare(arg0, arg1, arg2, arg3, arg4, arg5, schema);
     }
-    
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
index 4f0d5cc..8e8737c 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java
@@ -22,17 +22,21 @@ import java.util.List;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.Lists;
 
 /**
  * Used to perform the last step of an full outer join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ * 
+ * @param <K>
+ *          Type of the keys.
+ * @param <U>
+ *          Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V>
+ *          Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-  
+
   private transient int lastId;
   private transient K lastKey;
   private transient List<U> leftValues;
@@ -51,8 +55,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
       // Make sure that left side gets emitted.
       if (0 == lastId && 0 == id) {
@@ -95,5 +98,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public String getJoinType() { return "fullOuterJoin"; }
+  public String getJoinType() {
+    return "fullOuterJoin";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
index 0c46bdf..11ddeaf 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java
@@ -22,20 +22,24 @@ import java.util.List;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.Lists;
 
 /**
  * Used to perform the last step of an inner join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ * 
+ * @param <K>
+ *          Type of the keys.
+ * @param <U>
+ *          Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V>
+ *          Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   private transient K lastKey;
   private transient List<U> leftValues;
-  
+
   public InnerJoinFn(PType<U> leftValueType) {
     super(leftValueType);
   }
@@ -49,8 +53,7 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
       lastKey = key;
       leftValues.clear();
@@ -71,5 +74,7 @@ public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public String getJoinType() { return "innerJoin"; }
+  public String getJoinType() {
+    return "innerJoin";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
index 494031c..7ecaaf4 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java
@@ -24,14 +24,16 @@ import org.apache.crunch.types.PType;
 
 /**
  * Represents a {@link org.apache.crunch.DoFn} for performing joins.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ * 
+ * @param <K>
+ *          Type of the keys.
+ * @param <U>
+ *          Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V>
+ *          Type of the second {@link org.apache.crunch.PTable}'s values
  */
-public abstract class JoinFn<K, U, V>
-    extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
-  
+public abstract class JoinFn<K, U, V> extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+
   protected PType<U> leftValueType;
 
   /**
@@ -50,24 +52,29 @@ public abstract class JoinFn<K, U, V>
 
   /**
    * Performs the actual joining.
-   *
-   * @param key The key for this grouping of values.
-   * @param id The side that this group of values is from (0 -> left, 1 -> right).
-   * @param pairs The group of values associated with this key and id pair.
-   * @param emitter The emitter to send the output to.
+   * 
+   * @param key
+   *          The key for this grouping of values.
+   * @param id
+   *          The side that this group of values is from (0 -> left, 1 ->
+   *          right).
+   * @param pairs
+   *          The group of values associated with this key and id pair.
+   * @param emitter
+   *          The emitter to send the output to.
    */
-  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter);
+  public abstract void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter);
 
   /**
    * Split up the input record to make coding a bit more manageable.
-   *
-   * @param input The input record.
-   * @param emitter The emitter to send the output to.
+   * 
+   * @param input
+   *          The input record.
+   * @param emitter
+   *          The emitter to send the output to.
    */
   @Override
-  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
+  public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
     join(input.first().first(), input.first().second(), input.second(), emitter);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
index c282642..6efeccb 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -25,6 +25,9 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.TupleWritable;
+import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -33,13 +36,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.TupleWritable;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-
 /**
  * Utilities that are useful in joining multiple data sets via a MapReduce.
- *
+ * 
  */
 public class JoinUtils {
 
@@ -50,7 +49,7 @@ public class JoinUtils {
       return AvroIndexedRecordPartitioner.class;
     }
   }
-  
+
   public static Class<? extends RawComparator> getGroupingComparator(PTypeFamily typeFamily) {
     if (typeFamily == WritableTypeFamily.getInstance()) {
       return TupleWritableComparator.class;
@@ -58,23 +57,23 @@ public class JoinUtils {
       return AvroPairGroupingComparator.class;
     }
   }
-  
+
   public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> {
     @Override
     public int getPartition(TupleWritable key, Writable value, int numPartitions) {
       return (Math.abs(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
     }
   }
-  
+
   public static class TupleWritableComparator implements RawComparator<TupleWritable> {
-    
+
     private DataInputBuffer buffer = new DataInputBuffer();
     private TupleWritable key1 = new TupleWritable();
     private TupleWritable key2 = new TupleWritable();
-    
+
     @Override
     public int compare(TupleWritable o1, TupleWritable o2) {
-      return ((WritableComparable)o1.get(0)).compareTo((WritableComparable)o2.get(0));
+      return ((WritableComparable) o1.get(0)).compareTo((WritableComparable) o2.get(0));
     }
 
     @Override
@@ -82,17 +81,17 @@ public class JoinUtils {
       try {
         buffer.reset(b1, s1, l1);
         key1.readFields(buffer);
-      
+
         buffer.reset(b2, s2, l2);
         key2.readFields(buffer);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      
+
       return compare(key1, key2);
     }
   }
-  
+
   public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
     @Override
     public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
@@ -100,7 +99,7 @@ public class JoinUtils {
       return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
     }
   }
-  
+
   public static class AvroPairGroupingComparator<T> extends Configured implements RawComparator<AvroWrapper<T>> {
     private Schema schema;
 
@@ -113,7 +112,7 @@ public class JoinUtils {
         schema = keySchema.getFields().get(0).schema();
       }
     }
-    
+
     @Override
     public int compare(AvroWrapper<T> x, AvroWrapper<T> y) {
       return ReflectData.get().compare(x.datum(), y.datum(), schema);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
index 6e4d3c6..8da372b 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java
@@ -22,17 +22,21 @@ import java.util.List;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.Lists;
 
 /**
  * Used to perform the last step of a left outer join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ * 
+ * @param <K>
+ *          Type of the keys.
+ * @param <U>
+ *          Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V>
+ *          Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-  
+
   private transient int lastId;
   private transient K lastKey;
   private transient List<U> leftValues;
@@ -51,8 +55,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
       // Make sure that left side always gets emitted.
       if (0 == lastId && 0 == id) {
@@ -91,5 +94,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public String getJoinType() { return "leftOuterJoin"; }
+  public String getJoinType() {
+    return "leftOuterJoin";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
index 28a0829..226ad90 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/MapsideJoin.java
@@ -19,10 +19,6 @@ package org.apache.crunch.lib.join;
 
 import java.io.IOException;
 
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PTable;
@@ -33,6 +29,10 @@ import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.io.impl.SourcePathTargetImpl;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
@@ -76,8 +76,7 @@ public class MapsideJoin {
     // optimize this by running the setup of multiple map-side joins concurrently
     pipeline.run();
 
-    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
-        .getMaterializeSourceTarget(right);
+    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline.getMaterializeSourceTarget(right);
     if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
       throw new CrunchRuntimeException("Right-side contents can't be read from a path");
     }
@@ -89,14 +88,10 @@ public class MapsideJoin {
     Path path = sourcePathTarget.getPath();
     DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
 
-    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
-        right.getPType());
+    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(), right.getPType());
     PTypeFamily typeFamily = left.getTypeFamily();
-    return left.parallelDo(
-        "mapjoin",
-        mapJoinDoFn,
-        typeFamily.tableOf(left.getKeyType(),
-            typeFamily.pairs(left.getValueType(), right.getValueType())));
+    return left.parallelDo("mapjoin", mapJoinDoFn,
+        typeFamily.tableOf(left.getKeyType(), typeFamily.pairs(left.getValueType(), right.getValueType())));
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
index a7b2a0d..15a8930 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java
@@ -22,14 +22,18 @@ import java.util.List;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.Lists;
 
 /**
  * Used to perform the last step of a right outer join.
- *
- * @param <K> Type of the keys.
- * @param <U> Type of the first {@link org.apache.crunch.PTable}'s values
- * @param <V> Type of the second {@link org.apache.crunch.PTable}'s values
+ * 
+ * @param <K>
+ *          Type of the keys.
+ * @param <U>
+ *          Type of the first {@link org.apache.crunch.PTable}'s values
+ * @param <V>
+ *          Type of the second {@link org.apache.crunch.PTable}'s values
  */
 public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
@@ -49,8 +53,7 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public void join(K key, int id, Iterable<Pair<U, V>> pairs,
-      Emitter<Pair<K, Pair<U, V>>> emitter) {
+  public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) {
     if (!key.equals(lastKey)) {
       lastKey = key;
       leftValues.clear();
@@ -76,5 +79,7 @@ public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
 
   /** {@inheritDoc} */
   @Override
-  public String getJoinType() { return "rightOuterJoin"; }
+  public String getJoinType() {
+    return "rightOuterJoin";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
index c713d90..3830616 100644
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
+++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableIterable.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.io.ReadableSourceTarget;
@@ -30,21 +29,21 @@ import org.apache.crunch.io.ReadableSourceTarget;
 public class MaterializableIterable<E> implements Iterable<E> {
 
   private static final Log LOG = LogFactory.getLog(MaterializableIterable.class);
-  
+
   private final Pipeline pipeline;
   private final ReadableSourceTarget<E> sourceTarget;
   private Iterable<E> materialized;
-  
+
   public MaterializableIterable(Pipeline pipeline, ReadableSourceTarget<E> source) {
-	this.pipeline = pipeline;
-	this.sourceTarget = source;
-	this.materialized = null;
+    this.pipeline = pipeline;
+    this.sourceTarget = source;
+    this.materialized = null;
   }
-  
+
   public ReadableSourceTarget<E> getSourceTarget() {
     return sourceTarget;
   }
-  
+
   @Override
   public Iterator<E> iterator() {
     if (materialized == null) {
@@ -55,11 +54,11 @@ public class MaterializableIterable<E> implements Iterable<E> {
   }
 
   public void materialize() {
-	try {
-	  materialized = sourceTarget.read(pipeline.getConfiguration());
-	} catch (IOException e) {
-	  LOG.error("Could not materialize: " + sourceTarget, e);
-	  throw new CrunchRuntimeException(e);
-	}	
+    try {
+      materialized = sourceTarget.read(pipeline.getConfiguration());
+    } catch (IOException e) {
+      LOG.error("Could not materialize: " + sourceTarget, e);
+      throw new CrunchRuntimeException(e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
index e751f50..69082e2 100644
--- a/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
+++ b/crunch/src/main/java/org/apache/crunch/materialize/MaterializableMap.java
@@ -25,14 +25,14 @@ import java.util.Set;
 import org.apache.crunch.Pair;
 
 public class MaterializableMap<K, V> extends AbstractMap<K, V> {
-  
+
   private Iterable<Pair<K, V>> iterable;
   private Set<Map.Entry<K, V>> entrySet;
-  
+
   public MaterializableMap(Iterable<Pair<K, V>> iterable) {
-  	this.iterable = iterable;
+    this.iterable = iterable;
   }
-  
+
   private Set<Map.Entry<K, V>> toMapEntries(Iterable<Pair<K, V>> xs) {
     HashMap<K, V> m = new HashMap<K, V>();
     for (Pair<K, V> x : xs)

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/test/FileHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/test/FileHelper.java b/crunch/src/main/java/org/apache/crunch/test/FileHelper.java
index 7f5d0c7..ca5a4b3 100644
--- a/crunch/src/main/java/org/apache/crunch/test/FileHelper.java
+++ b/crunch/src/main/java/org/apache/crunch/test/FileHelper.java
@@ -28,12 +28,12 @@ import com.google.common.io.Files;
 public class FileHelper {
 
   public static String createTempCopyOf(String fileResource) throws IOException {
-	File tmpFile = File.createTempFile("tmp", "");
-	tmpFile.deleteOnExit();
-	Files.copy(newInputStreamSupplier(getResource(fileResource)), tmpFile);
-	return tmpFile.getAbsolutePath();
+    File tmpFile = File.createTempFile("tmp", "");
+    tmpFile.deleteOnExit();
+    Files.copy(newInputStreamSupplier(getResource(fileResource)), tmpFile);
+    return tmpFile.getAbsolutePath();
   }
-  
+
   public static File createOutputPath() throws IOException {
     File output = File.createTempFile("output", "");
     output.delete();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java b/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
index 46e90ae..1e0acb9 100644
--- a/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/test/InMemoryEmitter.java
@@ -20,25 +20,27 @@ package org.apache.crunch.test;
 import java.util.List;
 
 import org.apache.crunch.Emitter;
+
 import com.google.common.collect.Lists;
 
 /**
- * An {@code Emitter} instance that writes emitted records to a backing {@code List}.
- *
+ * An {@code Emitter} instance that writes emitted records to a backing
+ * {@code List}.
+ * 
  * @param <T>
  */
 public class InMemoryEmitter<T> implements Emitter<T> {
-  
+
   private final List<T> output;
-  
+
   public InMemoryEmitter() {
-    this(Lists.<T>newArrayList());
+    this(Lists.<T> newArrayList());
   }
-  
+
   public InMemoryEmitter(List<T> output) {
     this.output = output;
   }
-  
+
   @Override
   public void emit(T emitted) {
     output.add(emitted);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java b/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
index 8c8823c..bcb4da1 100644
--- a/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
+++ b/crunch/src/main/java/org/apache/crunch/test/TestCounters.java
@@ -26,16 +26,16 @@ import org.apache.hadoop.mapreduce.Counters;
 public class TestCounters {
 
   private static Counters COUNTERS = new Counters();
-  
+
   public static Counter getCounter(Enum<?> e) {
     return COUNTERS.findCounter(e);
   }
-  
+
   public static Counter getCounter(String group, String name) {
     return COUNTERS.findCounter(group, name);
   }
-  
+
   public static void clearCounters() {
-	  COUNTERS = new Counters();
-  }   
+    COUNTERS = new Counters();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java b/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java
index a788520..00d458c 100644
--- a/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java
+++ b/crunch/src/main/java/org/apache/crunch/tool/CrunchTool.java
@@ -19,10 +19,6 @@ package org.apache.crunch.tool;
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pipeline;
@@ -34,71 +30,74 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.To;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
 
 /**
  * An extension of the {@code Tool} interface that creates a {@code Pipeline}
  * instance and provides methods for working with the Pipeline from inside of
  * the Tool's run method.
- *
+ * 
  */
 public abstract class CrunchTool extends Configured implements Tool {
 
   protected static final From from = new From();
   protected static final To to = new To();
   protected static final At at = new At();
-  
+
   private Pipeline pipeline;
 
   public CrunchTool() throws IOException {
-	this(false);
+    this(false);
   }
-  
+
   public CrunchTool(boolean inMemory) throws IOException {
-    this.pipeline = inMemory ? MemPipeline.getInstance() : new MRPipeline(getClass());  
+    this.pipeline = inMemory ? MemPipeline.getInstance() : new MRPipeline(getClass());
   }
-  
+
   @Override
   public void setConf(Configuration conf) {
-	super.setConf(conf);
-	if (conf != null && pipeline != null) {
-	  pipeline.setConfiguration(conf);
-	}
+    super.setConf(conf);
+    if (conf != null && pipeline != null) {
+      pipeline.setConfiguration(conf);
+    }
   }
-  
+
   @Override
   public Configuration getConf() {
-	return pipeline.getConfiguration();
+    return pipeline.getConfiguration();
   }
-  
+
   public void enableDebug() {
     pipeline.enableDebug();
   }
-  
+
   public <T> PCollection<T> read(Source<T> source) {
-	return pipeline.read(source);
+    return pipeline.read(source);
   }
-  
+
   public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
-	return pipeline.read(tableSource);
+    return pipeline.read(tableSource);
   }
-  
+
   public PCollection<String> readTextFile(String pathName) {
-	return pipeline.readTextFile(pathName);
+    return pipeline.readTextFile(pathName);
   }
-  
+
   public void write(PCollection<?> pcollection, Target target) {
-	pipeline.write(pcollection, target);
+    pipeline.write(pcollection, target);
   }
-  
+
   public void writeTextFile(PCollection<?> pcollection, String pathName) {
-	pipeline.writeTextFile(pcollection, pathName);
+    pipeline.writeTextFile(pcollection, pathName);
   }
-  
+
   public void run() {
-	pipeline.run();
+    pipeline.run();
   }
-  
+
   public void done() {
-	pipeline.done();
+    pipeline.done();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/Converter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/Converter.java b/crunch/src/main/java/org/apache/crunch/types/Converter.java
index 70705f0..a0dbb16 100644
--- a/crunch/src/main/java/org/apache/crunch/types/Converter.java
+++ b/crunch/src/main/java/org/apache/crunch/types/Converter.java
@@ -23,19 +23,19 @@ import org.apache.crunch.DoFn;
 
 /**
  * Converts the input key/value from a MapReduce task into the input to a
- * {@link DoFn}, or takes the output of a {@code DoFn} and write it to
- * the output key/values.
+ * {@link DoFn}, or takes the output of a {@code DoFn} and write it to the
+ * output key/values.
  */
 public interface Converter<K, V, S, T> extends Serializable {
   S convertInput(K key, V value);
-  
+
   T convertIterableInput(K key, Iterable<V> value);
-  
+
   K outputKey(S value);
-  
+
   V outputValue(S value);
-  
+
   Class<K> getKeyClass();
-  
+
   Class<V> getValueClass();
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
index e29018a..b4ac1e6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -20,20 +20,19 @@ package org.apache.crunch.types;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
- * The {@code PType} instance for {@link PGroupedTable} instances. Its settings are
- * derived from the {@code PTableType} that was grouped to create the
+ * The {@code PType} instance for {@link PGroupedTable} instances. Its settings
+ * are derived from the {@code PTableType} that was grouped to create the
  * {@code PGroupedTable} instance.
- *
+ * 
  */
 public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<V>>> {
 
@@ -65,8 +64,7 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
     }
   }
 
-  public static class PairIterableMapFn<K, V> extends
-      MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {    
+  public static class PairIterableMapFn<K, V> extends MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
     private final MapFn<Object, K> keys;
     private final MapFn<Object, V> values;
 
@@ -83,13 +81,12 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
 
     @Override
     public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
-      return Pair.<K, Iterable<V>>of(keys.map(input.first()),
-          new PTypeIterable(values, input.second()));
+      return Pair.<K, Iterable<V>> of(keys.map(input.first()), new PTypeIterable(values, input.second()));
     }
   }
 
   protected final PTableType<K, V> tableType;
-  
+
   public PGroupedTableType(PTableType<K, V> tableType) {
     this.tableType = tableType;
   }
@@ -97,7 +94,7 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
   public PTableType<K, V> getTableType() {
     return tableType;
   }
-  
+
   @Override
   public PTypeFamily getFamily() {
     return tableType.getFamily();
@@ -107,16 +104,16 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
   public List<PType> getSubTypes() {
     return tableType.getSubTypes();
   }
-  
+
   @Override
   public Converter getConverter() {
     return tableType.getConverter();
   }
-  
+
   public abstract Converter getGroupingConverter();
-  
+
   public abstract void configureShuffle(Job job, GroupingOptions options);
-  
+
   @Override
   public SourceTarget<Pair<K, Iterable<V>>> getDefaultFileSource(Path path) {
     throw new UnsupportedOperationException("Grouped tables cannot be written out directly");

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/PTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTableType.java b/crunch/src/main/java/org/apache/crunch/types/PTableType.java
index b9e946f..3d06f8b 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PTableType.java
@@ -24,7 +24,7 @@ import org.apache.crunch.Pair;
  * An extension of {@code PType} specifically for {@link PTable} objects. It
  * allows separate access to the {@code PType}s of the key and value for the
  * {@code PTable}.
- *
+ * 
  */
 public interface PTableType<K, V> extends PType<Pair<K, V>> {
   /**
@@ -35,8 +35,8 @@ public interface PTableType<K, V> extends PType<Pair<K, V>> {
   /**
    * Returns the value type for the table.
    */
-  PType<V> getValueType(); 
-  
+  PType<V> getValueType();
+
   /**
    * Returns the grouped table version of this type.
    */

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/PType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java
index 126b165..a60ce62 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PType.java
@@ -20,12 +20,11 @@ package org.apache.crunch.types;
 import java.io.Serializable;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.fs.Path;
 
 /**
  * A {@code PType} defines a mapping between a data type that is used in a

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
index 2ae9493..9458f14 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PTypeFamily.java
@@ -35,7 +35,7 @@ import org.apache.crunch.TupleN;
  */
 public interface PTypeFamily {
   PType<Void> nulls();
-  
+
   PType<String> strings();
 
   PType<Long> longs();
@@ -47,33 +47,31 @@ public interface PTypeFamily {
   PType<Double> doubles();
 
   PType<Boolean> booleans();
-  
+
   PType<ByteBuffer> bytes();
-  
+
   <T> PType<T> records(Class<T> clazz);
 
   <T> PType<Collection<T>> collections(PType<T> ptype);
 
   <T> PType<Map<String, T>> maps(PType<T> ptype);
-  
+
   <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2);
 
-  <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
-      PType<V3> p3);
+  <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3);
 
-  <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4);
+  <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4);
 
   PType<TupleN> tuples(PType<?>... ptypes);
 
   <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes);
-  
+
   <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base);
-  
+
   <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value);
-  
+
   /**
    * Returns the equivalent of the given ptype for this family, if it exists.
    */
-  <T> PType<T> as(PType<T> ptype);  
+  <T> PType<T> as(PType<T> ptype);
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java b/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
index 4dda78d..e61b98b 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java
@@ -27,9 +27,9 @@ import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
 
 /**
- * Utilities for converting between {@code PType}s from different {@code PTypeFamily}
- * implementations.
- *
+ * Utilities for converting between {@code PType}s from different
+ * {@code PTypeFamily} implementations.
+ * 
  */
 public class PTypeUtils {
 
@@ -46,8 +46,7 @@ public class PTypeUtils {
       } else if (Tuple3.class.equals(typeClass)) {
         return tf.triples(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)));
       } else if (Tuple4.class.equals(typeClass)) {
-        return tf.quads(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)),
-            tf.as(subTypes.get(2)), tf.as(subTypes.get(3)));
+        return tf.quads(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)), tf.as(subTypes.get(3)));
       } else if (TupleN.class.equals(typeClass)) {
         PType[] newPTypes = subTypes.toArray(new PType[0]);
         for (int i = 0; i < newPTypes.length; i++) {
@@ -61,6 +60,7 @@ public class PTypeUtils {
     }
     return tf.records(typeClass);
   }
-  
-  private PTypeUtils() {}
+
+  private PTypeUtils() {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
index 49aae0d..16c3bcd 100644
--- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java
@@ -29,10 +29,11 @@ import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 
 public abstract class TupleFactory<T extends Tuple> implements Serializable {
 
-  public void initialize() { }
-  
-  public abstract T makeTuple(Object...values);
-  
+  public void initialize() {
+  }
+
+  public abstract T makeTuple(Object... values);
+
   public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() {
     @Override
     public Pair makeTuple(Object... values) {
@@ -46,7 +47,7 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
       return Tuple3.of(values[0], values[1], values[2]);
     }
   };
-  
+
   public static final TupleFactory<Tuple4> TUPLE4 = new TupleFactory<Tuple4>() {
     @Override
     public Tuple4 makeTuple(Object... values) {
@@ -64,19 +65,19 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
   public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, Class... typeArgs) {
     return new CustomTupleFactory<T>(clazz, typeArgs);
   }
-  
+
   private static class CustomTupleFactory<T extends Tuple> extends TupleFactory<T> {
 
     private final Class<T> clazz;
     private final Class[] typeArgs;
-    
+
     private transient Constructor<T> constructor;
-    
+
     public CustomTupleFactory(Class<T> clazz, Class[] typeArgs) {
       this.clazz = clazz;
       this.typeArgs = typeArgs;
     }
-    
+
     @Override
     public void initialize() {
       try {
@@ -85,7 +86,7 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable {
         throw new CrunchRuntimeException(e);
       }
     }
-    
+
     @Override
     public T makeTuple(Object... values) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
index be9f5e9..078353a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java
@@ -35,7 +35,6 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
-
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 
 /**
@@ -128,8 +127,7 @@ public abstract class AvroDeepCopier<T> implements Serializable {
     try {
       datumWriter.write(source, binaryEncoder);
       binaryEncoder.flush();
-      binaryDecoder = DecoderFactory.get()
-          .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
+      binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder);
       datumReader.read(target, binaryDecoder);
     } catch (Exception e) {
       throw new CrunchRuntimeException("Error while deep copying avro value " + source, e);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
index 3b0c3ec..f4e407a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java
@@ -95,8 +95,7 @@ public class AvroGroupedTableType<K, V> extends PGroupedTableType<K, V> {
 
     Avros.configureReflectDataFactory(conf);
 
-    Collection<String> serializations = job.getConfiguration().getStringCollection(
-        "io.serializations");
+    Collection<String> serializations = job.getConfiguration().getStringCollection("io.serializations");
     if (!serializations.contains(SafeAvroSerialization.class.getName())) {
       serializations.add(SafeAvroSerialization.class.getName());
       job.getConfiguration().setStrings("io.serializations", serializations.toArray(new String[0]));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
index da5bbb2..59c1862 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java
@@ -35,24 +35,24 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 /** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */
 public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
 
-	@Override
-	protected List<FileStatus> listStatus(JobContext job) throws IOException {
-	  List<FileStatus> result = new ArrayList<FileStatus>();
-      for (FileStatus file : super.listStatus(job)) {
-        if (file.getPath().getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
-          result.add(file);
-		}
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus file : super.listStatus(job)) {
+      if (file.getPath().getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+        result.add(file);
       }
-      return result;
-	}
+    }
+    return result;
+  }
 
-	@Override
-	public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(InputSplit split,
-		TaskAttemptContext context) throws IOException, InterruptedException {
-      context.setStatus(split.toString());
-      String jsonSchema = context.getConfiguration().get(AvroJob.INPUT_SCHEMA);
-      Schema schema = new Schema.Parser().parse(jsonSchema);
-      return new AvroRecordReader<T>(schema);
-	}
+  @Override
+  public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    context.setStatus(split.toString());
+    String jsonSchema = context.getConfiguration().get(AvroJob.INPUT_SCHEMA);
+    Schema schema = new Schema.Parser().parse(jsonSchema);
+    return new AvroRecordReader<T>(schema);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
index 60aa69b..fb03c1f 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
@@ -18,14 +18,13 @@
 package org.apache.crunch.types.avro;
 
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.io.NullWritable;
-
 import org.apache.crunch.types.Converter;
+import org.apache.hadoop.io.NullWritable;
 
 public class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K, Iterable<K>> {
-  
+
   private transient AvroWrapper<K> wrapper = null;
-  
+
   @Override
   public K convertInput(AvroWrapper<K> key, NullWritable value) {
     return key.datum();
@@ -60,8 +59,7 @@ public class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritab
   }
 
   @Override
-  public Iterable<K> convertIterableInput(AvroWrapper<K> key,
-      Iterable<NullWritable> value) {
+  public Iterable<K> convertIterableInput(AvroWrapper<K> key, Iterable<NullWritable> value) {
     throw new UnsupportedOperationException("Should not be possible");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
index 838c21a..9a5a073 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
 
   @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
-      TaskAttemptContext context) throws IOException, InterruptedException {
+  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
+      InterruptedException {
 
     Configuration conf = context.getConfiguration();
     Schema schema = null;
@@ -45,24 +45,21 @@ public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWr
     } else {
       schema = AvroJob.getOutputSchema(context.getConfiguration());
     }
-    
+
     ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T>getWriter());
+    final DataFileWriter<T> WRITER = new DataFileWriter<T>(factory.<T> getWriter());
 
-    Path path = getDefaultWorkFile(context,
-        org.apache.avro.mapred.AvroOutputFormat.EXT);
-    WRITER.create(schema,
-        path.getFileSystem(context.getConfiguration()).create(path));
+    Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT);
+    WRITER.create(schema, path.getFileSystem(context.getConfiguration()).create(path));
 
     return new RecordWriter<AvroWrapper<T>, NullWritable>() {
       @Override
-      public void write(AvroWrapper<T> wrapper, NullWritable ignore)
-        throws IOException {
+      public void write(AvroWrapper<T> wrapper, NullWritable ignore) throws IOException {
         WRITER.append(wrapper.datum());
       }
+
       @Override
-      public void close(TaskAttemptContext context) throws IOException,
-          InterruptedException {
+      public void close(TaskAttemptContext context) throws IOException, InterruptedException {
         WRITER.close();
       }
     };

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
index 6ec3972..b4dca51 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java
@@ -21,15 +21,14 @@ import java.util.Iterator;
 
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
-
 import org.apache.crunch.Pair;
 import org.apache.crunch.types.Converter;
 
 public class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<V>, Pair<K, V>, Pair<K, Iterable<V>>> {
-  
+
   private transient AvroKey<K> keyWrapper = null;
   private transient AvroValue<V> valueWrapper = null;
-  
+
   @Override
   public Pair<K, V> convertInput(AvroKey<K> key, AvroValue<V> value) {
     return Pair.of(key.datum(), value.datum());
@@ -37,9 +36,9 @@ public class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<
 
   public Pair<K, Iterable<V>> convertIterableInput(AvroKey<K> key, Iterable<AvroValue<V>> iter) {
     Iterable<V> it = new AvroWrappedIterable<V>(iter);
-    return Pair.of(key.datum(), it);  
+    return Pair.of(key.datum(), it);
   }
-  
+
   @Override
   public AvroKey<K> outputKey(Pair<K, V> value) {
     getKeyWrapper().datum(value.first());
@@ -61,29 +60,29 @@ public class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<
   public Class<AvroValue<V>> getValueClass() {
     return (Class<AvroValue<V>>) getValueWrapper().getClass();
   }
-  
+
   private AvroKey<K> getKeyWrapper() {
     if (keyWrapper == null) {
       keyWrapper = new AvroKey<K>();
     }
     return keyWrapper;
   }
-  
+
   private AvroValue<V> getValueWrapper() {
     if (valueWrapper == null) {
       valueWrapper = new AvroValue<V>();
     }
     return valueWrapper;
   }
-  
+
   private static class AvroWrappedIterable<V> implements Iterable<V> {
 
     private final Iterable<AvroValue<V>> iters;
-    
+
     public AvroWrappedIterable(Iterable<AvroValue<V>> iters) {
       this.iters = iters;
     }
-    
+
     @Override
     public Iterator<V> iterator() {
       return new Iterator<V>() {
@@ -102,7 +101,7 @@ public class AvroPairConverter<K, V> implements Converter<AvroKey<K>, AvroValue<
         @Override
         public void remove() {
           it.remove();
-        }  
+        }
       };
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
index 3bcab5c..00bd995 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
@@ -38,78 +38,77 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 /** An {@link RecordReader} for Avro data files. */
 public class AvroRecordReader<T> extends RecordReader<AvroWrapper<T>, NullWritable> {
 
-	private FileReader<T> reader;
-	private long start;
-	private long end;
-	private AvroWrapper<T> key;
-	private NullWritable value;
-	private Schema schema;
+  private FileReader<T> reader;
+  private long start;
+  private long end;
+  private AvroWrapper<T> key;
+  private NullWritable value;
+  private Schema schema;
 
-	public AvroRecordReader(Schema schema) {
-		this.schema = schema;
-	}
+  public AvroRecordReader(Schema schema) {
+    this.schema = schema;
+  }
 
-	@Override
-	public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException,
-			InterruptedException {
-		FileSplit split = (FileSplit) genericSplit;
-		Configuration conf = context.getConfiguration();
-		SeekableInput in = new FsInput(split.getPath(), conf);
-		DatumReader<T> datumReader = null;
-		if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
-		  ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
-			datumReader = factory.getReader(schema);
-		} else {
-			datumReader = new SpecificDatumReader<T>(schema);
-		}
-		this.reader = DataFileReader.openReader(in, datumReader);
-		reader.sync(split.getStart()); // sync to start
-		this.start = reader.tell();
-		this.end = split.getStart() + split.getLength();
-	}
+  @Override
+  public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration conf = context.getConfiguration();
+    SeekableInput in = new FsInput(split.getPath(), conf);
+    DatumReader<T> datumReader = null;
+    if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) {
+      ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
+      datumReader = factory.getReader(schema);
+    } else {
+      datumReader = new SpecificDatumReader<T>(schema);
+    }
+    this.reader = DataFileReader.openReader(in, datumReader);
+    reader.sync(split.getStart()); // sync to start
+    this.start = reader.tell();
+    this.end = split.getStart() + split.getLength();
+  }
 
-	@Override
-	public boolean nextKeyValue() throws IOException, InterruptedException {
-		if (!reader.hasNext() || reader.pastSync(end)) {
-			key = null;
-			value = null;
-			return false;
-		}
-		if (key == null) {
-			key = new AvroWrapper<T>();
-		}
-		if (value == null) {
-			value = NullWritable.get();
-		}
-		key.datum(reader.next(key.datum()));
-		return true;
-	}
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!reader.hasNext() || reader.pastSync(end)) {
+      key = null;
+      value = null;
+      return false;
+    }
+    if (key == null) {
+      key = new AvroWrapper<T>();
+    }
+    if (value == null) {
+      value = NullWritable.get();
+    }
+    key.datum(reader.next(key.datum()));
+    return true;
+  }
 
-	@Override
-	public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
-		return key;
-	}
+  @Override
+  public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
+    return key;
+  }
 
-	@Override
-	public NullWritable getCurrentValue() throws IOException, InterruptedException {
-		return value;
-	}
+  @Override
+  public NullWritable getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
 
-	@Override
-	public float getProgress() throws IOException {
-		if (end == start) {
-			return 0.0f;
-		} else {
-			return Math.min(1.0f, (getPos() - start) / (float) (end - start));
-		}
-	}
+  @Override
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getPos() - start) / (float) (end - start));
+    }
+  }
 
-	public long getPos() throws IOException {
-		return reader.tell();
-	}
+  public long getPos() throws IOException {
+    return reader.tell();
+  }
 
-	@Override
-	public void close() throws IOException {
-		reader.close();
-	}
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index a7a2d0a..eb26bf1 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -65,8 +65,8 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
     public void initialize() {
       keyMapFn.setContext(getContext());
       valueMapFn.setContext(getContext());
-      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
-          new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
+      pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(new Schema.Parser().parse(firstJson),
+          new Schema.Parser().parse(secondJson)).toString();
     }
 
     @Override
@@ -119,9 +119,9 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT
   private final AvroType<V> valueType;
 
   public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
-    super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
-        valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
-        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), keyType, valueType);
+    super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()),
+        new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType,
+            valueType), keyType, valueType);
     this.keyType = keyType;
     this.valueType = valueType;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
index 32a4334..b3ce576 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -56,8 +56,7 @@ public class AvroType<T> implements PType<T> {
     this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes);
   }
 
-  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
-      PType... ptypes) {
+  public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, PType... ptypes) {
     this.typeClass = typeClass;
     this.schema = Preconditions.checkNotNull(schema);
     this.schemaString = schema.toString();
@@ -126,8 +125,7 @@ public class AvroType<T> implements PType<T> {
       return false;
     }
 
-    return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class
-        .isAssignableFrom(typeClass));
+    return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class.isAssignableFrom(typeClass));
   }
 
   public MapFn<Object, T> getInputMapFn() {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
index f8645c3..e09e173 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java
@@ -121,8 +121,7 @@ public class AvroTypeFamily implements PTypeFamily {
   }
 
   @Override
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2,
-      PType<V3> p3, PType<V4> p4) {
+  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
     return Avros.quads(p1, p2, p3, p4);
   }
 
@@ -159,8 +158,7 @@ public class AvroTypeFamily implements PTypeFamily {
   }
 
   @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
-      PType<S> base) {
+  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
     return Avros.derived(clazz, inputFn, outputFn, base);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
index 207fe8d..9460fa5 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
 
 /**
- * An {@link org.apache.hadoop.mapred.InputFormat} for text files.
- * Each line is a {@link Utf8} key; values are null.
+ * An {@link org.apache.hadoop.mapred.InputFormat} for text files. Each line is
+ * a {@link Utf8} key; values are null.
  */
 public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, NullWritable> {
 
@@ -44,7 +44,7 @@ public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, Null
     private LineRecordReader lineRecordReader;
 
     private AvroWrapper<Utf8> currentKey = new AvroWrapper<Utf8>();
-    
+
     public Utf8LineRecordReader() throws IOException {
       this.lineRecordReader = new LineRecordReader();
     }
@@ -58,22 +58,19 @@ public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, Null
     }
 
     @Override
-    public AvroWrapper<Utf8> getCurrentKey() throws IOException,
-        InterruptedException {
+    public AvroWrapper<Utf8> getCurrentKey() throws IOException, InterruptedException {
       Text txt = lineRecordReader.getCurrentValue();
       currentKey.datum(new Utf8(txt.toString()));
       return currentKey;
     }
 
     @Override
-    public NullWritable getCurrentValue() throws IOException,
-        InterruptedException {
+    public NullWritable getCurrentValue() throws IOException, InterruptedException {
       return NullWritable.get();
     }
 
     @Override
-    public void initialize(InputSplit split, TaskAttemptContext context)
-        throws IOException, InterruptedException {
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
       lineRecordReader.initialize(split, context);
     }
 
@@ -94,9 +91,8 @@ public class AvroUtf8InputFormat extends FileInputFormat<AvroWrapper<Utf8>, Null
   }
 
   @Override
-  public RecordReader<AvroWrapper<Utf8>, NullWritable> createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException,
-      InterruptedException {
+  public RecordReader<AvroWrapper<Utf8>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
     return new Utf8LineRecordReader();
   }
 }


Mime
View raw message