incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [4/10] Format all sources according to formatting profile
Date Sat, 14 Jul 2012 18:14:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 4e114ab..0ee4c3f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -22,9 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PCollection;
@@ -36,51 +33,52 @@ import org.apache.crunch.Target;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
 
   private final MemTable<K, V> parent;
-  
+
   private static <S, T> Map<S, Collection<T>> createMapFor(PType<S> keyType, GroupingOptions options, Pipeline pipeline) {
     if (options != null && options.getSortComparatorClass() != null) {
-      RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(),
-          pipeline.getConfiguration());
+      RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(), pipeline.getConfiguration());
       return new TreeMap<S, Collection<T>>(rc);
     } else if (keyType != null && Comparable.class.isAssignableFrom(keyType.getTypeClass())) {
       return new TreeMap<S, Collection<T>>();
     }
     return Maps.newHashMap();
   }
-  
+
   private static <S, T> Iterable<Pair<S, Iterable<T>>> buildMap(MemTable<S, T> parent, GroupingOptions options) {
     PType<S> keyType = parent.getKeyType();
     Map<S, Collection<T>> map = createMapFor(keyType, options, parent.getPipeline());
-    
+
     for (Pair<S, T> pair : parent.materialize()) {
       S key = pair.first();
       if (!map.containsKey(key)) {
-        map.put(key, Lists.<T>newArrayList());
+        map.put(key, Lists.<T> newArrayList());
       }
       map.get(key).add(pair.second());
     }
-    
+
     List<Pair<S, Iterable<T>>> values = Lists.newArrayList();
     for (Map.Entry<S, Collection<T>> e : map.entrySet()) {
       values.add(Pair.of(e.getKey(), (Iterable<T>) e.getValue()));
     }
     return values;
   }
-  
+
   public MemGroupedTable(MemTable<K, V> parent, GroupingOptions options) {
-	super(buildMap(parent, options));
+    super(buildMap(parent, options));
     this.parent = parent;
   }
 
   @Override
