incubator-crunch-commits mailing list archives

From gr...@apache.org
Subject [8/10] Format all sources according to formatting profile
Date Sat, 14 Jul 2012 18:14:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index a6d7169..f8214a1 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -77,8 +77,7 @@ public class Avros {
   public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory";
 
   public static void configureReflectDataFactory(Configuration conf) {
-    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(),
-        ReflectDataFactory.class);
+    conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class);
   }
 
   public static ReflectDataFactory getReflectDataFactory(Configuration conf) {
@@ -110,8 +109,8 @@ public class Avros {
     }
   };
 
-  private static final AvroType<String> strings = new AvroType<String>(String.class,
-      Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8);
+  private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
+      UTF8_TO_STRING, STRING_TO_UTF8);
   private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
   private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
   private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
@@ -121,10 +120,9 @@ public class Avros {
   private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
       Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance());
 
-  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap
-      .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs)
-      .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles)
-      .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
+      .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
+      .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
 
   private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
 
@@ -208,8 +206,8 @@ public class Avros {
     public T map(ByteBuffer input) {
       T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration());
       try {
-        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input
-            .arrayOffset(), input.limit())));
+        instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input
+            .limit())));
       } catch (IOException e) {
         LOG.error("Exception thrown reading instance of: " + writableClazz, e);
       }
@@ -234,8 +232,8 @@ public class Avros {
   }
 
   public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
-    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(
-        clazz), new WritableToBytesMapFn<T>());
+    return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
+        new WritableToBytesMapFn<T>());
   }
 
   private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
@@ -279,8 +277,7 @@ public class Avros {
     }
   }
 
-  private static class CollectionToGenericDataArray extends
-      MapFn<Collection<?>, GenericData.Array<?>> {
+  private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> {
 
     private final MapFn mapFn;
     private final String jsonSchema;
@@ -322,10 +319,8 @@ public class Avros {
   public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) {
     AvroType<T> avroType = (AvroType<T>) ptype;
     Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
-    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(
-        avroType.getInputMapFn());
-    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema,
-        avroType.getOutputMapFn());
+    GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
+    CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
     return new AvroType(Collection.class, collectionSchema, input, output, ptype);
   }
 
@@ -525,25 +520,23 @@ public class Avros {
     return new AvroType(Pair.class, schema, input, output, p1, p2);
   }
 
-  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2,
-      PType<V3> p3) {
+  public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
     Schema schema = createTupleSchema(p1, p2, p3);
-    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2,
-        p3), new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
+    return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
+        new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3);
   }
 
-  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+  public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
+      PType<V4> p4) {
     Schema schema = createTupleSchema(p1, p2, p3, p4);
-    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2,
-        p3, p4), new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
+    return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
+        new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4);
   }
 
   public static final AvroType<TupleN> tuples(PType... ptypes) {
     Schema schema = createTupleSchema(ptypes);
-    return new AvroType(TupleN.class, schema,
-        new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema,
-            ptypes), ptypes);
+    return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
+        new TupleToGenericRecord(schema, ptypes), ptypes);
   }
 
   public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
@@ -553,8 +546,8 @@ public class Avros {
       typeArgs[i] = ptypes[i].getTypeClass();
     }
     TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes),
-        new TupleToGenericRecord(schema, ptypes), ptypes);
+    return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
+        ptypes), ptypes);
   }
 
   private static Schema createTupleSchema(PType<?>... ptypes) {
@@ -571,12 +564,11 @@ public class Avros {
     return schema;
   }
 
-  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
-      MapFn<T, S> outputFn, PType<S> base) {
+  public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
+      PType<S> base) {
     AvroType<S> abase = (AvroType<S>) base;
-    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(),
-        inputFn), new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(
-        new PType[0]));
+    return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
+        new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(new PType[0]));
   }
 
   public static <T> PType<T> jsons(Class<T> clazz) {

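For readers skimming the reflowed hunks above, here is a minimal usage sketch of the Avros factory API. It is not part of this commit, and it assumes the public strings()/longs() accessors that conventionally pair with the private fields shown in the hunks:

    import java.util.Collection;

    import org.apache.crunch.Tuple3;
    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;

    public class AvrosSketch {
      public static void main(String[] args) {
        // Primitive PTypes backed by Avro schemas.
        PType<String> s = Avros.strings(); // assumed accessor
        PType<Long> l = Avros.longs();     // assumed accessor

        // Composite types built by the methods reflowed in this commit.
        AvroType<Tuple3<String, Long, Long>> triple = Avros.triples(s, l, l);
        AvroType<Collection<String>> lines = Avros.collections(s);
      }
    }
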
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
index 7c952ec..c19a168 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java
@@ -27,12 +27,14 @@ import org.apache.avro.reflect.ReflectDatumWriter;
  */
 public class ReflectDataFactory {
 
-  public ReflectData getReflectData() { return ReflectData.AllowNull.get(); }
-  
+  public ReflectData getReflectData() {
+    return ReflectData.AllowNull.get();
+  }
+
   public <T> ReflectDatumReader<T> getReader(Schema schema) {
     return new ReflectDatumReader<T>(schema);
   }
-  
+
   public <T> ReflectDatumWriter<T> getWriter() {
     return new ReflectDatumWriter<T>();
   }

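As a usage sketch (not in this commit), the factory above is wired through the job Configuration by the Avros helpers from the first hunk of this message:

    import org.apache.avro.Schema;
    import org.apache.avro.reflect.ReflectDatumReader;
    import org.apache.avro.reflect.ReflectDatumWriter;
    import org.apache.crunch.types.avro.Avros;
    import org.apache.crunch.types.avro.ReflectDataFactory;
    import org.apache.hadoop.conf.Configuration;

    public class ReflectFactorySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Stores the factory class under "crunch.reflectdatafactory".
        Avros.configureReflectDataFactory(conf);

        // Re-instantiates the factory from the configuration.
        ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
        ReflectDatumReader<CharSequence> reader = factory.getReader(Schema.create(Schema.Type.STRING));
        ReflectDatumWriter<CharSequence> writer = factory.getWriter();
      }
    }
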
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
index b5f27fe..266179e 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java
@@ -43,56 +43,54 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** The {@link Serialization} used by jobs configured with {@link AvroJob}. */
-public class SafeAvroSerialization<T> extends Configured 
-  implements Serialization<AvroWrapper<T>> {
+public class SafeAvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> {
 
   public boolean accept(Class<?> c) {
     return AvroWrapper.class.isAssignableFrom(c);
   }
-  
-  /** Returns the specified map output deserializer.  Defaults to the final
-   * output deserializer if no map output schema was specified. */
+
+  /**
+   * Returns the specified map output deserializer. Defaults to the final output
+   * deserializer if no map output schema was specified.
+   */
   public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c) {
     boolean isKey = AvroKey.class.isAssignableFrom(c);
     Configuration conf = getConf();
-    Schema schema = isKey 
-        ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
-            : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf));
+    Schema schema = isKey ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob
+        .getMapOutputSchema(conf));
 
     DatumReader<T> datumReader = null;
-    if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {        
-        ReflectDataFactory factory = (ReflectDataFactory) ReflectionUtils.newInstance(
-            conf.getClass("crunch.reflectdatafactory", ReflectDataFactory.class), conf);
-        datumReader = factory.getReader(schema);
+    if (conf.getBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, false)) {
+      ReflectDataFactory factory = (ReflectDataFactory) ReflectionUtils.newInstance(
+          conf.getClass("crunch.reflectdatafactory", ReflectDataFactory.class), conf);
+      datumReader = factory.getReader(schema);
     } else {
-        datumReader = new SpecificDatumReader<T>(schema);
+      datumReader = new SpecificDatumReader<T>(schema);
     }
     return new AvroWrapperDeserializer(datumReader, isKey);
   }
-  
+
   private static final DecoderFactory FACTORY = DecoderFactory.get();
 
-  private class AvroWrapperDeserializer
-    implements Deserializer<AvroWrapper<T>> {
+  private class AvroWrapperDeserializer implements Deserializer<AvroWrapper<T>> {
 
     private DatumReader<T> reader;
     private BinaryDecoder decoder;
     private boolean isKey;
-    
+
     public AvroWrapperDeserializer(DatumReader<T> reader, boolean isKey) {
       this.reader = reader;
       this.isKey = isKey;
     }
-    
+
     public void open(InputStream in) {
       this.decoder = FACTORY.directBinaryDecoder(in, decoder);
     }
-    
-    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper)
-      throws IOException {
+
+    public AvroWrapper<T> deserialize(AvroWrapper<T> wrapper) throws IOException {
       T datum = reader.read(wrapper == null ? null : wrapper.datum(), decoder);
       if (wrapper == null) {
-        wrapper = isKey? new AvroKey<T>(datum) : new AvroValue<T>(datum);
+        wrapper = isKey ? new AvroKey<T>(datum) : new AvroValue<T>(datum);
       } else {
         wrapper.datum(datum);
       }
@@ -103,16 +101,14 @@ public class SafeAvroSerialization<T> extends Configured
       decoder.inputStream().close();
     }
   }
-  
+
   /** Returns the specified output serializer. */
   public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c) {
     // AvroWrapper used for final output, AvroKey or AvroValue for map output
     boolean isFinalOutput = c.equals(AvroWrapper.class);
     Configuration conf = getConf();
-    Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf)
-        : (AvroKey.class.isAssignableFrom(c)
-            ? Pair.getKeySchema(AvroJob.getMapOutputSchema(conf))
-                : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
+    Schema schema = isFinalOutput ? AvroJob.getOutputSchema(conf) : (AvroKey.class.isAssignableFrom(c) ? Pair
+        .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf)));
 
     ReflectDataFactory factory = Avros.getReflectDataFactory(conf);
     ReflectDatumWriter<T> writer = factory.getWriter();
@@ -124,15 +120,14 @@ public class SafeAvroSerialization<T> extends Configured
     private DatumWriter<T> writer;
     private OutputStream out;
     private BinaryEncoder encoder;
-    
+
     public AvroWrapperSerializer(DatumWriter<T> writer) {
       this.writer = writer;
     }
 
     public void open(OutputStream out) {
       this.out = out;
-      this.encoder = new EncoderFactory().configureBlockSize(512)
-          .binaryEncoder(out, null);
+      this.encoder = new EncoderFactory().configureBlockSize(512).binaryEncoder(out, null);
     }
 
     public void serialize(AvroWrapper<T> wrapper) throws IOException {

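A sketch of how a Serialization implementation like this is typically registered with Hadoop's pluggable serialization framework; the "io.serializations" key is standard Hadoop, not something this commit introduces:

    import org.apache.crunch.types.avro.SafeAvroSerialization;
    import org.apache.hadoop.conf.Configuration;

    public class RegisterSerializationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Append SafeAvroSerialization to the framework's serializer list.
        String existing = conf.get("io.serializations", "");
        conf.set("io.serializations", existing.isEmpty()
            ? SafeAvroSerialization.class.getName()
            : existing + "," + SafeAvroSerialization.class.getName());
        // accept() then matches AvroWrapper and its AvroKey/AvroValue subclasses.
      }
    }
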
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
index 5c4d83f..1178e8f 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java
@@ -23,13 +23,12 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableUtils;
 
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-
 public class GenericArrayWritable<T> implements Writable {
   private Writable[] values;
   private Class<? extends Writable> valueClass;
@@ -37,13 +36,13 @@ public class GenericArrayWritable<T> implements Writable {
   public GenericArrayWritable(Class<? extends Writable> valueClass) {
     this.valueClass = valueClass;
   }
-  
+
   public GenericArrayWritable() {
     // for deserialization
   }
-  
-  public void set(Writable[] values) { 
-    this.values = values; 
+
+  public void set(Writable[] values) {
+    this.values = values;
   }
 
   public Writable[] get() {
@@ -51,7 +50,7 @@ public class GenericArrayWritable<T> implements Writable {
   }
 
   public void readFields(DataInput in) throws IOException {
-    values = new Writable[WritableUtils.readVInt(in)];          // construct values
+    values = new Writable[WritableUtils.readVInt(in)]; // construct values
     if (values.length > 0) {
       int nulls = WritableUtils.readVInt(in);
       if (nulls == values.length) {
@@ -61,24 +60,24 @@ public class GenericArrayWritable<T> implements Writable {
       setValueType(valueType);
       for (int i = 0; i < values.length; i++) {
         Writable value = WritableFactories.newInstance(valueClass);
-        value.readFields(in);                       // read a value
-        values[i] = value;                          // store it in values
+        value.readFields(in); // read a value
+        values[i] = value; // store it in values
       }
     }
   }
-  
+
   protected void setValueType(String valueType) {
     if (valueClass == null) {
       try {
-        valueClass = Class.forName(valueType).asSubclass(Writable.class);      
+        valueClass = Class.forName(valueType).asSubclass(Writable.class);
       } catch (ClassNotFoundException e) {
         throw new CrunchRuntimeException(e);
       }
-    } else if (!valueType.equals(valueClass.getName()))  {
+    } else if (!valueType.equals(valueClass.getName())) {
       throw new IllegalStateException("Incoming " + valueType + " is not " + valueClass);
     }
   }
-  
+
   public void write(DataOutput out) throws IOException {
     WritableUtils.writeVInt(out, values.length);
     int nulls = 0;
@@ -100,7 +99,7 @@ public class GenericArrayWritable<T> implements Writable {
       }
     }
   }
-  
+
   @Override
   public int hashCode() {
     HashCodeBuilder hcb = new HashCodeBuilder();

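A round-trip sketch for the class above, using only the methods visible in this hunk (set/write, no-arg constructor/readFields/get); all elements are non-null, so the null-tracking branch is not exercised:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.crunch.types.writable.GenericArrayWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class ArrayWritableRoundTrip {
      public static void main(String[] args) throws IOException {
        GenericArrayWritable<Text> out = new GenericArrayWritable<Text>(Text.class);
        out.set(new Writable[] { new Text("a"), new Text("b") });

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buf));

        // The no-arg constructor is the deserialization path; the element
        // class is recovered via setValueType() during readFields().
        GenericArrayWritable<Text> in = new GenericArrayWritable<Text>();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        Writable[] values = in.get(); // {a, b}
      }
    }
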
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
index ee4e80b..1c3536b 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/TupleWritable.java
@@ -154,8 +154,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
     try {
       for (int i = 0; i < card; ++i) {
         if (has(i)) {
-          cls[i] = Class.forName(Text.readString(in))
-              .asSubclass(Writable.class);
+          cls[i] = Class.forName(Text.readString(in)).asSubclass(Writable.class);
         }
       }
       for (int i = 0; i < card; ++i) {
@@ -207,7 +206,7 @@ public class TupleWritable implements WritableComparable<TupleWritable> {
         Writable v2 = o.values[i];
         if (v1 != v2 && (v1 != null && !v1.equals(v2))) {
           if (v1 instanceof WritableComparable && v2 instanceof WritableComparable) {
-            int cmp = ((WritableComparable)v1).compareTo((WritableComparable)v2);
+            int cmp = ((WritableComparable) v1).compareTo((WritableComparable) v2);
             if (cmp != 0) {
               return cmp;
             }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
index 1d18843..3c9312a 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
@@ -17,37 +17,34 @@
  */
 package org.apache.crunch.types.writable;
 
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.mapreduce.Job;
 
 public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
 
   private final MapFn inputFn;
   private final MapFn outputFn;
   private final Converter converter;
-  
+
   public WritableGroupedTableType(WritableTableType<K, V> tableType) {
     super(tableType);
     WritableType keyType = (WritableType) tableType.getKeyType();
     WritableType valueType = (WritableType) tableType.getValueType();
-    this.inputFn =  new PairIterableMapFn(keyType.getInputMapFn(),
-        valueType.getInputMapFn());
+    this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
     this.outputFn = tableType.getOutputMapFn();
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
-        valueType.getSerializationClass());
+    this.converter = new WritablePairConverter(keyType.getSerializationClass(), valueType.getSerializationClass());
   }
-  
+
   @Override
   public Class<Pair<K, Iterable<V>>> getTypeClass() {
-    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();  
+    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();
   }
-  
+
   @Override
   public Converter getGroupingConverter() {
     return converter;
@@ -57,12 +54,12 @@ public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
   public MapFn getInputMapFn() {
     return inputFn;
   }
-  
+
   @Override
   public MapFn getOutputMapFn() {
     return outputFn;
   }
-  
+
   @Override
   public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
     return PTables.getGroupedDetachedValue(this, value);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
index ba64b0b..750f2c6 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
@@ -21,15 +21,15 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.types.Converter;
 
 public class WritablePairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> {
-  
+
   private final Class<K> keyClass;
   private final Class<V> valueClass;
-  
+
   public WritablePairConverter(Class<K> keyClass, Class<V> valueClass) {
     this.keyClass = keyClass;
     this.valueClass = valueClass;
   }
-  
+
   @Override
   public Pair<K, V> convertInput(K key, V value) {
     return Pair.of(key, value);

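The converter above is a thin adapter; as the hunk shows, convertInput simply pairs the framework's key and value. A minimal sketch:

    import org.apache.crunch.Pair;
    import org.apache.crunch.types.writable.WritablePairConverter;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class PairConverterSketch {
      public static void main(String[] args) {
        WritablePairConverter<Text, LongWritable> converter =
            new WritablePairConverter<Text, LongWritable>(Text.class, LongWritable.class);
        Pair<Text, LongWritable> pair =
            converter.convertInput(new Text("k"), new LongWritable(1L));
      }
    }
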
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
index 9c781f1..fc6dd04 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
@@ -20,9 +20,6 @@ package org.apache.crunch.types.writable;
 import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
@@ -34,6 +31,9 @@ import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
 import com.google.common.collect.ImmutableList;
 
 class WritableTableType<K, V> implements PTableType<K, V> {
@@ -43,44 +43,40 @@ class WritableTableType<K, V> implements PTableType<K, V> {
   private final MapFn inputFn;
   private final MapFn outputFn;
   private final Converter converter;
-  
-  public WritableTableType(WritableType<K, Writable> keyType,
-      WritableType<V, Writable> valueType) {
+
+  public WritableTableType(WritableType<K, Writable> keyType, WritableType<V, Writable> valueType) {
     this.keyType = keyType;
     this.valueType = valueType;
-    this.inputFn = new PairMapFn(keyType.getInputMapFn(),
-        valueType.getInputMapFn());
-    this.outputFn = new PairMapFn(keyType.getOutputMapFn(),
-        valueType.getOutputMapFn());
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
-        valueType.getSerializationClass());
+    this.inputFn = new PairMapFn(keyType.getInputMapFn(), valueType.getInputMapFn());
+    this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn());
+    this.converter = new WritablePairConverter(keyType.getSerializationClass(), valueType.getSerializationClass());
   }
 
   @Override
   public Class<Pair<K, V>> getTypeClass() {
     return (Class<Pair<K, V>>) Pair.of(null, null).getClass();
   }
-  
+
   @Override
   public List<PType> getSubTypes() {
-    return ImmutableList.<PType>of(keyType, valueType);
+    return ImmutableList.<PType> of(keyType, valueType);
   }
-  
+
   @Override
   public MapFn getInputMapFn() {
     return inputFn;
   }
-  
+
   @Override
   public MapFn getOutputMapFn() {
     return outputFn;
   }
-  
+
   @Override
   public Converter getConverter() {
     return converter;
   }
-  
+
   @Override
   public PTypeFamily getFamily() {
     return WritableTypeFamily.getInstance();
@@ -103,7 +99,7 @@ class WritableTableType<K, V> implements PTableType<K, V> {
   public SourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
     return new SeqFileTableSourceTarget<K, V>(path, this);
   }
-  
+
   @Override
   public Pair<K, V> getDetachedValue(Pair<K, V> value) {
     return PTables.getDetachedValue(this, value);
@@ -111,16 +107,16 @@ class WritableTableType<K, V> implements PTableType<K, V> {
 
   @Override
   public boolean equals(Object obj) {
-	if (obj == null || !(obj instanceof WritableTableType)) {
-	  return false;
-	}
-	WritableTableType that = (WritableTableType) obj;
-	return keyType.equals(that.keyType) && valueType.equals(that.valueType);
+    if (obj == null || !(obj instanceof WritableTableType)) {
+      return false;
+    }
+    WritableTableType that = (WritableTableType) obj;
+    return keyType.equals(that.keyType) && valueType.equals(that.valueType);
   }
-  
+
   @Override
   public int hashCode() {
-	HashCodeBuilder hcb = new HashCodeBuilder();
-	return hcb.append(keyType).append(valueType).toHashCode();
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    return hcb.append(keyType).append(valueType).toHashCode();
   }
 }
\ No newline at end of file

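WritableTableType is package-private and is normally obtained through Writables.tableOf (reformatted further down in this message); the re-indented equals()/hashCode() above make structurally identical table types compare equal. A sketch, assuming the conventional Writables.strings()/longs() accessors:

    import org.apache.crunch.types.PTableType;
    import org.apache.crunch.types.writable.Writables;

    public class TableTypeEqualitySketch {
      public static void main(String[] args) {
        PTableType<String, Long> a = Writables.tableOf(Writables.strings(), Writables.longs());
        PTableType<String, Long> b = Writables.tableOf(Writables.strings(), Writables.longs());
        System.out.println(a.equals(b)); // true: same key and value types
      }
    }
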
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
index b99050d..45502a3 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java
@@ -20,9 +20,6 @@ package org.apache.crunch.types.writable;
 import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.fn.IdentityFn;
@@ -30,6 +27,9 @@ import org.apache.crunch.io.seq.SeqFileSourceTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
 import com.google.common.collect.ImmutableList;
 
 public class WritableType<T, W extends Writable> implements PType<T> {
@@ -40,15 +40,15 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   private final MapFn<W, T> inputFn;
   private final MapFn<T, W> outputFn;
   private final List<PType> subTypes;
-  
-  WritableType(Class<T> typeClass, Class<W> writableClass,
-      MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn, PType...subTypes) {
+
+  WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn,
+      PType... subTypes) {
     this.typeClass = typeClass;
     this.writableClass = writableClass;
     this.inputFn = inputDoFn;
     this.outputFn = outputDoFn;
     this.converter = new WritableValueConverter(writableClass);
-    this.subTypes = ImmutableList.<PType>builder().add(subTypes).build();
+    this.subTypes = ImmutableList.<PType> builder().add(subTypes).build();
   }
 
   @Override
@@ -65,22 +65,22 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   public Converter getConverter() {
     return converter;
   }
-  
+
   @Override
   public MapFn getInputMapFn() {
     return inputFn;
   }
-  
+
   @Override
   public MapFn getOutputMapFn() {
     return outputFn;
   }
-  
+
   @Override
   public List<PType> getSubTypes() {
     return subTypes;
   }
-  
+
   public Class<W> getSerializationClass() {
     return writableClass;
   }
@@ -89,17 +89,16 @@ public class WritableType<T, W extends Writable> implements PType<T> {
   public SourceTarget<T> getDefaultFileSource(Path path) {
     return new SeqFileSourceTarget<T>(path, this);
   }
-  
+
   @Override
   public boolean equals(Object obj) {
-	if (obj == null || !(obj instanceof WritableType)) {
-	  return false;
-	}
-	WritableType wt = (WritableType) obj;
-	return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) &&	
-		subTypes.equals(wt.subTypes));
+    if (obj == null || !(obj instanceof WritableType)) {
+      return false;
+    }
+    WritableType wt = (WritableType) obj;
+    return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes.equals(wt.subTypes));
   }
-  
+
   // Unchecked warnings are suppressed because we know that W and T are the same
   // type (due to the IdentityFn being used)
   @SuppressWarnings("unchecked")
@@ -115,8 +114,8 @@ public class WritableType<T, W extends Writable> implements PType<T> {
 
   @Override
   public int hashCode() {
-	HashCodeBuilder hcb = new HashCodeBuilder();
-	hcb.append(typeClass).append(writableClass).append(subTypes);
-	return hcb.toHashCode();
+    HashCodeBuilder hcb = new HashCodeBuilder();
+    hcb.append(typeClass).append(writableClass).append(subTypes);
+    return hcb.toHashCode();
   }
 }
\ No newline at end of file

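The WritableType constructor above is package-private; Writable classes are surfaced as PTypes through factory methods like the identity-function one visible in the Writables.java hunks below. A sketch, assuming a Writables.writables(Class) factory along those lines:

    import org.apache.crunch.types.writable.WritableType;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.io.Text;

    public class WritablesFactorySketch {
      public static void main(String[] args) {
        // Type class and serialization class coincide; the map functions
        // are identities, per the wIdentity hunk in Writables.java.
        WritableType<Text, Text> textType = Writables.writables(Text.class); // assumed factory
      }
    }
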
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
index 6ff33e9..a94db96 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.hadoop.io.Writable;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple;
@@ -34,10 +32,11 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.PTypeUtils;
+import org.apache.hadoop.io.Writable;
 
 /**
- * The {@link Writable}-based implementation of the {@link org.apache.crunch.types.PTypeFamily}
- * interface.
+ * The {@link Writable}-based implementation of the
+ * {@link org.apache.crunch.types.PTypeFamily} interface.
  */
 public class WritableTypeFamily implements PTypeFamily {
 
@@ -78,11 +77,11 @@ public class WritableTypeFamily implements PTypeFamily {
   public PType<Boolean> booleans() {
     return Writables.booleans();
   }
-  
+
   public PType<ByteBuffer> bytes() {
     return Writables.bytes();
   }
-  
+
   public <T> PType<T> records(Class<T> clazz) {
     return Writables.records(clazz);
   }
@@ -99,13 +98,11 @@ public class WritableTypeFamily implements PTypeFamily {
     return Writables.pairs(p1, p2);
   }
 
-  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3) {
+  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
     return Writables.triples(p1, p2, p3);
   }
 
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, PType<V4> p4) {
     return Writables.quads(p1, p2, p3, p4);
   }
 
@@ -118,13 +115,13 @@ public class WritableTypeFamily implements PTypeFamily {
   }
 
   public <T> PType<Map<String, T>> maps(PType<T> ptype) {
-	return Writables.maps(ptype);
+    return Writables.maps(ptype);
   }
-  
+
   @Override
   public <T> PType<T> as(PType<T> ptype) {
-    if (ptype instanceof WritableType || ptype instanceof WritableTableType ||
-        ptype instanceof WritableGroupedTableType) {
+    if (ptype instanceof WritableType || ptype instanceof WritableTableType
+        || ptype instanceof WritableGroupedTableType) {
       return ptype;
     }
     if (ptype instanceof PGroupedTableType) {
@@ -144,8 +141,7 @@ public class WritableTypeFamily implements PTypeFamily {
   }
 
   @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
-      MapFn<T, S> outputFn, PType<S> base) {
+  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
     return Writables.derived(clazz, inputFn, outputFn, base);
   }
 }

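A short sketch of the family facade above; per the as() hunk, writable-backed PTypes pass through unchanged:

    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.PTypeFamily;
    import org.apache.crunch.types.writable.WritableTypeFamily;

    public class TypeFamilySketch {
      public static void main(String[] args) {
        PTypeFamily tf = WritableTypeFamily.getInstance();
        PType<Boolean> flags = tf.booleans();
        // as() short-circuits on the instanceof checks reflowed above.
        PType<Boolean> same = tf.as(flags);
        System.out.println(flags == same); // true
      }
    }
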
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
index 85cadc8..3670b90 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
@@ -17,18 +17,17 @@
  */
 package org.apache.crunch.types.writable;
 
-import org.apache.hadoop.io.NullWritable;
-
 import org.apache.crunch.types.Converter;
+import org.apache.hadoop.io.NullWritable;
 
 class WritableValueConverter<W> implements Converter<Object, W, W, Iterable<W>> {
-  
+
   private final Class<W> serializationClass;
-  
+
   public WritableValueConverter(Class<W> serializationClass) {
     this.serializationClass = serializationClass;
   }
-  
+
   @Override
   public W convertInput(Object key, W value) {
     return value;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
index 08b6c64..f4906b7 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -27,18 +27,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple;
@@ -51,6 +39,18 @@ import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.TupleFactory;
 import org.apache.crunch.util.PTypes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -74,7 +74,7 @@ public class Writables {
       return NullWritable.get();
     }
   };
-  
+
   private static final MapFn<Text, String> TEXT_TO_STRING = new MapFn<Text, String>() {
     @Override
     public String map(Text input) {
@@ -160,14 +160,14 @@ public class Writables {
       return input == Boolean.TRUE ? TRUE : FALSE;
     }
   };
-  
+
   private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB = new MapFn<BytesWritable, ByteBuffer>() {
     @Override
     public ByteBuffer map(BytesWritable input) {
       return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
     }
   };
-  
+
   private static final MapFn<ByteBuffer, BytesWritable> BB_TO_BW = new MapFn<ByteBuffer, BytesWritable>() {
     @Override
     public BytesWritable map(ByteBuffer input) {
@@ -177,49 +177,42 @@ public class Writables {
     }
   };
 
-  private static <S, W extends Writable> WritableType<S, W> create(Class<S> typeClass,
-      Class<W> writableClass, MapFn<W, S> inputDoFn, MapFn<S, W> outputDoFn) {
-    return new WritableType<S, W>(typeClass, writableClass, inputDoFn,
-        outputDoFn);
+  private static <S, W extends Writable> WritableType<S, W> create(Class<S> typeClass, Class<W> writableClass,
+      MapFn<W, S> inputDoFn, MapFn<S, W> outputDoFn) {
+    return new WritableType<S, W>(typeClass, writableClass, inputDoFn, outputDoFn);
   }
 
   private static final WritableType<Void, NullWritable> nulls = create(Void.class, NullWritable.class,
       NULL_WRITABLE_TO_VOID, VOID_TO_NULL_WRITABLE);
-  private static final WritableType<String, Text> strings = create(String.class, Text.class,
-      TEXT_TO_STRING, STRING_TO_TEXT);
-  private static final WritableType<Long, LongWritable> longs = create(Long.class, LongWritable.class,
-      LW_TO_LONG, LONG_TO_LW);
-  private static final WritableType<Integer, IntWritable> ints = create(Integer.class, IntWritable.class,
-      IW_TO_INT, INT_TO_IW);
+  private static final WritableType<String, Text> strings = create(String.class, Text.class, TEXT_TO_STRING,
+      STRING_TO_TEXT);
+  private static final WritableType<Long, LongWritable> longs = create(Long.class, LongWritable.class, LW_TO_LONG,
+      LONG_TO_LW);
+  private static final WritableType<Integer, IntWritable> ints = create(Integer.class, IntWritable.class, IW_TO_INT,
+      INT_TO_IW);
   private static final WritableType<Float, FloatWritable> floats = create(Float.class, FloatWritable.class,
       FW_TO_FLOAT, FLOAT_TO_FW);
-  private static final WritableType<Double, DoubleWritable> doubles = create(Double.class,
-      DoubleWritable.class, DW_TO_DOUBLE, DOUBLE_TO_DW);
-  private static final WritableType<Boolean, BooleanWritable> booleans = create(Boolean.class,
-      BooleanWritable.class, BW_TO_BOOLEAN, BOOLEAN_TO_BW);
-  private static final WritableType<ByteBuffer, BytesWritable> bytes = create(ByteBuffer.class,
-      BytesWritable.class, BW_TO_BB, BB_TO_BW);
-
-  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>>builder()
-      .put(String.class, strings)
-      .put(Long.class, longs)
-      .put(Integer.class, ints)
-      .put(Float.class, floats)
-      .put(Double.class, doubles)
-      .put(Boolean.class, booleans)
-      .put(ByteBuffer.class, bytes)
-      .build();
-  
+  private static final WritableType<Double, DoubleWritable> doubles = create(Double.class, DoubleWritable.class,
+      DW_TO_DOUBLE, DOUBLE_TO_DW);
+  private static final WritableType<Boolean, BooleanWritable> booleans = create(Boolean.class, BooleanWritable.class,
+      BW_TO_BOOLEAN, BOOLEAN_TO_BW);
+  private static final WritableType<ByteBuffer, BytesWritable> bytes = create(ByteBuffer.class, BytesWritable.class,
+      BW_TO_BB, BB_TO_BW);
+
+  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
+      .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
+      .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
+
   private static final Map<Class<?>, WritableType<?, ?>> EXTENSIONS = Maps.newHashMap();
-  
+
   public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
     return (PType<T>) PRIMITIVES.get(clazz);
   }
-  
+
   public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) {
     EXTENSIONS.put(clazz, ptype);
   }
-  
+
   public static final WritableType<Void, NullWritable> nulls() {
     return nulls;
   }
@@ -247,11 +240,11 @@ public class Writables {
   public static final WritableType<Boolean, BooleanWritable> booleans() {
     return booleans;
   }
-  
+
   public static final WritableType<ByteBuffer, BytesWritable> bytes() {
     return bytes;
   }
-  
+
   public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) {
     if (EXTENSIONS.containsKey(clazz)) {
       return (WritableType<T, W>) EXTENSIONS.get(clazz);
@@ -264,8 +257,7 @@ public class Writables {
     return new WritableType<W, W>(clazz, clazz, wIdentity, wIdentity);
   }
 
-  public static <K, V> WritableTableType<K, V> tableOf(
-      PType<K> key, PType<V> value) {
+  public static <K, V> WritableTableType<K, V> tableOf(PType<K> key, PType<V> value) {
     if (key instanceof WritableTableType) {
       WritableTableType wtt = (WritableTableType) key;
       key = pairs(wtt.getKeyType(), wtt.getValueType());
@@ -312,7 +304,7 @@ public class Writables {
         fn.setConfigurationForTest(conf);
       }
     }
-    
+
     @Override
     public void initialize() {
       for (MapFn fn : fns) {
@@ -343,7 +335,7 @@ public class Writables {
    * 
    */
   private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
-    
+
     private transient TupleWritable writable;
     private transient Writable[] values;
 
@@ -399,24 +391,18 @@ public class Writables {
     return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2);
   }
 
-  public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3) {
+  public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3) {
     TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3);
     TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3);
-    return new WritableType(Tuple3.class, TupleWritable.class,
-        input,
-        output,
-        p1, p2, p3);
+    return new WritableType(Tuple3.class, TupleWritable.class, input, output, p1, p2, p3);
   }
 
-  public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
+  public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1, PType<V2> p2,
+      PType<V3> p3, PType<V4> p4) {
     TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4);
     TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4);
-    return new WritableType(Tuple4.class, TupleWritable.class,
-        input,
-        output,
-        p1, p2, p3, p4);
+    return new WritableType(Tuple4.class, TupleWritable.class, input, output, p1, p2, p3, p4);
   }
 
   public static WritableType<TupleN, TupleWritable> tuples(PType... ptypes) {
@@ -433,19 +419,17 @@ public class Writables {
     TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
     TWTupleMapFn input = new TWTupleMapFn(factory, ptypes);
     TupleTWMapFn output = new TupleTWMapFn(ptypes);
-    return new WritableType(clazz, TupleWritable.class, input, output, ptypes);  
+    return new WritableType(clazz, TupleWritable.class, input, output, ptypes);
   }
-  
-  public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
-      PType<S> base) {
+
+  public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, PType<S> base) {
     WritableType<S, ?> wt = (WritableType<S, ?>) base;
     MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);
     MapFn output = new CompositeMapFn(outputFn, wt.getOutputMapFn());
     return new WritableType(clazz, wt.getSerializationClass(), input, output, base.getSubTypes().toArray(new PType[0]));
   }
-  
-  private static class ArrayCollectionMapFn<T> extends
-      MapFn<GenericArrayWritable, Collection<T>> {
+
+  private static class ArrayCollectionMapFn<T> extends MapFn<GenericArrayWritable, Collection<T>> {
     private final MapFn<Object, T> mapFn;
 
     public ArrayCollectionMapFn(MapFn<Object, T> mapFn) {
@@ -461,12 +445,12 @@ public class Writables {
     public void setConfigurationForTest(Configuration conf) {
       mapFn.setConfigurationForTest(conf);
     }
-    
+
     @Override
     public void initialize() {
-      mapFn.setContext(getContext());   
+      mapFn.setContext(getContext());
     }
-    
+
     @Override
     public Collection<T> map(GenericArrayWritable input) {
       Collection<T> collection = Lists.newArrayList();
@@ -477,14 +461,12 @@ public class Writables {
     }
   }
 
-  private static class CollectionArrayMapFn<T> extends
-      MapFn<Collection<T>, GenericArrayWritable> {
-    
+  private static class CollectionArrayMapFn<T> extends MapFn<Collection<T>, GenericArrayWritable> {
+
     private final Class<? extends Writable> clazz;
     private final MapFn<T, Object> mapFn;
 
-    public CollectionArrayMapFn(Class<? extends Writable> clazz,
-        MapFn<T, Object> mapFn) {
+    public CollectionArrayMapFn(Class<? extends Writable> clazz, MapFn<T, Object> mapFn) {
       this.clazz = clazz;
       this.mapFn = mapFn;
     }
@@ -498,12 +480,12 @@ public class Writables {
     public void setConfigurationForTest(Configuration conf) {
       mapFn.setConfigurationForTest(conf);
     }
-    
+
     @Override
     public void initialize() {
-      mapFn.setContext(getContext());   
+      mapFn.setContext(getContext());
     }
-    
+
     @Override
     public GenericArrayWritable map(Collection<T> input) {
       GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz);
@@ -519,9 +501,8 @@ public class Writables {
 
   public static <T> WritableType<Collection<T>, GenericArrayWritable<T>> collections(PType<T> ptype) {
     WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
-    return new WritableType(Collection.class, GenericArrayWritable.class,
-        new ArrayCollectionMapFn(wt.getInputMapFn()), new CollectionArrayMapFn(
-            wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
+    return new WritableType(Collection.class, GenericArrayWritable.class, new ArrayCollectionMapFn(wt.getInputMapFn()),
+        new CollectionArrayMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
   }
 
   private static class MapInputMapFn<T> extends MapFn<TextMapWritable<Writable>, Map<String, T>> {
@@ -555,7 +536,7 @@ public class Writables {
       return out;
     }
   }
-  
+
   private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable<Writable>> {
 
     private final Class<Writable> clazz;
@@ -588,20 +569,19 @@ public class Writables {
         tmw.put(new Text(e.getKey()), mapFn.map(e.getValue()));
       }
       return tmw;
-    }	
+    }
   }
-  
+
   public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) {
-	WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
-    return new WritableType(Map.class, TextMapWritable.class,
-        new MapInputMapFn(wt.getInputMapFn()),
+    WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
+    return new WritableType(Map.class, TextMapWritable.class, new MapInputMapFn(wt.getInputMapFn()),
         new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
   }
-  
+
   public static <T> PType<T> jsons(Class<T> clazz) {
-    return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());  
+    return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());
   }
-  
+
   /**
    * Perform a deep copy of a writable value.
    * 

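To ground the signatures being reflowed above, a sketch composing writable-backed types, including derived() with inline MapFns. The longs()/strings() accessors are assumed by convention, and URI is just an illustrative target type:

    import java.net.URI;
    import java.util.Collection;
    import java.util.Map;

    import org.apache.crunch.MapFn;
    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.writable.Writables;

    public class WritablesSketch {
      public static void main(String[] args) {
        PType<Collection<Long>> longLists = Writables.collections(Writables.longs());
        PType<Map<String, Long>> counts = Writables.maps(Writables.longs());

        // derived() layers user map functions over a base PType.
        PType<URI> uris = Writables.derived(URI.class,
            new MapFn<String, URI>() {
              @Override
              public URI map(String input) {
                return URI.create(input);
              }
            },
            new MapFn<URI, String>() {
              @Override
              public String map(URI input) {
                return input.toString();
              }
            },
            Writables.strings());
      }
    }
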
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/util/Collects.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/Collects.java b/crunch/src/main/java/org/apache/crunch/util/Collects.java
index f5b07c4..c8c9311 100644
--- a/crunch/src/main/java/org/apache/crunch/util/Collects.java
+++ b/crunch/src/main/java/org/apache/crunch/util/Collects.java
@@ -27,12 +27,12 @@ import com.google.common.collect.Lists;
  * of implementations.
  */
 public class Collects {
-  
+
   public static <T> Collection<T> newArrayList() {
     return Lists.newArrayList();
   }
 
-  public static <T> Collection<T> newArrayList(T...elements) {
+  public static <T> Collection<T> newArrayList(T... elements) {
     return Lists.newArrayList(elements);
   }
 
@@ -44,5 +44,6 @@ public class Collects {
     return Lists.newArrayList(elements);
   }
 
-  private Collects() {}
+  private Collects() {
+  }
 }

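The varargs cleanup above (T... elements) leaves call sites unchanged; a trivial sketch:

    import java.util.Collection;

    import org.apache.crunch.util.Collects;

    public class CollectsSketch {
      public static void main(String[] args) {
        Collection<String> empty = Collects.newArrayList();
        Collection<String> items = Collects.newArrayList("a", "b", "c");
      }
    }
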
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/DistCache.java b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
index 682e8f0..6ab2a50 100644
--- a/crunch/src/main/java/org/apache/crunch/util/DistCache.java
+++ b/crunch/src/main/java/org/apache/crunch/util/DistCache.java
@@ -26,29 +26,28 @@ import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
 
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-
 /**
- * Provides functions for working with Hadoop's distributed cache. These include:
+ * Provides functions for working with Hadoop's distributed cache. These
+ * include:
  * <ul>
- *   <li>
- *     Functions for working with a job-specific distributed cache of objects, like the
- *     serialized runtime nodes in a MapReduce.
- *   </li>
- *   <li>
- *     Functions for adding library jars to the distributed cache, which will be added to the
- *     classpath of MapReduce tasks.
- *   </li>
+ * <li>
+ * Functions for working with a job-specific distributed cache of objects, like
+ * the serialized runtime nodes in a MapReduce.</li>
+ * <li>
+ * Functions for adding library jars to the distributed cache, which will be
+ * added to the classpath of MapReduce tasks.</li>
  * </ul>
  */
 public class DistCache {
 
-  // Configuration key holding the paths of jars to export to the distributed cache.
+  // Configuration key holding the paths of jars to export to the distributed
+  // cache.
   private static final String TMPJARS_KEY = "tmpjars";
 
   public static void write(Configuration conf, Path path, Object value) throws IOException {
@@ -82,20 +81,24 @@ public class DistCache {
   }
 
   /**
-   * Adds the specified jar to the distributed cache of jobs using the provided configuration. The
-   * jar will be placed on the classpath of tasks run by the job.
-   *
-   * @param conf The configuration used to add the jar to the distributed cache.
-   * @param jarFile The jar file to add to the distributed cache.
-   * @throws IOException If the jar file does not exist or there is a problem accessing the file.
+   * Adds the specified jar to the distributed cache of jobs using the provided
+   * configuration. The jar will be placed on the classpath of tasks run by the
+   * job.
+   * 
+   * @param conf
+   *          The configuration used to add the jar to the distributed cache.
+   * @param jarFile
+   *          The jar file to add to the distributed cache.
+   * @throws IOException
+   *           If the jar file does not exist or there is a problem accessing
+   *           the file.
    */
   public static void addJarToDistributedCache(Configuration conf, File jarFile) throws IOException {
     if (!jarFile.exists()) {
       throw new IOException("Jar file: " + jarFile.getCanonicalPath() + " does not exist.");
     }
     if (!jarFile.getName().endsWith(".jar")) {
-      throw new IllegalArgumentException("File: " + jarFile.getCanonicalPath() + " is not a .jar "
-          + "file.");
+      throw new IllegalArgumentException("File: " + jarFile.getCanonicalPath() + " is not a .jar " + "file.");
     }
     // Get a qualified path for the jar.
     FileSystem fileSystem = FileSystem.getLocal(conf);
@@ -111,67 +114,77 @@ public class DistCache {
   }
 
   /**
-   * Adds the jar at the specified path to the distributed cache of jobs using the provided
-   * configuration. The jar will be placed on the classpath of tasks run by the job.
-   *
-   * @param conf The configuration used to add the jar to the distributed cache.
-   * @param jarFile The path to the jar file to add to the distributed cache.
-   * @throws IOException If the jar file does not exist or there is a problem accessing the file.
+   * Adds the jar at the specified path to the distributed cache of jobs using
+   * the provided configuration. The jar will be placed on the classpath of
+   * tasks run by the job.
+   * 
+   * @param conf
+   *          The configuration used to add the jar to the distributed cache.
+   * @param jarFile
+   *          The path to the jar file to add to the distributed cache.
+   * @throws IOException
+   *           If the jar file does not exist or there is a problem accessing
+   *           the file.
    */
-  public static void addJarToDistributedCache(Configuration conf, String jarFile)
-      throws IOException {
+  public static void addJarToDistributedCache(Configuration conf, String jarFile) throws IOException {
     addJarToDistributedCache(conf, new File(jarFile));
   }
 
   /**
-   * Finds the path to a jar that contains the class provided, if any. There is no guarantee that
-   * the jar returned will be the first on the classpath to contain the file. This method is
-   * basically lifted out of Hadoop's {@link org.apache.hadoop.mapred.JobConf} class.
-   *
-   * @param jarClass The class the jar file should contain.
-   * @return The path to a jar file that contains the class, or <code>null</code> if no such jar
-   *     exists.
-   * @throws IOException If there is a problem searching for the jar file.
+   * Finds the path to a jar that contains the class provided, if any. There is
+   * no guarantee that the jar returned will be the first on the classpath to
+   * contain the file. This method is basically lifted out of Hadoop's
+   * {@link org.apache.hadoop.mapred.JobConf} class.
+   * 
+   * @param jarClass
+   *          The class the jar file should contain.
+   * @return The path to a jar file that contains the class, or
+   *         <code>null</code> if no such jar exists.
+   * @throws IOException
+   *           If there is a problem searching for the jar file.
    */
   public static String findContainingJar(Class jarClass) throws IOException {
     ClassLoader loader = jarClass.getClassLoader();
     String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class";
-      for(Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
+    for (Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
+      URL url = (URL) itr.nextElement();
+      if ("jar".equals(url.getProtocol())) {
+        String toReturn = url.getPath();
+        if (toReturn.startsWith("file:")) {
+          toReturn = toReturn.substring("file:".length());
         }
+        // URLDecoder is a misnamed class, since it actually decodes
+        // x-www-form-urlencoded MIME type rather than actual
+        // URL encoding (which the file path has). Therefore it would
+        // decode +s to ' 's which is incorrect (spaces are actually
+        // either unencoded or encoded as "%20"). Replace +s first, so
+        // that they are kept sacred during the decoding process.
+        toReturn = toReturn.replaceAll("\\+", "%2B");
+        toReturn = URLDecoder.decode(toReturn, "UTF-8");
+        return toReturn.replaceAll("!.*$", "");
       }
+    }
     return null;
   }
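
As an aside, the escaping trick described in the comment above is easy to verify in isolation. A minimal sketch, not part of the patch; the jar path is made up:

    import java.net.URLDecoder;

    public class PlusDecodingDemo {
      public static void main(String[] args) throws Exception {
        String raw = "/opt/my+libs/app.jar"; // hypothetical path containing '+'
        // Naive decoding corrupts the path: '+' becomes a space.
        System.out.println(URLDecoder.decode(raw, "UTF-8"));
        // Pre-encoding '+' as %2B first, as findContainingJar does, keeps it intact.
        System.out.println(URLDecoder.decode(raw.replaceAll("\\+", "%2B"), "UTF-8"));
      }
    }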
 
   /**
-   * Adds all jars under the specified directory to the distributed cache of jobs using the
-   * provided configuration. The jars will be placed on the classpath of tasks run by the job.
-   * This method does not descend into subdirectories when adding jars.
-   *
-   * @param conf The configuration used to add jars to the distributed cache.
-   * @param jarDirectory A directory containing jar files to add to the distributed cache.
-   * @throws IOException If the directory does not exist or there is a problem accessing the
-   *     directory.
+   * Adds all jars under the specified directory to the distributed cache of
+   * jobs using the provided configuration. The jars will be placed on the
+   * classpath of tasks run by the job. This method does not descend into
+   * subdirectories when adding jars.
+   * 
+   * @param conf
+   *          The configuration used to add jars to the distributed cache.
+   * @param jarDirectory
+   *          A directory containing jar files to add to the distributed cache.
+   * @throws IOException
+   *           If the directory does not exist or there is a problem accessing
+   *           the directory.
    */
-  public static void addJarDirToDistributedCache(Configuration conf, File jarDirectory)
-      throws IOException {
+  public static void addJarDirToDistributedCache(Configuration conf, File jarDirectory) throws IOException {
     if (!jarDirectory.exists() || !jarDirectory.isDirectory()) {
       throw new IOException("Jar directory: " + jarDirectory.getCanonicalPath() + " does not "
-        + "exist or is not a directory.");
+          + "exist or is not a directory.");
     }
     for (File file : jarDirectory.listFiles()) {
       if (!file.isDirectory() && file.getName().endsWith(".jar")) {
@@ -181,18 +194,21 @@ public class DistCache {
   }
 
   /**
-   * Adds all jars under the directory at the specified path to the distributed cache of jobs
-   * using the provided configuration.  The jars will be placed on the classpath of the tasks
-   * run by the job. This method does not descend into subdirectories when adding jars.
-   *
-   * @param conf The configuration used to add jars to the distributed cache.
-   * @param jarDirectory The path to a directory containing jar files to add to the distributed
-   *     cache.
-   * @throws IOException If the directory does not exist or there is a problem accessing the
-   *     directory.
+   * Adds all jars under the directory at the specified path to the distributed
+   * cache of jobs using the provided configuration. The jars will be placed on
+   * the classpath of the tasks run by the job. This method does not descend
+   * into subdirectories when adding jars.
+   * 
+   * @param conf
+   *          The configuration used to add jars to the distributed cache.
+   * @param jarDirectory
+   *          The path to a directory containing jar files to add to the
+   *          distributed cache.
+   * @throws IOException
+   *           If the directory does not exist or there is a problem accessing
+   *           the directory.
    */
-  public static void addJarDirToDistributedCache(Configuration conf, String jarDirectory)
-      throws IOException {
+  public static void addJarDirToDistributedCache(Configuration conf, String jarDirectory) throws IOException {
     addJarDirToDistributedCache(conf, new File(jarDirectory));
   }
 }
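
For reference, this is how the jar-shipping helpers above are typically invoked from driver code. A minimal sketch, not part of the patch; the paths are hypothetical:

    import java.io.File;
    import java.io.IOException;

    import org.apache.crunch.util.DistCache;
    import org.apache.hadoop.conf.Configuration;

    public class DistCacheExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Ship one jar, then every jar directly under a directory (no recursion).
        DistCache.addJarToDistributedCache(conf, new File("/tmp/libs/extra.jar"));
        DistCache.addJarDirToDistributedCache(conf, "/tmp/libs");
      }
    }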

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/util/PTypes.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/PTypes.java b/crunch/src/main/java/org/apache/crunch/util/PTypes.java
index 863b40f..bf46636 100644
--- a/crunch/src/main/java/org/apache/crunch/util/PTypes.java
+++ b/crunch/src/main/java/org/apache/crunch/util/PTypes.java
@@ -20,6 +20,10 @@ package org.apache.crunch.util;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 
+import org.apache.crunch.MapFn;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
@@ -29,44 +33,37 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.smile.SmileFactory;
 
-import org.apache.crunch.MapFn;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 
 /**
- * Utility functions for creating common types of derived PTypes, e.g., for JSON data,
- * protocol buffers, and Thrift records.
- *
+ * Utility functions for creating common types of derived PTypes, e.g., for JSON
+ * data, protocol buffers, and Thrift records.
+ * 
  */
 public class PTypes {
 
   public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
-    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());  
+    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());
   }
-  
+
   public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new JacksonInputMapFn<T>(clazz),
-        new JacksonOutputMapFn<T>(), typeFamily.strings());
+    return typeFamily
+        .derived(clazz, new JacksonInputMapFn<T>(clazz), new JacksonOutputMapFn<T>(), typeFamily.strings());
   }
 
   public static <T> PType<T> smile(Class<T> clazz, PTypeFamily typeFamily) {
-	return typeFamily.derived(clazz, new SmileInputMapFn<T>(clazz),
-	    new SmileOutputMapFn<T>(), typeFamily.bytes());
+    return typeFamily.derived(clazz, new SmileInputMapFn<T>(clazz), new SmileOutputMapFn<T>(), typeFamily.bytes());
   }
-  
+
   public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz),
-        new ProtoOutputMapFn<T>(), typeFamily.bytes());
+    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz), new ProtoOutputMapFn<T>(), typeFamily.bytes());
   }
-  
+
   public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz),
-        new ThriftOutputMapFn<T>(), typeFamily.bytes());
+    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz), new ThriftOutputMapFn<T>(), typeFamily.bytes());
   }
-  
+
   public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
     public BigInteger map(ByteBuffer input) {
       return input == null ? null : new BigInteger(input.array());
@@ -78,12 +75,12 @@ public class PTypes {
       return input == null ? null : ByteBuffer.wrap(input.toByteArray());
     }
   };
-  
+
   public static class SmileInputMapFn<T> extends MapFn<ByteBuffer, T> {
 
     private final Class<T> clazz;
     private transient ObjectMapper mapper;
-    
+
     public SmileInputMapFn(Class<T> clazz) {
       this.clazz = clazz;
     }
@@ -92,25 +89,25 @@ public class PTypes {
     public void initialize() {
       this.mapper = new ObjectMapper(new SmileFactory());
     }
-    
-	@Override
-	public T map(ByteBuffer input) {
+
+    @Override
+    public T map(ByteBuffer input) {
       try {
         return mapper.readValue(input.array(), input.position(), input.limit(), clazz);
       } catch (Exception e) {
         throw new CrunchRuntimeException(e);
       }
-	}
+    }
   }
-  
+
   public static class SmileOutputMapFn<T> extends MapFn<T, ByteBuffer> {
     private transient ObjectMapper mapper;
-    
+
     @Override
     public void initialize() {
       this.mapper = new ObjectMapper(new SmileFactory());
     }
-    
+
     @Override
     public ByteBuffer map(T input) {
       try {
@@ -122,19 +119,19 @@ public class PTypes {
   }
 
   public static class JacksonInputMapFn<T> extends MapFn<String, T> {
-    
+
     private final Class<T> clazz;
     private transient ObjectMapper mapper;
-    
+
     public JacksonInputMapFn(Class<T> clazz) {
       this.clazz = clazz;
     }
-    
+
     @Override
     public void initialize() {
       this.mapper = new ObjectMapper();
     }
-    
+
     @Override
     public T map(String input) {
       try {
@@ -142,18 +139,18 @@ public class PTypes {
       } catch (Exception e) {
         throw new CrunchRuntimeException(e);
       }
-    } 
+    }
   }
-  
+
   public static class JacksonOutputMapFn<T> extends MapFn<T, String> {
-    
+
     private transient ObjectMapper mapper;
-    
+
     @Override
     public void initialize() {
       this.mapper = new ObjectMapper();
     }
-    
+
     @Override
     public String map(T input) {
       try {
@@ -163,41 +160,40 @@ public class PTypes {
       }
     }
   }
-  
+
   public static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
-    
+
     private final Class<T> clazz;
     private transient T instance;
-    
+
     public ProtoInputMapFn(Class<T> clazz) {
       this.clazz = clazz;
     }
-    
+
     @Override
     public void initialize() {
       this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
     }
-    
+
     @Override
     public T map(ByteBuffer bb) {
       try {
-        return (T) instance.newBuilderForType().mergeFrom(
-            bb.array(), bb.position(), bb.limit()).build();
+        return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build();
       } catch (InvalidProtocolBufferException e) {
         throw new CrunchRuntimeException(e);
       }
-    }    
+    }
   }
-  
+
   public static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
-    
+
     public ProtoOutputMapFn() {
     }
-    
+
     @Override
     public ByteBuffer map(T proto) {
       return ByteBuffer.wrap(proto.toByteArray());
-    }    
+    }
   }
 
   public static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
@@ -206,18 +202,18 @@ public class PTypes {
     private transient T instance;
     private transient TDeserializer deserializer;
     private transient byte[] bytes;
-    
+
     public ThriftInputMapFn(Class<T> clazz) {
       this.clazz = clazz;
     }
-    
+
     @Override
     public void initialize() {
       this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
       this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
       this.bytes = new byte[0];
     }
-    
+
     @Override
     public T map(ByteBuffer bb) {
       T next = (T) instance.deepCopy();
@@ -232,21 +228,21 @@ public class PTypes {
         throw new CrunchRuntimeException(e);
       }
       return next;
-    }    
+    }
   }
-  
+
   public static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
 
     private transient TSerializer serializer;
-    
+
     public ThriftOutputMapFn() {
     }
-    
+
     @Override
     public void initialize() {
       this.serializer = new TSerializer(new TBinaryProtocol.Factory());
     }
-    
+
     @Override
     public ByteBuffer map(T t) {
       try {
@@ -254,6 +250,6 @@ public class PTypes {
       } catch (TException e) {
         throw new CrunchRuntimeException(e);
       }
-    }    
+    }
   }
 }
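
For context, the derived-type helpers above are used like this from user code. A minimal sketch, not part of the patch; the Person POJO and the choice of the Avro type family are assumptions for illustration:

    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.PTypeFamily;
    import org.apache.crunch.types.avro.AvroTypeFamily;
    import org.apache.crunch.util.PTypes;

    public class PTypesExample {
      // Hypothetical POJO that Jackson can map to and from JSON.
      public static class Person {
        public String name;
        public int age;
      }

      public static void main(String[] args) {
        PTypeFamily tf = AvroTypeFamily.getInstance();
        // Derive a PType that stores Person instances as JSON strings.
        PType<Person> personType = PTypes.jsonString(Person.class, tf);
        System.out.println(personType.getTypeClass());
      }
    }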

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/util/Protos.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/Protos.java b/crunch/src/main/java/org/apache/crunch/util/Protos.java
index 2cda492..9d661c6 100644
--- a/crunch/src/main/java/org/apache/crunch/util/Protos.java
+++ b/crunch/src/main/java/org/apache/crunch/util/Protos.java
@@ -20,11 +20,11 @@ package org.apache.crunch.util;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.util.ReflectionUtils;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.MapFn;
+import org.apache.hadoop.util.ReflectionUtils;
+
 import com.google.common.base.Splitter;
 import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.google.protobuf.Message;
@@ -38,21 +38,21 @@ public class Protos {
   public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) {
     return new ExtractKeyFn<M, K>(fieldName);
   }
-  
+
   public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) {
-    return new TextToProtoFn<M>(sep, msgClass);  
+    return new TextToProtoFn<M>(sep, msgClass);
   }
-  
+
   public static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
-    
+
     private final String fieldName;
-    
+
     private transient FieldDescriptor fd;
-    
+
     public ExtractKeyFn(String fieldName) {
       this.fieldName = fieldName;
     }
-    
+
     @Override
     public K map(M input) {
       if (input == null) {
@@ -60,31 +60,33 @@ public class Protos {
       } else if (fd == null) {
         fd = input.getDescriptorForType().findFieldByName(fieldName);
         if (fd == null) {
-          throw new IllegalStateException(
-              "Could not find field: " + fieldName + " in message: " + input);
+          throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input);
         }
       }
       return (K) input.getField(fd);
     }
-    
+
   }
-  
+
   public static class TextToProtoFn<M extends Message> extends DoFn<String, M> {
-    
+
     private final String sep;
     private final Class<M> msgClass;
-    
+
     private transient M msgInstance;
     private transient List<FieldDescriptor> fields;
     private transient Splitter splitter;
-    
-    enum ParseErrors { TOTAL, NUMBER_FORMAT };
-    
+
+    enum ParseErrors {
+      TOTAL,
+      NUMBER_FORMAT
+    };
+
     public TextToProtoFn(String sep, Class<M> msgClass) {
       this.sep = sep;
       this.msgClass = msgClass;
     }
-    
+
     @Override
     public void initialize() {
       this.msgInstance = ReflectionUtils.newInstance(msgClass, getConfiguration());
@@ -136,7 +138,7 @@ public class Protos {
             }
           }
         }
-        
+
         if (parseError) {
           increment(ParseErrors.TOTAL);
         } else {
@@ -145,6 +147,5 @@ public class Protos {
       }
     }
   }
-  
-  
+
 }
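
For context, a minimal sketch of how the two factory methods above are meant to be combined; it is not part of the patch, M stands in for a protobuf-generated class, and the "id" field name and tab separator are assumptions:

    import org.apache.crunch.DoFn;
    import org.apache.crunch.MapFn;
    import org.apache.crunch.util.Protos;

    import com.google.protobuf.Message;

    public class ProtosExample {
      public static <M extends Message> void wire(Class<M> msgClass) {
        // Parse tab-separated text lines into protobuf messages.
        DoFn<String, M> parser = Protos.lineParser("\t", msgClass);
        // Pull the "id" field out of each message to use as a key;
        // the field is looked up by name at runtime via its descriptor.
        MapFn<M, String> keyFn = Protos.extractKey("id");
      }
    }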

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/util/Tuples.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/util/Tuples.java b/crunch/src/main/java/org/apache/crunch/util/Tuples.java
index b8eb3b9..9c8d7bd 100644
--- a/crunch/src/main/java/org/apache/crunch/util/Tuples.java
+++ b/crunch/src/main/java/org/apache/crunch/util/Tuples.java
@@ -24,22 +24,23 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.UnmodifiableIterator;
 
 /**
  * Utilities for working with subclasses of the {@code Tuple} interface.
- *
+ * 
  */
 public class Tuples {
 
   private static abstract class TuplifyIterator<T> extends UnmodifiableIterator<T> {
     protected List<Iterator<?>> iterators;
-    
-    public TuplifyIterator(Iterator<?>...iterators) {
+
+    public TuplifyIterator(Iterator<?>... iterators) {
       this.iterators = Lists.newArrayList(iterators);
     }
-    
+
     @Override
     public boolean hasNext() {
       for (Iterator<?> iter : iterators) {
@@ -49,21 +50,21 @@ public class Tuples {
       }
       return true;
     }
-    
+
     protected Object next(int index) {
       return iterators.get(index).next();
     }
   }
-  
+
   public static class PairIterable<S, T> implements Iterable<Pair<S, T>> {
     private final Iterable<S> first;
     private final Iterable<T> second;
-    
+
     public PairIterable(Iterable<S> first, Iterable<T> second) {
       this.first = first;
       this.second = second;
     }
-    
+
     @Override
     public Iterator<Pair<S, T>> iterator() {
       return new TuplifyIterator<Pair<S, T>>(first.iterator(), second.iterator()) {
@@ -72,68 +73,66 @@ public class Tuples {
           return Pair.of((S) next(0), (T) next(1));
         }
       };
-    }   
+    }
   }
-  
+
   public static class TripIterable<A, B, C> implements Iterable<Tuple3<A, B, C>> {
     private final Iterable<A> first;
     private final Iterable<B> second;
     private final Iterable<C> third;
-    
+
     public TripIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third) {
       this.first = first;
       this.second = second;
       this.third = third;
     }
-    
+
     @Override
     public Iterator<Tuple3<A, B, C>> iterator() {
-      return new TuplifyIterator<Tuple3<A, B, C>>(first.iterator(), second.iterator(),
-          third.iterator()) {
+      return new TuplifyIterator<Tuple3<A, B, C>>(first.iterator(), second.iterator(), third.iterator()) {
         @Override
         public Tuple3<A, B, C> next() {
           return new Tuple3<A, B, C>((A) next(0), (B) next(1), (C) next(2));
         }
       };
-    }   
+    }
   }
-  
+
   public static class QuadIterable<A, B, C, D> implements Iterable<Tuple4<A, B, C, D>> {
     private final Iterable<A> first;
     private final Iterable<B> second;
     private final Iterable<C> third;
     private final Iterable<D> fourth;
-    
-    public QuadIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third,
-        Iterable<D> fourth) {
+
+    public QuadIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third, Iterable<D> fourth) {
       this.first = first;
       this.second = second;
       this.third = third;
       this.fourth = fourth;
     }
-    
+
     @Override
     public Iterator<Tuple4<A, B, C, D>> iterator() {
-      return new TuplifyIterator<Tuple4<A, B, C, D>>(first.iterator(), second.iterator(),
-          third.iterator(), fourth.iterator()) {
+      return new TuplifyIterator<Tuple4<A, B, C, D>>(first.iterator(), second.iterator(), third.iterator(),
+          fourth.iterator()) {
         @Override
         public Tuple4<A, B, C, D> next() {
           return new Tuple4<A, B, C, D>((A) next(0), (B) next(1), (C) next(2), (D) next(3));
         }
       };
-    }   
+    }
   }
-  
+
   public static class TupleNIterable implements Iterable<TupleN> {
     private final Iterator<?>[] iters;
-    
+
     public TupleNIterable(Iterable<?>... iterables) {
       this.iters = new Iterator[iterables.length];
       for (int i = 0; i < iters.length; i++) {
         iters[i] = iterables[i].iterator();
       }
     }
-    
+
     @Override
     public Iterator<TupleN> iterator() {
       return new TuplifyIterator<TupleN>(iters) {

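For context, PairIterable zips two iterables into an iterable of Pairs, and iteration stops as soon as either input is exhausted. A minimal sketch, not part of the patch, using plain Lists as inputs:

    import java.util.Arrays;

    import org.apache.crunch.Pair;
    import org.apache.crunch.util.Tuples;

    public class TuplesExample {
      public static void main(String[] args) {
        Iterable<String> names = Arrays.asList("a", "b");
        Iterable<Integer> counts = Arrays.asList(1, 2);
        for (Pair<String, Integer> p : new Tuples.PairIterable<String, Integer>(names, counts)) {
          System.out.println(p.first() + " -> " + p.second()); // a -> 1, b -> 2
        }
      }
    }
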
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 6756dbb..a452d2e 100644
--- a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -43,7 +43,12 @@ public class CrunchControlledJob {
 
   // A job will be in one of the following states
   public static enum State {
-    SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
+    SUCCESS,
+    WAITING,
+    RUNNING,
+    READY,
+    FAILED,
+    DEPENDENT_FAILED
   };
 
   public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
@@ -63,8 +68,7 @@ public class CrunchControlledJob {
    * @param dependingJobs
    *          an array of jobs the current job depends on
    */
-  public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs)
-      throws IOException {
+  public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs) throws IOException {
     this.job = job;
     this.dependingJobs = dependingJobs;
     this.state = State.WAITING;
@@ -95,8 +99,7 @@ public class CrunchControlledJob {
     if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
       sb.append("job has no depending job:\t").append("\n");
     } else {
-      sb.append("job has ").append(this.dependingJobs.size())
-          .append(" dependeng jobs:\n");
+      sb.append("job has ").append(this.dependingJobs.size()).append(" depending jobs:\n");
       for (int i = 0; i < this.dependingJobs.size(); i++) {
         sb.append("\t depending job ").append(i).append(":\t");
         sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
@@ -227,8 +230,7 @@ public class CrunchControlledJob {
    * @return true if this job is in a complete state
    */
   public synchronized boolean isCompleted() {
-    return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED
-        || this.state == State.SUCCESS;
+    return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED || this.state == State.SUCCESS;
   }
 
   /**
@@ -294,8 +296,7 @@ public class CrunchControlledJob {
       }
       if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
         this.state = State.DEPENDENT_FAILED;
-        this.message = "depending job " + i + " with jobID " + pred.getJobID()
-            + " failed. " + pred.getMessage();
+        this.message = "depending job " + i + " with jobID " + pred.getJobID() + " failed. " + pred.getMessage();
         break;
       }
       // pred must be in success state
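
For context, dependencies are expressed by handing upstream jobs to the constructor shown earlier in this file. A minimal sketch, not part of the patch; the Job instances are assumed to be configured elsewhere:

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;

    public class ControlledJobExample {
      public static CrunchControlledJob chain(Job first, Job second) throws IOException {
        CrunchControlledJob upstream = new CrunchControlledJob(first, null);
        // The downstream job stays WAITING until the upstream job reaches SUCCESS;
        // if the upstream job fails, it moves to DEPENDENT_FAILED instead.
        return new CrunchControlledJob(second, Arrays.asList(upstream));
      }
    }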

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
index 7fa61d3..99bb324 100644
--- a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
@@ -41,15 +41,19 @@ import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
  * state changes of their depending jobs' states. The class provides APIs for
  * suspending/resuming the thread, and for stopping the thread.
  * 
- * TODO This is mostly a copy of the JobControl class in Hadoop MapReduce core. 
- * Once the location and interface of the class are more stable in CDH, this class 
- * should be removed completely and be based on the hadoop-core class.
+ * TODO This is mostly a copy of the JobControl class in Hadoop MapReduce core.
+ * Once the location and interface of the class are more stable in CDH, this
+ * class should be removed completely and be based on the hadoop-core class.
  */
 public class CrunchJobControl implements Runnable {
 
  // The thread can be in one of the following states
   public static enum ThreadState {
-    RUNNING, SUSPENDED, STOPPED, STOPPING, READY
+    RUNNING,
+    SUSPENDED,
+    STOPPED,
+    STOPPING,
+    READY
   };
 
   private ThreadState runnerState; // the thread state
@@ -127,8 +131,7 @@ public class CrunchJobControl implements Runnable {
     return this.groupName + this.nextJobID;
   }
 
-  private static void addToQueue(CrunchControlledJob aJob,
-      Map<String, CrunchControlledJob> queue) {
+  private static void addToQueue(CrunchControlledJob aJob, Map<String, CrunchControlledJob> queue) {
     synchronized (queue) {
       queue.put(aJob.getJobID(), aJob);
     }
@@ -213,8 +216,7 @@ public class CrunchJobControl implements Runnable {
     }
   }
 
-  synchronized private void checkRunningJobs() throws IOException,
-      InterruptedException {
+  synchronized private void checkRunningJobs() throws IOException, InterruptedException {
 
     Map<String, CrunchControlledJob> oldJobs = null;
     oldJobs = this.runningJobs;
@@ -226,8 +228,7 @@ public class CrunchJobControl implements Runnable {
     }
   }
 
-  synchronized private void checkWaitingJobs() throws IOException,
-      InterruptedException {
+  synchronized private void checkWaitingJobs() throws IOException, InterruptedException {
     Map<String, CrunchControlledJob> oldJobs = null;
     oldJobs = this.waitingJobs;
     this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
@@ -251,8 +252,7 @@ public class CrunchJobControl implements Runnable {
   }
 
   synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
-        && this.runningJobs.size() == 0;
+    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0 && this.runningJobs.size() == 0;
   }
 
   /**
@@ -277,8 +277,7 @@ public class CrunchJobControl implements Runnable {
       } catch (Exception e) {
         this.runnerState = ThreadState.STOPPED;
       }
-      if (this.runnerState != ThreadState.RUNNING
-          && this.runnerState != ThreadState.SUSPENDED) {
+      if (this.runnerState != ThreadState.RUNNING && this.runnerState != ThreadState.SUSPENDED) {
         break;
       }
       try {
@@ -286,8 +285,7 @@ public class CrunchJobControl implements Runnable {
       } catch (Exception e) {
 
       }
-      if (this.runnerState != ThreadState.RUNNING
-          && this.runnerState != ThreadState.SUSPENDED) {
+      if (this.runnerState != ThreadState.RUNNING && this.runnerState != ThreadState.SUSPENDED) {
         break;
       }
     }
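
For context, the run() loop above is driven from a separate thread and polled via allFinished(). A minimal sketch, not part of the patch, using only the APIs visible in this diff:

    import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;

    public class JobControlDriver {
      public static void runToCompletion(CrunchJobControl control) throws InterruptedException {
        // CrunchJobControl implements Runnable; run() keeps promoting jobs
        // from WAITING to READY to RUNNING until stopped or suspended.
        new Thread(control).start();
        while (!control.allFinished()) {
          Thread.sleep(500); // poll the waiting/ready/running queues
        }
      }
    }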