-  public PCollection<Pair<K, Iterable<V>>> union(
-      PCollection<Pair<K, Iterable<V>>>... collections) {
+  public PCollection<Pair<K, Iterable<V>>> union(PCollection<Pair<K, Iterable<V>>>... collections) {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 53e7526..ff2b2d7 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -34,21 +34,22 @@ import org.apache.crunch.lib.PTables;
 import org.apache.crunch.materialize.MaterializableMap;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.Lists;
 
 public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<K, V> {
 
   private PTableType<K, V> ptype;
-  
+
   public MemTable(Iterable<Pair<K, V>> collect) {
     this(collect, null, null);
   }
-  
+
   public MemTable(Iterable<Pair<K, V>> collect, PTableType<K, V> ptype, String name) {
     super(collect, ptype, name);
     this.ptype = ptype;
   }
-  
+
   @Override
   public PTable<K, V> union(PTable<K, V>... others) {
     List<Pair<K, V>> values = Lists.newArrayList();
@@ -81,7 +82,7 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
     super.write(target);
     return this;
   }
-  
+
   @Override
   public PTableType<K, V> getPTableType() {
     return ptype;
@@ -105,32 +106,32 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
 
   @Override
   public PTable<K, V> top(int count) {
-	return Aggregate.top(this, count, true);
+    return Aggregate.top(this, count, true);
   }
 
   @Override
   public PTable<K, V> bottom(int count) {
-	return Aggregate.top(this, count, false);
+    return Aggregate.top(this, count, false);
   }
 
   @Override
   public PTable<K, Collection<V>> collectValues() {
-	return Aggregate.collectValues(this);
+    return Aggregate.collectValues(this);
   }
 
   @Override
   public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
-	return Join.join(this, other);
+    return Join.join(this, other);
   }
-  
+
   @Override
   public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
-	return Cogroup.cogroup(this, other);
+    return Cogroup.cogroup(this, other);
   }
-  
+
   @Override
   public PCollection<K> keys() {
-	return PTables.keys(this);
+    return PTables.keys(this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 0b7d8d7..37e2083 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -24,14 +24,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
@@ -55,6 +47,14 @@ import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -164,8 +164,8 @@ public class MRPipeline implements Pipeline {
     if (pcollection instanceof PGroupedTableImpl) {
       pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
     } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
-      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
-          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
+      pcollection = pcollection.parallelDo("UnionCollectionWrapper", (MapFn) IdentityFn.<Object> getInstance(),
+          pcollection.getPType());
     }
     addOutput((PCollectionImpl<?>) pcollection, target);
   }
@@ -236,8 +236,11 @@ public class MRPipeline implements Pipeline {
   }
 
   /**
-   * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections.
-   * @param pcollection The PCollection to be cast/transformed
+   * Safely cast a PCollection into a PCollectionImpl, including handling the
+   * case of UnionCollections.
+   * 
+   * @param pcollection
+   *          The PCollection to be cast/transformed
    * @return The PCollectionImpl representation
    */
   private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
@@ -274,8 +277,8 @@ public class MRPipeline implements Pipeline {
   @Override
   public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
     // Ensure that this is a writable pcollection instance.
-    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
-        .getInstance().as(pcollection.getPType()));
+    pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(),
+        WritableTypeFamily.getInstance().as(pcollection.getPType()));
     write(pcollection, At.textFile(pathName));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
index 3c9d522..1f4fea2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoCollectionImpl.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.ImmutableList;
 
 public class DoCollectionImpl<S> extends PCollectionImpl<S> {
@@ -30,8 +31,7 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> {
   private final DoFn<Object, S> fn;
   private final PType<S> ntype;
 
-  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn,
-      PType<S> ntype) {
+  <T> DoCollectionImpl(String name, PCollectionImpl<T> parent, DoFn<T, S> fn, PType<S> ntype) {
     super(name);
     this.parent = (PCollectionImpl<Object>) parent;
     this.fn = (DoFn<Object, S>) fn;
@@ -42,7 +42,7 @@ public class DoCollectionImpl<S> extends PCollectionImpl<S> {
   protected long getSizeInternal() {
     return (long) (fn.scaleFactor() * parent.getSize());
   }
-  
+
   @Override
   public PType<S> getPType() {
     return ntype;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 3e5a275..1d19580 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -26,17 +26,16 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.ImmutableList;
 
-public class DoTableImpl<K, V> extends PTableBase<K, V> implements
-    PTable<K, V> {
+public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> {
 
   private final PCollectionImpl<?> parent;
   private final DoFn<?, Pair<K, V>> fn;
   private final PTableType<K, V> type;
 
-  <S> DoTableImpl(String name, PCollectionImpl<S> parent,
-      DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
+  <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
     super(name);
     this.parent = parent;
     this.fn = fn;
@@ -72,7 +71,7 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements
   public DoNode createDoNode() {
     return DoNode.createFnNode(getName(), fn, type);
   }
-  
+
   public boolean hasCombineFn() {
     return fn instanceof CombineFn;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
index 5c9b93e..ace5cc1 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputCollection.java
@@ -20,11 +20,11 @@ package org.apache.crunch.impl.mr.collect;
 import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.ImmutableList;
 
 public class InputCollection<S> extends PCollectionImpl<S> {
@@ -54,7 +54,7 @@ public class InputCollection<S> extends PCollectionImpl<S> {
     }
     return sz;
   }
-  
+
   @Override
   protected void acceptInternal(PCollectionImpl.Visitor visitor) {
     visitor.visitInputCollection(this);
@@ -69,7 +69,7 @@ public class InputCollection<S> extends PCollectionImpl<S> {
   public DoNode createDoNode() {
     return DoNode.createInputNode(source);
   }
-  
+
   @Override
   public boolean equals(Object obj) {
     if (obj == null || !(obj instanceof InputCollection)) {
@@ -77,7 +77,7 @@ public class InputCollection<S> extends PCollectionImpl<S> {
     }
     return source.equals(((InputCollection) obj).source);
   }
-  
+
   @Override
   public int hashCode() {
     return new HashCodeBuilder().append(source).toHashCode();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
index 617d768..9f64803 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/InputTable.java
@@ -25,13 +25,14 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.ImmutableList;
 
 public class InputTable<K, V> extends PTableBase<K, V> {
 
   private final TableSource<K, V> source;
   private final InputCollection<Pair<K, V>> asCollection;
-  
+
   public InputTable(TableSource<K, V> source, MRPipeline pipeline) {
     super(source.toString());
     this.source = source;
@@ -68,12 +69,12 @@ public class InputTable<K, V> extends PTableBase<K, V> {
   public DoNode createDoNode() {
     return DoNode.createInputNode(source);
   }
-  
+
   @Override
   public int hashCode() {
     return asCollection.hashCode();
   }
-  
+
   @Override
   public boolean equals(Object other) {
     return asCollection.equals(other);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index a9e8401..962eeb8 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.MapFn;
@@ -41,6 +40,7 @@ import org.apache.crunch.lib.Sort;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+
 import com.google.common.collect.Lists;
 
 public abstract class PCollectionImpl<S> implements PCollection<S> {
@@ -217,8 +217,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   public void accept(Visitor visitor) {
     if (materializedAt != null) {
-      visitor.visitInputCollection(new InputCollection<S>(materializedAt,
-          (MRPipeline) getPipeline()));
+      visitor.visitInputCollection(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()));
     } else {
       acceptInternal(visitor);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index 13e5567..5a40413 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -21,8 +21,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
@@ -33,13 +31,14 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.mapreduce.Job;
+
 import com.google.common.collect.ImmutableList;
 
-public class PGroupedTableImpl<K, V> extends
-    PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
+public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
 
   private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);
-  
+
   private final PTableBase<K, V> parent;
   private final GroupingOptions groupingOptions;
   private final PGroupedTableType<K, V> ptype;
@@ -58,8 +57,7 @@ public class PGroupedTableImpl<K, V> extends
   public void configureShuffle(Job job) {
     ptype.configureShuffle(job, groupingOptions);
     if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) {
-      long bytesPerTask = job.getConfiguration().getLong("crunch.bytes.per.reduce.task",
-          (1000L * 1000L * 1000L));
+      long bytesPerTask = job.getConfiguration().getLong("crunch.bytes.per.reduce.task", (1000L * 1000L * 1000L));
       int numReduceTasks = 1 + (int) (getSize() / bytesPerTask);
       if (numReduceTasks > 0) {
         job.setNumReduceTasks(numReduceTasks);
@@ -69,20 +67,19 @@ public class PGroupedTableImpl<K, V> extends
       }
     }
   }
-  
+
   @Override
   protected long getSizeInternal() {
     return parent.getSizeInternal();
   }
-  
+
   @Override
   public PType<Pair<K, Iterable<V>>> getPType() {
     return ptype;
   }
 
   public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
-    return new DoTableImpl<K, V>("combine", this, combineFn,
-        parent.getPTableType());
+    return new DoTableImpl<K, V>("combine", this, combineFn, parent.getPTableType());
   }
 
   private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
@@ -110,8 +107,7 @@ public class PGroupedTableImpl<K, V> extends
 
   @Override
   public DoNode createDoNode() {
-    return DoNode.createFnNode(getName(),
-        ptype.getInputMapFn(), ptype);
+    return DoNode.createFnNode(getName(), ptype.getInputMapFn(), ptype);
   }
 
   public DoNode getGroupingNode() {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index f480001..4126f74 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -32,10 +32,10 @@ import org.apache.crunch.lib.Join;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.materialize.MaterializableMap;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.Lists;
 
-public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>>
-    implements PTable<K, V> {
+public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements PTable<K, V> {
 
   public PTableBase(String name) {
     super(name);
@@ -44,20 +44,19 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>>
   public PType<K> getKeyType() {
     return getPTableType().getKeyType();
   }
-  
+
   public PType<V> getValueType() {
     return getPTableType().getValueType();
   }
-  
+
   public PGroupedTableImpl<K, V> groupByKey() {
     return new PGroupedTableImpl<K, V>(this);
   }
 
   public PGroupedTableImpl<K, V> groupByKey(int numReduceTasks) {
-    return new PGroupedTableImpl<K, V>(this,
-        GroupingOptions.builder().numReducers(numReduceTasks).build());
+    return new PGroupedTableImpl<K, V>(this, GroupingOptions.builder().numReducers(numReduceTasks).build());
   }
-  
+
   public PGroupedTableImpl<K, V> groupByKey(GroupingOptions groupingOptions) {
     return new PGroupedTableImpl<K, V>(this, groupingOptions);
   }
@@ -71,43 +70,43 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>>
     }
     return new UnionTable<K, V>(internal);
   }
-  
+
   @Override
   public PTable<K, V> write(Target target) {
     getPipeline().write(this, target);
     return this;
   }
-  
+
   @Override
   public PTable<K, V> top(int count) {
-	return Aggregate.top(this, count, true);
+    return Aggregate.top(this, count, true);
   }
 
   @Override
   public PTable<K, V> bottom(int count) {
-	return Aggregate.top(this, count, false);
+    return Aggregate.top(this, count, false);
   }
-  
+
   @Override
   public PTable<K, Collection<V>> collectValues() {
-	return Aggregate.collectValues(this);
+    return Aggregate.collectValues(this);
   }
-  
+
   @Override
   public <U> PTable<K, Pair<V, U>> join(PTable<K, U> other) {
-	return Join.join(this, other);
+    return Join.join(this, other);
   }
-  
+
   @Override
   public <U> PTable<K, Pair<Collection<V>, Collection<U>>> cogroup(PTable<K, U> other) {
-	return Cogroup.cogroup(this, other);
+    return Cogroup.cogroup(this, other);
   }
-  
+
   @Override
   public PCollection<K> keys() {
-	return PTables.keys(this);
+    return PTables.keys(this);
   }
- 
+
   @Override
   public PCollection<V> values() {
     return PTables.values(this);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
index 19a1161..7b3dd7b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
@@ -22,13 +22,14 @@ import java.util.List;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.ImmutableList;
 
 public class UnionCollection<S> extends PCollectionImpl<S> {
 
   private List<PCollectionImpl<S>> parents;
   private long size = 0;
-  
+
   private static <S> String flatName(List<PCollectionImpl<S>> collections) {
     StringBuilder sb = new StringBuilder("union(");
     for (int i = 0; i < collections.size(); i++) {
@@ -39,15 +40,14 @@ public class UnionCollection<S> extends PCollectionImpl<S> {
     }
     return sb.append(')').toString();
   }
-  
+
   UnionCollection(List<PCollectionImpl<S>> collections) {
     super(flatName(collections));
     this.parents = ImmutableList.copyOf(collections);
     this.pipeline = (MRPipeline) parents.get(0).getPipeline();
     for (PCollectionImpl<S> parent : parents) {
       if (this.pipeline != parent.getPipeline()) {
-        throw new IllegalStateException(
-            "Cannot union PCollections from different Pipeline instances");
+        throw new IllegalStateException("Cannot union PCollections from different Pipeline instances");
       }
       size += parent.getSize();
     }
@@ -57,7 +57,7 @@ public class UnionCollection<S> extends PCollectionImpl<S> {
   protected long getSizeInternal() {
     return size;
   }
-  
+
   @Override
   protected void acceptInternal(PCollectionImpl.Visitor visitor) {
     visitor.visitUnionCollection(this);
@@ -75,7 +75,6 @@ public class UnionCollection<S> extends PCollectionImpl<S> {
 
   @Override
   public DoNode createDoNode() {
-    throw new UnsupportedOperationException(
-        "Unioned collection does not support DoNodes");
+    throw new UnsupportedOperationException("Unioned collection does not support DoNodes");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
index f713912..a369432 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
@@ -24,6 +24,7 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -32,7 +33,7 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
   private PTableType<K, V> ptype;
   private List<PCollectionImpl<Pair<K, V>>> parents;
   private long size;
-  
+
   private static <K, V> String flatName(List<PTableBase<K, V>> tables) {
     StringBuilder sb = new StringBuilder("union(");
     for (int i = 0; i < tables.size(); i++) {
@@ -43,7 +44,7 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
     }
     return sb.append(')').toString();
   }
-  
+
   public UnionTable(List<PTableBase<K, V>> tables) {
     super(flatName(tables));
     this.ptype = tables.get(0).getPTableType();
@@ -51,8 +52,7 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
     this.parents = Lists.newArrayList();
     for (PTableBase<K, V> parent : tables) {
       if (pipeline != parent.getPipeline()) {
-        throw new IllegalStateException(
-            "Cannot union PTables from different Pipeline instances");
+        throw new IllegalStateException("Cannot union PTables from different Pipeline instances");
       }
       this.parents.add(parent);
       size += parent.getSize();
@@ -63,7 +63,7 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
   protected long getSizeInternal() {
     return size;
   }
-  
+
   @Override
   public PTableType<K, V> getPTableType() {
     return ptype;
@@ -81,14 +81,12 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
 
   @Override
   protected void acceptInternal(PCollectionImpl.Visitor visitor) {
-    visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(
-        parents));
+    visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(parents));
   }
 
   @Override
   public DoNode createDoNode() {
-    throw new UnsupportedOperationException(
-        "Unioned table does not support do nodes");
+    throw new UnsupportedOperationException("Unioned table does not support do nodes");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
index 242aa4d..4c97c42 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.run.RTNode;
+
 import com.google.common.collect.ImmutableList;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
index 5f52f41..9e3a224 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
@@ -19,11 +19,10 @@ package org.apache.crunch.impl.mr.emit;
 
 import java.io.IOException;
 
-import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
-
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.types.Converter;
+import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 
 public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
 
@@ -31,8 +30,7 @@ public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
   private final CrunchMultipleOutputs<K, V> outputs;
   private final String outputName;
 
-  public MultipleOutputEmitter(Converter converter,
-      CrunchMultipleOutputs<K, V> outputs, String outputName) {
+  public MultipleOutputEmitter(Converter converter, CrunchMultipleOutputs<K, V> outputs, String outputName) {
     this.converter = converter;
     this.outputs = outputs;
     this.outputName = outputName;
@@ -41,8 +39,7 @@ public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
   @Override
   public void emit(T emitted) {
     try {
-      this.outputs.write(outputName, converter.outputKey(emitted),
-          converter.outputValue(emitted));
+      this.outputs.write(outputName, converter.outputKey(emitted), converter.outputValue(emitted));
     } catch (IOException e) {
       throw new CrunchRuntimeException(e);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
index deb090c..88990c2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/OutputEmitter.java
@@ -19,19 +19,17 @@ package org.apache.crunch.impl.mr.emit;
 
 import java.io.IOException;
 
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.types.Converter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 public class OutputEmitter<T, K, V> implements Emitter<T> {
 
   private final Converter<K, V, Object, Object> converter;
   private final TaskInputOutputContext<?, ?, K, V> context;
 
-  public OutputEmitter(Converter<K, V, Object, Object> converter,
-      TaskInputOutputContext<?, ?, K, V> context) {
+  public OutputEmitter(Converter<K, V, Object, Object> converter, TaskInputOutputContext<?, ?, K, V> context) {
     this.converter = converter;
     this.context = context;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
index c790c1d..fbfcf05 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
@@ -22,6 +22,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.impl.mr.plan.MSCROutputHandler;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -29,33 +31,30 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.crunch.impl.mr.plan.MSCROutputHandler;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import com.google.common.collect.Lists;
 
 public class CrunchJob extends CrunchControlledJob {
 
   private final Log log = LogFactory.getLog(CrunchJob.class);
-  
+
   private final Path workingPath;
   private final List<Path> multiPaths;
   private final boolean mapOnlyJob;
-  
+
   public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException {
-    super(job, Lists.<CrunchControlledJob>newArrayList());
+    super(job, Lists.<CrunchControlledJob> newArrayList());
     this.workingPath = workingPath;
     this.multiPaths = handler.getMultiPaths();
     this.mapOnlyJob = handler.isMapOnlyJob();
-  }  
-  
+  }
+
   private synchronized void handleMultiPaths() throws IOException {
     if (!multiPaths.isEmpty()) {
       // Need to handle moving the data from the output directory of the
       // job to the output locations specified in the paths.
       FileSystem fs = FileSystem.get(job.getConfiguration());
       for (int i = 0; i < multiPaths.size(); i++) {
-        Path src = new Path(workingPath,
-            PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
+        Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
         Path[] srcs = FileUtil.stat2Paths(fs.globStatus(src), src);
         Path dst = multiPaths.get(i);
         if (!fs.exists(dst)) {
@@ -68,7 +67,7 @@ public class CrunchJob extends CrunchControlledJob {
       }
     }
   }
-  
+
   private Path getDestFile(Path src, Path dir, int index) {
     String form = "part-%s-%05d";
     if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
@@ -76,12 +75,12 @@ public class CrunchJob extends CrunchControlledJob {
     }
     return new Path(dir, String.format(form, mapOnlyJob ? "m" : "r", index));
   }
-  
+
   private int getMinPartIndex(Path path, FileSystem fs) throws IOException {
     // Quick and dirty way to ensure unique naming in the directory
     return fs.listStatus(path).length;
   }
-  
+
   @Override
   protected void checkRunningState() throws IOException, InterruptedException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index b678187..c8c47b6 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -21,10 +21,10 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PipelineResult;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
 
-import org.apache.crunch.PipelineResult;
 import com.google.common.collect.Lists;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 5780da8..f63700e 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -20,8 +20,6 @@ package org.apache.crunch.impl.mr.plan;
 import java.util.List;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.run.NodeContext;
@@ -29,6 +27,8 @@ import org.apache.crunch.impl.mr.run.RTNode;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -44,8 +44,8 @@ public class DoNode {
   private final Source<?> source;
   private String outputName;
 
-  private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children,
-      Converter outputConverter, Source<?> source) {
+  private DoNode(DoFn fn, String name, PType<?> ptype, List<DoNode> children, Converter outputConverter,
+      Source<?> source) {
     this.fn = fn;
     this.name = name;
     this.ptype = ptype;
@@ -58,48 +58,43 @@ public class DoNode {
     return Lists.newArrayList();
   }
 
-  public static <K, V> DoNode createGroupingNode(String name,
-      PGroupedTableType<K, V> ptype) {
-    DoFn<?,?> fn = ptype.getOutputMapFn();
-    return new DoNode(fn, name, ptype, NO_CHILDREN,
-        ptype.getGroupingConverter(), null);
+  public static <K, V> DoNode createGroupingNode(String name, PGroupedTableType<K, V> ptype) {
+    DoFn<?, ?> fn = ptype.getOutputMapFn();
+    return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null);
   }
-  
+
   public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
     Converter outputConverter = ptype.getConverter();
-    DoFn<?,?> fn = ptype.getOutputMapFn();
-    return new DoNode(fn, name, ptype, NO_CHILDREN,
-        outputConverter, null);
+    DoFn<?, ?> fn = ptype.getOutputMapFn();
+    return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null);
   }
 
-  public static DoNode createFnNode(String name, DoFn<?, ?> function,
-      PType<?> ptype) {
+  public static DoNode createFnNode(String name, DoFn<?, ?> function, PType<?> ptype) {
     return new DoNode(function, name, ptype, allowsChildren(), null, null);
   }
 
   public static <S> DoNode createInputNode(Source<S> source) {
     PType<?> ptype = source.getType();
-    DoFn<?,?> fn = ptype.getInputMapFn();
-    return new DoNode(fn, source.toString(), ptype, allowsChildren(), null,
-        source);
+    DoFn<?, ?> fn = ptype.getInputMapFn();
+    return new DoNode(fn, source.toString(), ptype, allowsChildren(), null, source);
   }
 
   public boolean isInputNode() {
     return source != null;
   }
-  
+
   public boolean isOutputNode() {
     return outputConverter != null;
   }
-  
+
   public String getName() {
     return name;
   }
-  
+
   public List<DoNode> getChildren() {
     return children;
   }
-  
+
   public Source<?> getSource() {
     return source;
   }
@@ -117,8 +112,7 @@ public class DoNode {
 
   public void setOutputName(String outputName) {
     if (outputConverter == null) {
-      throw new IllegalStateException(
-          "Cannot set output name w/o output converter: " + outputName);
+      throw new IllegalStateException("Cannot set output name w/o output converter: " + outputName);
     }
     this.outputName = outputName;
   }
@@ -135,13 +129,12 @@ public class DoNode {
       if (nodeContext == NodeContext.MAP) {
         inputConverter = ptype.getConverter();
       } else {
-        inputConverter = ((PGroupedTableType<?,?>) ptype).getGroupingConverter();
+        inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
       }
-    }          
-    return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter,
-        outputName);
+    }
+    return new RTNode(fn, name, childRTNodes, inputConverter, outputConverter, outputName);
   }
-  
+
   @Override
   public boolean equals(Object other) {
     if (other == null || !(other instanceof DoNode)) {
@@ -151,14 +144,12 @@ public class DoNode {
       return true;
     }
     DoNode o = (DoNode) other;
-    return (name.equals(o.name) && fn.equals(o.fn) && source == o.source &&
-        outputConverter == o.outputConverter);
+    return (name.equals(o.name) && fn.equals(o.fn) && source == o.source && outputConverter == o.outputConverter);
   }
-  
+
   @Override
   public int hashCode() {
     HashCodeBuilder hcb = new HashCodeBuilder();
-    return hcb.append(name).append(fn).append(source)
-        .append(outputConverter).toHashCode();
+    return hcb.append(name).append(fn).append(source).append(outputConverter).toHashCode();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
index 55f690a..2645ada 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobNameBuilder.java
@@ -23,30 +23,29 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
 /**
- * Visitor that traverses the {@code DoNode} instances in a job and builds
- * a String that identifies the stages of the pipeline that belong to
- * this job.
+ * Visitor that traverses the {@code DoNode} instances in a job and builds a
+ * String that identifies the stages of the pipeline that belong to this job.
  */
 public class JobNameBuilder {
-  
+
   private static final Joiner JOINER = Joiner.on("+");
   private static final Joiner CHILD_JOINER = Joiner.on("/");
-  
+
   private String pipelineName;
   List<String> rootStack = Lists.newArrayList();
-  
-  public JobNameBuilder(final String pipelineName){
+
+  public JobNameBuilder(final String pipelineName) {
     this.pipelineName = pipelineName;
   }
-  
+
   public void visit(DoNode node) {
     visit(node, rootStack);
   }
-  
+
   public void visit(List<DoNode> nodes) {
     visit(nodes, rootStack);
   }
-  
+
   private void visit(List<DoNode> nodes, List<String> stack) {
     if (nodes.size() == 1) {
       visit(nodes.get(0), stack);
@@ -65,7 +64,7 @@ public class JobNameBuilder {
       }
     }
   }
-  
+
   private void visit(DoNode node, List<String> stack) {
     String name = node.getName();
     if (!name.isEmpty()) {
@@ -73,7 +72,7 @@ public class JobNameBuilder {
     }
     visit(node.getChildren(), stack);
   }
-  
+
   public String build() {
     return String.format("%s: %s", pipelineName, JOINER.join(rootStack));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 165641a..0cae079 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -23,10 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.collect.DoTableImpl;
@@ -40,6 +36,10 @@ import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
 import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -49,29 +49,26 @@ import com.google.common.collect.Sets;
 
 public class JobPrototype {
 
-  public static JobPrototype createMapReduceJob(PGroupedTableImpl<?,?> group,
-      Set<NodePath> inputs, Path workingPath) {
+  public static JobPrototype createMapReduceJob(PGroupedTableImpl<?, ?> group, Set<NodePath> inputs, Path workingPath) {
     return new JobPrototype(inputs, group, workingPath);
   }
 
-  public static JobPrototype createMapOnlyJob(
-      HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
+  public static JobPrototype createMapOnlyJob(HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) {
     return new JobPrototype(mapNodePaths, workingPath);
   }
 
   private final Set<NodePath> mapNodePaths;
-  private final PGroupedTableImpl<?,?> group;
+  private final PGroupedTableImpl<?, ?> group;
   private final Set<JobPrototype> dependencies = Sets.newHashSet();
   private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
   private final Path workingPath;
-  
+
   private HashMultimap<Target, NodePath> targetsToNodePaths;
-  private DoTableImpl<?,?> combineFnTable;
+  private DoTableImpl<?, ?> combineFnTable;
 
   private CrunchJob job;
 
-  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?,?> group,
-      Path workingPath) {
+  private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) {
     this.mapNodePaths = ImmutableSet.copyOf(inputs);
     this.group = group;
     this.workingPath = workingPath;
@@ -87,8 +84,7 @@ public class JobPrototype {
 
   public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
     if (group == null) {
-      throw new IllegalStateException(
-          "Cannot add a reduce phase to a map-only job");
+      throw new IllegalStateException("Cannot add a reduce phase to a map-only job");
     }
     this.targetsToNodePaths = outputPaths;
   }
@@ -112,12 +108,11 @@ public class JobPrototype {
     conf = job.getConfiguration();
     conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString());
     job.setJarByClass(jarClass);
-    
+
     Set<DoNode> outputNodes = Sets.newHashSet();
     Set<Target> targets = targetsToNodePaths.keySet();
     Path outputPath = new Path(workingPath, "output");
-    MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath,
-        group == null);
+    MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
     for (Target target : targets) {
       DoNode node = null;
       for (NodePath nodePath : targetsToNodePaths.get(target)) {
@@ -145,13 +140,12 @@ public class JobPrototype {
         DoNode combineNode = combineFnTable.createDoNode();
         combineNode.addChild(group.getGroupingNode());
         combinerInputNode.addChild(combineNode);
-        serialize(ImmutableList.of(combinerInputNode), conf, workingPath,
-            NodeContext.COMBINE);
+        serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);
       }
 
       group.configureShuffle(job);
 
-      DoNode mapOutputNode = group.getGroupingNode();      
+      DoNode mapOutputNode = group.getGroupingNode();
       Set<DoNode> mapNodes = Sets.newHashSet();
       for (NodePath nodePath : mapNodePaths) {
         // Advance these one step, since we've already configured
@@ -179,12 +173,12 @@ public class JobPrototype {
       job.setInputFormatClass(CrunchInputFormat.class);
     }
     job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode));
-    
+
     return new CrunchJob(job, outputPath, outputHandler);
   }
 
-  private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath,
-      NodeContext context) throws IOException {
+  private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context)
+      throws IOException {
     List<RTNode> rtNodes = Lists.newArrayList();
     for (DoNode node : nodes) {
       rtNodes.add(node.toRTNode(true, conf, context));
@@ -201,16 +195,14 @@ public class JobPrototype {
     }
     return builder.build();
   }
-  
+
   private DoNode walkPath(Iterator<PCollectionImpl<?>> iter, DoNode working) {
     while (iter.hasNext()) {
       PCollectionImpl<?> collect = iter.next();
-      if (combineFnTable != null &&
-          !(collect instanceof PGroupedTableImpl)) {
+      if (combineFnTable != null && !(collect instanceof PGroupedTableImpl)) {
         combineFnTable = null;
-      } else if (collect instanceof DoTableImpl &&
-          ((DoTableImpl<?,?>) collect).hasCombineFn()) {
-        combineFnTable = (DoTableImpl<?,?>) collect;
+      } else if (collect instanceof DoTableImpl && ((DoTableImpl<?, ?>) collect).hasCombineFn()) {
+        combineFnTable = (DoTableImpl<?, ?>) collect;
       }
       if (!nodes.containsKey(collect)) {
         nodes.put(collect, collect.createDoNode());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
index af193f4..bfd3e26 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
@@ -19,14 +19,14 @@ package org.apache.crunch.impl.mr.plan;
 
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.crunch.Target;
 import org.apache.crunch.io.MapReduceTarget;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
 import com.google.common.collect.Lists;
 
 public class MSCROutputHandler implements OutputHandler {
@@ -34,22 +34,22 @@ public class MSCROutputHandler implements OutputHandler {
   private final Job job;
   private final Path path;
   private final boolean mapOnlyJob;
-  
+
   private DoNode workingNode;
   private List<Path> multiPaths;
-  
+
   public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
     this.job = job;
     this.path = outputPath;
     this.mapOnlyJob = mapOnlyJob;
     this.multiPaths = Lists.newArrayList();
   }
-  
+
   public void configureNode(DoNode node, Target target) {
     workingNode = node;
     target.accept(this, node.getPType());
   }
-  
+
   public boolean configure(Target target, PType<?> ptype) {
     if (target instanceof MapReduceTarget && target instanceof PathTarget) {
       String name = PlanningParameters.MULTI_OUTPUT_PREFIX + multiPaths.size();
@@ -68,7 +68,7 @@ public class MSCROutputHandler implements OutputHandler {
   public boolean isMapOnlyJob() {
     return mapOnlyJob;
   }
-  
+
   public List<Path> getMultiPaths() {
     return multiPaths;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 29aeafd..975d5a0 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -26,8 +26,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
@@ -39,6 +37,8 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 import org.apache.crunch.impl.mr.collect.UnionCollection;
 import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.hadoop.conf.Configuration;
+
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -46,33 +46,33 @@ import com.google.common.collect.Sets;
 
 public class MSCRPlanner {
 
-  // Used to ensure that we always build pipelines starting from the deepest outputs, which
+  // Used to ensure that we always build pipelines starting from the deepest
+  // outputs, which
   // helps ensure that we handle intermediate outputs correctly.
   private static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() {
     @Override
     public int compare(PCollectionImpl<?> left, PCollectionImpl<?> right) {
-      int cmp = right.getDepth() - left.getDepth();   
-      if (cmp == 0){
-          // Ensure we don't throw away two output collections at the same depth.
-          // Using the collection name would be nicer here, but names aren't necessarily unique
-          cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
+      int cmp = right.getDepth() - left.getDepth();
+      if (cmp == 0) {
+        // Ensure we don't throw away two output collections at the same depth.
+        // Using the collection name would be nicer here, but names aren't
+        // necessarily unique
+        cmp = new Integer(right.hashCode()).compareTo(left.hashCode());
       }
       return cmp;
     }
   };
-  
+
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
 
-  public MSCRPlanner(MRPipeline pipeline,
-      Map<PCollectionImpl<?>, Set<Target>> outputs) {
+  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
     this.pipeline = pipeline;
     this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
     this.outputs.putAll(outputs);
   }
 
-  public MRExecutor plan(Class<?> jarClass, Configuration conf)
-      throws IOException {
+  public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
     // Constructs all of the node paths, which either start w/an input
     // or a GBK and terminate in an output collection of any type.
     NodeVisitor visitor = new NodeVisitor();
@@ -86,17 +86,15 @@ public class MSCRPlanner {
     // Keeps track of the dependencies from collections -> jobs and then
     // between different jobs.
     Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
-    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies =
-        new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();
+    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies = new HashMap<PCollectionImpl<?>, Set<JobPrototype>>();
 
     // Find the set of GBKs that DO NOT depend on any other GBK.
-    Set<PGroupedTableImpl<?,?>> workingGroupings = null;
+    Set<PGroupedTableImpl<?, ?>> workingGroupings = null;
     while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {
 
-      for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
+      for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
         Set<NodePath> mapInputPaths = nodePaths.get(grouping);
-        JobPrototype proto = JobPrototype.createMapReduceJob(grouping,
-            mapInputPaths, pipeline.createTempPath());
+        JobPrototype proto = JobPrototype.createMapReduceJob(grouping, mapInputPaths, pipeline.createTempPath());
         assignments.put(grouping, proto);
         if (jobDependencies.containsKey(grouping)) {
           for (JobPrototype dependency : jobDependencies.get(grouping)) {
@@ -105,10 +103,9 @@ public class MSCRPlanner {
         }
       }
 
-      Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = getDependencyPaths(
-          workingGroupings, nodePaths);
-      for (Map.Entry<PGroupedTableImpl<?,?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
-        PGroupedTableImpl<?,?> grouping = entry.getKey();
+      Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = getDependencyPaths(workingGroupings, nodePaths);
+      for (Map.Entry<PGroupedTableImpl<?, ?>, Set<NodePath>> entry : dependencyPaths.entrySet()) {
+        PGroupedTableImpl<?, ?> grouping = entry.getKey();
         Set<NodePath> currentNodePaths = entry.getValue();
 
         JobPrototype proto = assignments.get(grouping);
@@ -118,7 +115,7 @@ public class MSCRPlanner {
           if (tail instanceof PGroupedTableImpl) {
             gbkPaths.add(nodePath);
             if (!jobDependencies.containsKey(tail)) {
-              jobDependencies.put(tail, Sets.<JobPrototype>newHashSet());
+              jobDependencies.put(tail, Sets.<JobPrototype> newHashSet());
             }
             jobDependencies.get(tail).add(proto);
           }
@@ -129,7 +126,8 @@ public class MSCRPlanner {
         }
 
         // At this point, all of the dependencies for the working groups will be
-        // file outputs, and so we can add them all to the JobPrototype-- we now have
+        // file outputs, and so we can add them all to the JobPrototype-- we now
+        // have
         // a complete job.
         HashMultimap<Target, NodePath> reduceOutputs = HashMultimap.create();
         for (NodePath nodePath : currentNodePaths) {
@@ -148,8 +146,7 @@ public class MSCRPlanner {
 
     // Process any map-only jobs that are remaining.
     if (!nodePaths.isEmpty()) {
-      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths
-          .entrySet()) {
+      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths.entrySet()) {
         PCollectionImpl<?> collect = entry.getKey();
         if (!assignments.containsKey(collect)) {
           HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
@@ -158,9 +155,8 @@ public class MSCRPlanner {
               mapOutputs.put(target, nodePath);
             }
           }
-          JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs,
-              pipeline.createTempPath());
-          
+          JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs, pipeline.createTempPath());
+
           if (jobDependencies.containsKey(collect)) {
             for (JobPrototype dependency : jobDependencies.get(collect)) {
               proto.addDependency(dependency);
@@ -178,11 +174,10 @@ public class MSCRPlanner {
     return exec;
   }
 
-  private Map<PGroupedTableImpl<?,?>, Set<NodePath>> getDependencyPaths(
-      Set<PGroupedTableImpl<?,?>> workingGroupings,
+  private Map<PGroupedTableImpl<?, ?>, Set<NodePath>> getDependencyPaths(Set<PGroupedTableImpl<?, ?>> workingGroupings,
       Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
-    Map<PGroupedTableImpl<?,?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
-    for (PGroupedTableImpl<?,?> grouping : workingGroupings) {
+    Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
+    for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
       dependencyPaths.put(grouping, Sets.<NodePath> newHashSet());
     }
 
@@ -237,15 +232,13 @@ public class MSCRPlanner {
     return splitIndex;
   }
 
-  private void handleGroupingDependencies(Set<NodePath> gbkPaths,
-      Set<NodePath> currentNodePaths) throws IOException {
+  private void handleGroupingDependencies(Set<NodePath> gbkPaths, Set<NodePath> currentNodePaths) throws IOException {
     int splitIndex = getSplitIndex(currentNodePaths);
-    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next()
-        .get(splitIndex);
+    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next().get(splitIndex);
     if (!outputs.containsKey(splitTarget)) {
-      outputs.put(splitTarget, Sets.<Target>newHashSet());
+      outputs.put(splitTarget, Sets.<Target> newHashSet());
     }
-    
+
     SourceTarget srcTarget = null;
     Target targetToReplace = null;
     for (Target t : outputs.get(splitTarget)) {
@@ -272,18 +265,17 @@ public class MSCRPlanner {
     Set<NodePath> nextNodePaths = Sets.newHashSet();
     for (NodePath nodePath : currentNodePaths) {
       if (gbkPaths.contains(nodePath)) {
-    	nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
+        nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
       } else {
-    	nextNodePaths.add(nodePath);
+        nextNodePaths.add(nodePath);
       }
     }
     currentNodePaths.clear();
     currentNodePaths.addAll(nextNodePaths);
   }
 
-  private Set<PGroupedTableImpl<?,?>> getWorkingGroupings(
-      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
-    Set<PGroupedTableImpl<?,?>> gbks = Sets.newHashSet();
+  private Set<PGroupedTableImpl<?, ?>> getWorkingGroupings(Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
+    Set<PGroupedTableImpl<?, ?>> gbks = Sets.newHashSet();
     for (PCollectionImpl<?> target : nodePaths.keySet()) {
       if (target instanceof PGroupedTableImpl) {
         boolean hasGBKDependency = false;
@@ -294,7 +286,7 @@ public class MSCRPlanner {
           }
         }
         if (!hasGBKDependency) {
-          gbks.add((PGroupedTableImpl<?,?>) target);
+          gbks.add((PGroupedTableImpl<?, ?>) target);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
index 4ab30ca..c7a67bd 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+
 import com.google.common.collect.Lists;
 
 class NodePath implements Iterable<PCollectionImpl<?>> {
@@ -75,12 +76,12 @@ class NodePath implements Iterable<PCollectionImpl<?>> {
     NodePath nodePath = (NodePath) other;
     return path.equals(nodePath.path);
   }
-  
+
   @Override
   public int hashCode() {
     return 17 + 37 * path.hashCode();
   }
-  
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -90,7 +91,7 @@ class NodePath implements Iterable<PCollectionImpl<?>> {
     sb.deleteCharAt(sb.length() - 1);
     return sb.toString();
   }
-  
+
   public NodePath splitAt(int splitIndex, PCollectionImpl<?> newHead) {
     NodePath top = new NodePath();
     for (int i = 0; i <= splitIndex; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
index b8e59a3..6ead212 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -20,8 +20,9 @@ package org.apache.crunch.impl.mr.plan;
 public class PlanningParameters {
 
   public static final String MULTI_OUTPUT_PREFIX = "out";
-  
+
   public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";
-  
-  private PlanningParameters() {}
+
+  private PlanningParameters() {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
index fcfbc36..47a3ded 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchCombiner.java
@@ -23,5 +23,5 @@ public class CrunchCombiner extends CrunchReducer {
   protected NodeContext getNodeContext() {
     return NodeContext.COMBINE;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
index 6b49735..7e91bdd 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.crunch.io.impl.InputBundle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -32,14 +33,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import org.apache.crunch.io.impl.InputBundle;
 import com.google.common.collect.Lists;
 
 public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
 
   @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException,
-      InterruptedException {
+  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
     List<InputSplit> splits = Lists.newArrayList();
     Configuration conf = job.getConfiguration();
     Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
@@ -48,10 +47,9 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
     for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
       InputBundle inputBundle = entry.getKey();
       Job jobCopy = new Job(conf);
-      InputFormat<?,?> format = (InputFormat<?,?>) ReflectionUtils.newInstance(
-          inputBundle.getInputFormatClass(), jobCopy.getConfiguration());
-      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue()
-          .entrySet()) {
+      InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getInputFormatClass(),
+          jobCopy.getConfiguration());
+      for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
         Integer nodeIndex = nodeEntry.getKey();
         List<Path> paths = nodeEntry.getValue();
         FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths.size()]));
@@ -60,8 +58,8 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
         // and Mapper types by wrapping in a TaggedInputSplit.
         List<InputSplit> pathSplits = format.getSplits(jobCopy);
         for (InputSplit pathSplit : pathSplits) {
-          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(),
-              inputBundle.getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration()));
+          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(), inputBundle
+              .getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration()));
         }
       }
     }
@@ -69,9 +67,9 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
   }
 
   @Override
-  public RecordReader<K, V> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new CrunchRecordReader<K,V>(inputSplit, context);
+  public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException,
+      InterruptedException {
+    return new CrunchRecordReader<K, V>(inputSplit, context);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index b57ca58..5882180 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -47,9 +47,8 @@ public class CrunchInputSplit extends InputSplit implements Configurable, Writab
     // default constructor
   }
 
-  public CrunchInputSplit(InputSplit inputSplit,
-      Class<? extends InputFormat> inputFormatClass, Map<String, String> extraConf,
-      int nodeIndex, Configuration conf) {
+  public CrunchInputSplit(InputSplit inputSplit, Class<? extends InputFormat> inputFormatClass,
+      Map<String, String> extraConf, int nodeIndex, Configuration conf) {
     this.inputSplit = inputSplit;
     this.inputFormatClass = inputFormatClass;
     this.extraConf = extraConf;
@@ -87,10 +86,9 @@ public class CrunchInputSplit extends InputSplit implements Configurable, Writab
         conf.set(in.readUTF(), in.readUTF());
       }
     }
-    inputFormatClass = (Class<? extends InputFormat<?,?>>) readClass(in);
+    inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
     Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
-    inputSplit = (InputSplit) ReflectionUtils
-        .newInstance(inputSplitClass, conf);
+    inputSplit = (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf);
     SerializationFactory factory = new SerializationFactory(conf);
     Deserializer deserializer = factory.getDeserializer(inputSplitClass);
     deserializer.open((DataInputStream) in);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
index 8fa1d56..93868fc 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
@@ -20,12 +20,12 @@ package org.apache.crunch.impl.mr.run;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.crunch.io.impl.InputBundle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 
-import org.apache.crunch.io.impl.InputBundle;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
@@ -37,18 +37,15 @@ public class CrunchInputs {
   private static final char FIELD_SEP = ';';
   private static final Joiner JOINER = Joiner.on(FIELD_SEP);
   private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
-  
-  public static void addInputPath(Job job, Path path,
-      InputBundle inputBundle, int nodeIndex) {
+
+  public static void addInputPath(Job job, Path path, InputBundle inputBundle, int nodeIndex) {
     Configuration conf = job.getConfiguration();
     String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
     String existing = conf.get(RuntimeParameters.MULTI_INPUTS);
-    conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? inputs : existing + RECORD_SEP
-        + inputs);
+    conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
   }
 
-  public static Map<InputBundle, Map<Integer, List<Path>>> getFormatNodeMap(
-      JobContext job) {
+  public static Map<InputBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
     Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
     Configuration conf = job.getConfiguration();
     for (String input : Splitter.on(RECORD_SEP).split(conf.get(RuntimeParameters.MULTI_INPUTS))) {
@@ -59,11 +56,9 @@ public class CrunchInputs {
       }
       Integer nodeIndex = Integer.valueOf(fields.get(1));
       if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
-        formatNodeMap.get(inputBundle).put(nodeIndex,
-            Lists.<Path> newLinkedList());
+        formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
       }
-      formatNodeMap.get(inputBundle).get(nodeIndex)
-          .add(new Path(fields.get(2)));
+      formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
     }
     return formatNodeMap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
index 814c8c3..2de1567 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.mapreduce.Mapper;
 public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
 
   private static final Log LOG = LogFactory.getLog(CrunchMapper.class);
-  
+
   private RTNode node;
   private CrunchTaskContext ctxt;
   private boolean debug;
-  
+
   @Override
   protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
     List<RTNode> nodes;
@@ -52,8 +52,7 @@ public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
   }
 
   @Override
-  protected void map(Object k, Object v,
-      Mapper<Object, Object, Object, Object>.Context context) {
+  protected void map(Object k, Object v, Mapper<Object, Object, Object, Object>.Context context) {
     if (debug) {
       try {
         node.process(k, v);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index 5967aa9..c3575a5 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -29,14 +29,13 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
 
   private final RecordReader<K, V> delegate;
 
-  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context)
-      throws IOException, InterruptedException {
+  public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
+      InterruptedException {
     CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
-    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
-        .newInstance(crunchSplit.getInputFormatClass(), crunchSplit.getConf());
-    this.delegate = inputFormat.createRecordReader(
-        crunchSplit.getInputSplit(), TaskAttemptContextFactory.create(
-            crunchSplit.getConf(), context.getTaskAttemptID()));
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(crunchSplit.getInputFormatClass(),
+        crunchSplit.getConf());
+    this.delegate = inputFormat.createRecordReader(crunchSplit.getInputSplit(),
+        TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
   }
 
   @Override
@@ -60,12 +59,11 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
   }
 
   @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
     CrunchInputSplit crunchSplit = (CrunchInputSplit) inputSplit;
     InputSplit delegateSplit = crunchSplit.getInputSplit();
-    delegate.initialize(delegateSplit, TaskAttemptContextFactory.create(
-        crunchSplit.getConf(), context.getTaskAttemptID()));
+    delegate.initialize(delegateSplit,
+        TaskAttemptContextFactory.create(crunchSplit.getConf(), context.getTaskAttemptID()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
index aa5fc95..89fbd26 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -27,15 +27,15 @@ import org.apache.hadoop.mapreduce.Reducer;
 public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
 
   private static final Log LOG = LogFactory.getLog(CrunchReducer.class);
-  
+
   private RTNode node;
   private CrunchTaskContext ctxt;
   private boolean debug;
-  
+
   protected NodeContext getNodeContext() {
     return NodeContext.REDUCE;
   }
-  
+
   @Override
   protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
     this.ctxt = new CrunchTaskContext(context, getNodeContext());
@@ -50,8 +50,7 @@ public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
   }
 
   @Override
-  protected void reduce(Object key, Iterable<Object> values,
-      Reducer<Object, Object, Object, Object>.Context context) {
+  protected void reduce(Object key, Iterable<Object> values, Reducer<Object, Object, Object, Object>.Context context) {
     if (debug) {
       try {
         node.processIterable(key, values);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
index 9eac51c..72e939f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRuntimeException.java
@@ -20,15 +20,15 @@ package org.apache.crunch.impl.mr.run;
 public class CrunchRuntimeException extends RuntimeException {
 
   private boolean logged = false;
-  
+
   public CrunchRuntimeException(String msg) {
     super(msg);
   }
-  
+
   public CrunchRuntimeException(Exception e) {
     super(e);
   }
-  
+
   public CrunchRuntimeException(String msg, Exception e) {
     super(msg, e);
   }
@@ -36,7 +36,7 @@ public class CrunchRuntimeException extends RuntimeException {
   public boolean wasLogged() {
     return logged;
   }
-  
+
   public void markLogged() {
     this.logged = true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
index dd04813..8093cd5 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -20,23 +20,20 @@ package org.apache.crunch.impl.mr.run;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
-import org.apache.crunch.util.DistCache;
-
 public class CrunchTaskContext {
 
   private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
   private final NodeContext nodeContext;
   private CrunchMultipleOutputs<Object, Object> multipleOutputs;
 
-  public CrunchTaskContext(
-      TaskInputOutputContext<Object, Object, Object, Object> taskContext,
-      NodeContext nodeContext) {
+  public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, NodeContext nodeContext) {
     this.taskContext = taskContext;
     this.nodeContext = nodeContext;
   }
@@ -61,12 +58,12 @@ public class CrunchTaskContext {
     }
     return nodes;
   }
-  
+
   public boolean isDebugRun() {
     Configuration conf = taskContext.getConfiguration();
     return conf.getBoolean(RuntimeParameters.DEBUG, false);
   }
-  
+
   public void cleanup() {
     if (multipleOutputs != null) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
index 648e87c..ffc9e7c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/NodeContext.java
@@ -25,7 +25,9 @@ import org.apache.crunch.impl.mr.plan.DoNode;
  * 
  */
 public enum NodeContext {
-  MAP, REDUCE, COMBINE;
+  MAP,
+  REDUCE,
+  COMBINE;
 
   public String getConfigurationKey() {
     return "crunch.donode." + toString().toLowerCase();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index 4debfc1..0eb429a 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.emit.IntermediateEmitter;
@@ -31,9 +30,9 @@ import org.apache.crunch.impl.mr.emit.OutputEmitter;
 import org.apache.crunch.types.Converter;
 
 public class RTNode implements Serializable {
-  
+
   private static final Log LOG = LogFactory.getLog(RTNode.class);
-  
+
   private final String nodeName;
   private DoFn<Object, Object> fn;
   private final List<RTNode> children;
@@ -43,8 +42,8 @@ public class RTNode implements Serializable {
 
   private transient Emitter<Object> emitter;
 
-  public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children,
-      Converter inputConverter, Converter outputConverter, String outputName) {
+  public RTNode(DoFn<Object, Object> fn, String name, List<RTNode> children, Converter inputConverter,
+      Converter outputConverter, String outputName) {
     this.fn = fn;
     this.nodeName = name;
     this.children = children;
@@ -58,7 +57,7 @@ public class RTNode implements Serializable {
       // Already initialized
       return;
     }
-    
+
     fn.setContext(ctxt.getContext());
     for (RTNode child : children) {
       child.initialize(ctxt);
@@ -66,11 +65,9 @@ public class RTNode implements Serializable {
 
     if (outputConverter != null) {
       if (outputName != null) {
-        this.emitter = new MultipleOutputEmitter(
-            outputConverter, ctxt.getMultipleOutputs(), outputName);
+        this.emitter = new MultipleOutputEmitter(outputConverter, ctxt.getMultipleOutputs(), outputName);
       } else {
-        this.emitter = new OutputEmitter(
-            outputConverter, ctxt.getContext());
+        this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
       }
     } else if (!children.isEmpty()) {
       this.emitter = new IntermediateEmitter(children);
@@ -88,8 +85,7 @@ public class RTNode implements Serializable {
       fn.process(input, emitter);
     } catch (CrunchRuntimeException e) {
       if (!e.wasLogged()) {
-        LOG.info(String.format("Crunch exception in '%s' for input: %s",
-            nodeName, input.toString()), e);
+        LOG.info(String.format("Crunch exception in '%s' for input: %s", nodeName, input.toString()), e);
         e.markLogged();
       }
       throw e;
@@ -103,7 +99,7 @@ public class RTNode implements Serializable {
   public void processIterable(Object key, Iterable values) {
     process(inputConverter.convertIterableInput(key, values));
   }
-  
+
   public void cleanup() {
     fn.cleanup(emitter);
     emitter.flush();
@@ -114,9 +110,7 @@ public class RTNode implements Serializable {
 
   @Override
   public String toString() {
-    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children="
-        + children + ", inputConverter=" + inputConverter
-        + ", outputConverter=" + outputConverter + ", outputName=" + outputName
-        + "]";
+    return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + children + ", inputConverter="
+        + inputConverter + ", outputConverter=" + outputConverter + ", outputName=" + outputName + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 5e534eb..f16752f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -19,16 +19,17 @@ package org.apache.crunch.impl.mr.run;
 
 /**
  * Parameters used during the runtime execution.
- *
+ * 
  */
 public class RuntimeParameters {
 
   public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
-  
+
   public static final String MULTI_INPUTS = "crunch.inputs.dir";
 
   public static final String DEBUG = "crunch.debug";
-  
+
   // Not instantiated
-  private RuntimeParameters() {}
+  private RuntimeParameters() {
+  }
 }


Mime
View raw message