incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [9/10] Format all sources according to formatting profile
Date Sat, 14 Jul 2012 18:14:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
index 10d033f..39b95b5 100644
--- a/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
+++ b/crunch/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -36,15 +37,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
-
 /**
- * The MultipleOutputs class simplifies writing output data 
- * to multiple outputs
+ * The MultipleOutputs class simplifies writing output data to multiple outputs
  * 
- * <p> 
+ * <p>
  * Case one: writing to additional outputs other than the job default output.
- *
+ * 
  * Each additional output, or named output, may be configured with its own
  * <code>OutputFormat</code>, with its own key class and with its own value
  * class.
@@ -54,39 +52,41 @@ import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
  * </p>
  * 
  * <p>
- * MultipleOutputs supports counters, by default they are disabled. The 
- * counters group is the {@link CrunchMultipleOutputs} class name. The names of the 
- * counters are the same as the output name. These count the number records 
+ * MultipleOutputs supports counters, by default they are disabled. The counters
+ * group is the {@link CrunchMultipleOutputs} class name. The names of the
+ * counters are the same as the output name. These count the number records
  * written to each output name.
  * </p>
  * 
  * Usage pattern for job submission:
+ * 
  * <pre>
- *
+ * 
  * Job job = new Job();
- *
+ * 
  * FileInputFormat.setInputPath(job, inDir);
  * FileOutputFormat.setOutputPath(job, outDir);
- *
+ * 
  * job.setMapperClass(MOMap.class);
  * job.setReducerClass(MOReduce.class);
  * ...
- *
+ * 
  * // Defines additional single text based output 'text' for the job
  * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
  * LongWritable.class, Text.class);
- *
+ * 
  * // Defines additional sequence-file based output 'sequence' for the job
  * MultipleOutputs.addNamedOutput(job, "seq",
  *   SequenceFileOutputFormat.class,
  *   LongWritable.class, Text.class);
  * ...
- *
+ * 
  * job.waitForCompletion(true);
  * ...
  * </pre>
  * <p>
  * Usage in Reducer:
+ * 
  * <pre>
  * <K, V> String generateFileName(K k, V v) {
  *   return k.toString() + "_" + v.toString();
@@ -99,7 +99,7 @@ import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
  * ...
  * mos = new MultipleOutputs(context);
  * }
- *
+ * 
  * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
  * Context context)
  * throws IOException {
@@ -110,12 +110,12 @@ import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
  * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
  * ...
  * }
- *
+ * 
  * public void cleanup(Context) throws IOException {
  * mos.close();
  * ...
  * }
- *
+ * 
  * }
  * </pre>
  */
@@ -123,22 +123,20 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
 
   private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
 
-  private static final String MO_PREFIX = 
-    "mapreduce.multipleoutputs.namedOutput.";
+  private static final String MO_PREFIX = "mapreduce.multipleoutputs.namedOutput.";
 
   private static final String FORMAT = ".format";
   private static final String KEY = ".key";
   private static final String VALUE = ".value";
-  private static final String COUNTERS_ENABLED = 
-    "mapreduce.multipleoutputs.counters";
+  private static final String COUNTERS_ENABLED = "mapreduce.multipleoutputs.counters";
 
   private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
-  
+
   /**
    * Counters group used by the counters of MultipleOutputs.
    */
   private static final String COUNTERS_GROUP = CrunchMultipleOutputs.class.getName();
-  
+
   /**
    * Cache for the taskContexts
    */
@@ -146,14 +144,15 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
 
   /**
    * Checks if a named output name is valid token.
-   *
-   * @param namedOutput named output Name
-   * @throws IllegalArgumentException if the output name is not valid.
+   * 
+   * @param namedOutput
+   *          named output Name
+   * @throws IllegalArgumentException
+   *           if the output name is not valid.
    */
   private static void checkTokenName(String namedOutput) {
     if (namedOutput == null || namedOutput.length() == 0) {
-      throw new IllegalArgumentException(
-        "Name cannot be NULL or emtpy");
+      throw new IllegalArgumentException("Name cannot be NULL or emtpy");
     }
     for (char ch : namedOutput.toCharArray()) {
       if ((ch >= 'A') && (ch <= 'Z')) {
@@ -165,49 +164,49 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
       if ((ch >= '0') && (ch <= '9')) {
         continue;
       }
-      throw new IllegalArgumentException(
-        "Name cannot be have a '" + ch + "' char");
+      throw new IllegalArgumentException("Name cannot be have a '" + ch + "' char");
     }
   }
 
   /**
    * Checks if output name is valid.
-   *
+   * 
    * name cannot be the name used for the default output
-   * @param outputPath base output Name
-   * @throws IllegalArgumentException if the output name is not valid.
+   * 
+   * @param outputPath
+   *          base output Name
+   * @throws IllegalArgumentException
+   *           if the output name is not valid.
    */
   private static void checkBaseOutputPath(String outputPath) {
     if (outputPath.equals(FileOutputFormat.PART)) {
       throw new IllegalArgumentException("output name cannot be 'part'");
     }
   }
-  
+
   /**
    * Checks if a named output name is valid.
-   *
-   * @param namedOutput named output Name
-   * @throws IllegalArgumentException if the output name is not valid.
+   * 
+   * @param namedOutput
+   *          named output Name
+   * @throws IllegalArgumentException
+   *           if the output name is not valid.
    */
-  private static void checkNamedOutputName(JobContext job,
-      String namedOutput, boolean alreadyDefined) {
+  private static void checkNamedOutputName(JobContext job, String namedOutput, boolean alreadyDefined) {
     checkTokenName(namedOutput);
     checkBaseOutputPath(namedOutput);
     List<String> definedChannels = getNamedOutputsList(job);
     if (alreadyDefined && definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput +
-        "' already alreadyDefined");
+      throw new IllegalArgumentException("Named output '" + namedOutput + "' already alreadyDefined");
     } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput +
-        "' not defined");
+      throw new IllegalArgumentException("Named output '" + namedOutput + "' not defined");
     }
   }
 
   // Returns list of channel names.
   private static List<String> getNamedOutputsList(JobContext job) {
     List<String> names = new ArrayList<String>();
-    StringTokenizer st = new StringTokenizer(
-      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
+    StringTokenizer st = new StringTokenizer(job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
     while (st.hasMoreTokens()) {
       names.add(st.nextToken());
     }
@@ -216,48 +215,44 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
 
   // Returns the named output OutputFormat.
   @SuppressWarnings("unchecked")
-  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
-    JobContext job, String namedOutput) {
-    return (Class<? extends OutputFormat<?, ?>>)
-      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
-      OutputFormat.class);
+  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(JobContext job, String namedOutput) {
+    return (Class<? extends OutputFormat<?, ?>>) job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT,
+        null, OutputFormat.class);
   }
 
   // Returns the key class for a named output.
-  private static Class<?> getNamedOutputKeyClass(JobContext job,
-                                                String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
-      Object.class);
+  private static Class<?> getNamedOutputKeyClass(JobContext job, String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null, Object.class);
   }
 
   // Returns the value class for a named output.
-  private static Class<?> getNamedOutputValueClass(
-      JobContext job, String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
-      null, Object.class);
+  private static Class<?> getNamedOutputValueClass(JobContext job, String namedOutput) {
+    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE, null, Object.class);
   }
 
   /**
    * Adds a named output for the job.
    * <p/>
-   *
-   * @param job               job to add the named output
-   * @param namedOutput       named output name, it has to be a word, letters
-   *                          and numbers only, cannot be the word 'part' as
-   *                          that is reserved for the default output.
-   * @param outputFormatClass OutputFormat class.
-   * @param keyClass          key class
-   * @param valueClass        value class
+   * 
+   * @param job
+   *          job to add the named output
+   * @param namedOutput
+   *          named output name, it has to be a word, letters and numbers only,
+   *          cannot be the word 'part' as that is reserved for the default
+   *          output.
+   * @param outputFormatClass
+   *          OutputFormat class.
+   * @param keyClass
+   *          key class
+   * @param valueClass
+   *          value class
    */
-  public static void addNamedOutput(Job job, String namedOutput,
-      Class<? extends OutputFormat> outputFormatClass,
+  public static void addNamedOutput(Job job, String namedOutput, Class<? extends OutputFormat> outputFormatClass,
       Class<?> keyClass, Class<?> valueClass) {
     checkNamedOutputName(job, namedOutput, true);
     Configuration conf = job.getConfiguration();
-    conf.set(MULTIPLE_OUTPUTS,
-      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
-    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
-      OutputFormat.class);
+    conf.set(MULTIPLE_OUTPUTS, conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, OutputFormat.class);
     conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
     conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
   }
@@ -265,23 +260,26 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
   /**
    * Enables or disables counters for the named outputs.
    * 
-   * The counters group is the {@link CrunchMultipleOutputs} class name.
-   * The names of the counters are the same as the named outputs. These
-   * counters count the number records written to each output name.
-   * By default these counters are disabled.
-   *
-   * @param job    job  to enable counters
-   * @param enabled indicates if the counters will be enabled or not.
+   * The counters group is the {@link CrunchMultipleOutputs} class name. The
+   * names of the counters are the same as the named outputs. These counters
+   * count the number records written to each output name. By default these
+   * counters are disabled.
+   * 
+   * @param job
+   *          job to enable counters
+   * @param enabled
+   *          indicates if the counters will be enabled or not.
    */
   public static void setCountersEnabled(Job job, boolean enabled) {
     job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
   }
 
   /**
-   * Returns if the counters for the named outputs are enabled or not.
-   * By default these counters are disabled.
-   *
-   * @param job    the job 
+   * Returns if the counters for the named outputs are enabled or not. By
+   * default these counters are disabled.
+   * 
+   * @param job
+   *          the job
    * @return TRUE if the counters are enabled, FALSE if they are disabled.
    */
   public static boolean getCountersEnabled(JobContext job) {
@@ -289,7 +287,7 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
   }
 
   /**
-   * Wraps RecordWriter to increment counters. 
+   * Wraps RecordWriter to increment counters.
    */
   @SuppressWarnings("unchecked")
   private static class RecordWriterWithCounter extends RecordWriter {
@@ -297,22 +295,19 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
     private String counterName;
     private TaskInputOutputContext context;
 
-    public RecordWriterWithCounter(RecordWriter writer, String counterName,
-                                   TaskInputOutputContext context) {
+    public RecordWriterWithCounter(RecordWriter writer, String counterName, TaskInputOutputContext context) {
       this.writer = writer;
       this.counterName = counterName;
       this.context = context;
     }
 
-    @SuppressWarnings({"unchecked"})
-    public void write(Object key, Object value) 
-        throws IOException, InterruptedException {
+    @SuppressWarnings({ "unchecked" })
+    public void write(Object key, Object value) throws IOException, InterruptedException {
       context.getCounter(COUNTERS_GROUP, counterName).increment(1);
       writer.write(key, value);
     }
 
-    public void close(TaskAttemptContext context) 
-        throws IOException, InterruptedException {
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
       writer.close(context);
     }
   }
@@ -323,55 +318,59 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
   private Set<String> namedOutputs;
   private Map<String, RecordWriter<?, ?>> recordWriters;
   private boolean countersEnabled;
-  
+
   /**
-   * Creates and initializes multiple outputs support,
-   * it should be instantiated in the Mapper/Reducer setup method.
-   *
-   * @param context the TaskInputOutputContext object
+   * Creates and initializes multiple outputs support, it should be instantiated
+   * in the Mapper/Reducer setup method.
+   * 
+   * @param context
+   *          the TaskInputOutputContext object
    */
-  public CrunchMultipleOutputs(
-      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
+  public CrunchMultipleOutputs(TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
     this.context = context;
-    namedOutputs = Collections.unmodifiableSet(
-      new HashSet<String>(CrunchMultipleOutputs.getNamedOutputsList(context)));
+    namedOutputs = Collections.unmodifiableSet(new HashSet<String>(CrunchMultipleOutputs.getNamedOutputsList(context)));
     recordWriters = new HashMap<String, RecordWriter<?, ?>>();
     countersEnabled = getCountersEnabled(context);
   }
 
   /**
    * Write key and value to the namedOutput.
-   *
-   * Output path is a unique file generated for the namedOutput.
-   * For example, {namedOutput}-(m|r)-{part-number}
    * 
-   * @param namedOutput the named output name
-   * @param key         the key
-   * @param value       the value
+   * Output path is a unique file generated for the namedOutput. For example,
+   * {namedOutput}-(m|r)-{part-number}
+   * 
+   * @param namedOutput
+   *          the named output name
+   * @param key
+   *          the key
+   * @param value
+   *          the value
    */
   @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value)
-      throws IOException, InterruptedException {
+  public <K, V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException {
     write(namedOutput, key, value, namedOutput);
   }
 
   /**
    * Write key and value to baseOutputPath using the namedOutput.
    * 
-   * @param namedOutput    the named output name
-   * @param key            the key
-   * @param value          the value
-   * @param baseOutputPath base-output path to write the record to.
-   * Note: Framework will generate unique filename for the baseOutputPath
+   * @param namedOutput
+   *          the named output name
+   * @param key
+   *          the key
+   * @param value
+   *          the value
+   * @param baseOutputPath
+   *          base-output path to write the record to. Note: Framework will
+   *          generate unique filename for the baseOutputPath
    */
   @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value,
-      String baseOutputPath) throws IOException, InterruptedException {
+  public <K, V> void write(String namedOutput, K key, V value, String baseOutputPath) throws IOException,
+      InterruptedException {
     checkNamedOutputName(context, namedOutput, false);
     checkBaseOutputPath(baseOutputPath);
     if (!namedOutputs.contains(namedOutput)) {
-      throw new IllegalArgumentException("Undefined named output '" +
-        namedOutput + "'");
+      throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'");
     }
     TaskAttemptContext taskContext = getContext(namedOutput);
     getRecordWriter(taskContext, baseOutputPath).write(key, value);
@@ -380,59 +379,59 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
   /**
    * Write key value to an output file name.
    * 
-   * Gets the record writer from job's output format.  
-   * Job's output format should be a FileOutputFormat.
+   * Gets the record writer from job's output format. Job's output format should
+   * be a FileOutputFormat.
    * 
-   * @param key       the key
-   * @param value     the value
-   * @param baseOutputPath base-output path to write the record to.
-   * Note: Framework will generate unique filename for the baseOutputPath
+   * @param key
+   *          the key
+   * @param value
+   *          the value
+   * @param baseOutputPath
+   *          base-output path to write the record to. Note: Framework will
+   *          generate unique filename for the baseOutputPath
    */
   @SuppressWarnings("unchecked")
-  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
-      throws IOException, InterruptedException {
+  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) throws IOException, InterruptedException {
     checkBaseOutputPath(baseOutputPath);
-    TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
-      context.getConfiguration(), context.getTaskAttemptID());
+    TaskAttemptContext taskContext = TaskAttemptContextFactory.create(context.getConfiguration(),
+        context.getTaskAttemptID());
     getRecordWriter(taskContext, baseOutputPath).write(key, value);
   }
 
   // by being synchronized MultipleOutputTask can be use with a
   // MultithreadedMapper.
   @SuppressWarnings("unchecked")
-  private synchronized RecordWriter getRecordWriter(
-      TaskAttemptContext taskContext, String baseFileName) 
+  private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext, String baseFileName)
       throws IOException, InterruptedException {
-    
+
     // look for record-writer in the cache
     RecordWriter writer = recordWriters.get(baseFileName);
-    
+
     // If not in cache, create a new one
     if (writer == null) {
       // get the record writer from context output format
       taskContext.getConfiguration().set(BASE_OUTPUT_NAME, baseFileName);
       try {
-        writer = ((OutputFormat) ReflectionUtils.newInstance(
-          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
-          .getRecordWriter(taskContext);
+        writer = ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
+            taskContext.getConfiguration())).getRecordWriter(taskContext);
       } catch (ClassNotFoundException e) {
         throw new IOException(e);
       }
- 
-      // if counters are enabled, wrap the writer with context 
-      // to increment counters 
+
+      // if counters are enabled, wrap the writer with context
+      // to increment counters
       if (countersEnabled) {
         writer = new RecordWriterWithCounter(writer, baseFileName, context);
       }
-      
+
       // add the record-writer to the cache
       recordWriters.put(baseFileName, writer);
     }
     return writer;
   }
 
-   // Create a taskAttemptContext for the named output with 
-   // output format and output key/value types put in the context
+  // Create a taskAttemptContext for the named output with
+  // output format and output key/value types put in the context
   private TaskAttemptContext getContext(String nameOutput) throws IOException {
 
     TaskAttemptContext taskContext = taskContexts.get(nameOutput);
@@ -448,20 +447,19 @@ public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
     job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
     job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
     job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
-    taskContext = TaskAttemptContextFactory.create(
-      job.getConfiguration(), context.getTaskAttemptID());
-    
+    taskContext = TaskAttemptContextFactory.create(job.getConfiguration(), context.getTaskAttemptID());
+
     taskContexts.put(nameOutput, taskContext);
-    
+
     return taskContext;
   }
-  
+
   /**
    * Closes all the opened outputs.
    * 
-   * This should be called from cleanup method of map/reduce task.
-   * If overridden subclasses must invoke <code>super.close()</code> at the
-   * end of their <code>close()</code>
+   * This should be called from cleanup method of map/reduce task. If overridden
+   * subclasses must invoke <code>super.close()</code> at the end of their
+   * <code>close()</code>
    * 
    */
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
index e015498..4f08bbe 100644
--- a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
+++ b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
@@ -37,8 +37,6 @@ import static org.junit.Assert.assertEquals;
 import java.math.BigInteger;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.CombineFn.Aggregator;
 import org.apache.crunch.CombineFn.AggregatorFactory;
 import org.apache.crunch.CombineFn.FirstNAggregator;
@@ -49,6 +47,8 @@ import org.apache.crunch.CombineFn.PairAggregator;
 import org.apache.crunch.CombineFn.QuadAggregator;
 import org.apache.crunch.CombineFn.TripAggregator;
 import org.apache.crunch.CombineFn.TupleNAggregator;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
@@ -57,7 +57,7 @@ public class CombineFnTest {
   private <T> Iterable<T> applyAggregator(AggregatorFactory<T> a, Iterable<T> values) {
     return applyAggregator(a.create(), values);
   }
-  
+
   private <T> Iterable<T> applyAggregator(Aggregator<T> a, Iterable<T> values) {
     a.reset();
     for (T value : values) {
@@ -65,142 +65,122 @@ public class CombineFnTest {
     }
     return a.results();
   }
-  
+
   @Test
   public void testSums() {
-    assertEquals(ImmutableList.of(1775L),
-        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 17L, 1729L)));
+    assertEquals(ImmutableList.of(1775L), applyAggregator(SUM_LONGS, ImmutableList.of(29L, 17L, 1729L)));
 
-    assertEquals(ImmutableList.of(1765L),
-        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 7L, 1729L)));
+    assertEquals(ImmutableList.of(1765L), applyAggregator(SUM_LONGS, ImmutableList.of(29L, 7L, 1729L)));
 
-    assertEquals(ImmutableList.of(1775),
-        applyAggregator(SUM_INTS, ImmutableList.of(29, 17, 1729)));
+    assertEquals(ImmutableList.of(1775), applyAggregator(SUM_INTS, ImmutableList.of(29, 17, 1729)));
 
-    assertEquals(ImmutableList.of(1775.0f),
-        applyAggregator(SUM_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
+    assertEquals(ImmutableList.of(1775.0f), applyAggregator(SUM_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
 
-    assertEquals(ImmutableList.of(1775.0),
-        applyAggregator(SUM_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(new BigInteger("1775")),
+    assertEquals(ImmutableList.of(1775.0), applyAggregator(SUM_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
+
+    assertEquals(
+        ImmutableList.of(new BigInteger("1775")),
         applyAggregator(SUM_BIGINTS,
             ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
   }
-  
+
   @Test
   public void testMax() {
-    assertEquals(ImmutableList.of(1729L),
-        applyAggregator(MAX_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-    
-    assertEquals(ImmutableList.of(1729),
-        applyAggregator(MAX_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(1729.0f),
-        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(1729.0),
-        applyAggregator(MAX_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(1745.0f),
-        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 1745f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(new BigInteger("1729")),
+    assertEquals(ImmutableList.of(1729L), applyAggregator(MAX_LONGS, ImmutableList.of(29L, 17L, 1729L)));
+
+    assertEquals(ImmutableList.of(1729), applyAggregator(MAX_INTS, ImmutableList.of(29, 17, 1729)));
+
+    assertEquals(ImmutableList.of(1729.0f), applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
+
+    assertEquals(ImmutableList.of(1729.0), applyAggregator(MAX_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
+
+    assertEquals(ImmutableList.of(1745.0f), applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 1745f, 17f, 1729f)));
+
+    assertEquals(
+        ImmutableList.of(new BigInteger("1729")),
         applyAggregator(MAX_BIGINTS,
             ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
   }
-  
+
   @Test
   public void testMin() {
-    assertEquals(ImmutableList.of(17L),
-        applyAggregator(MIN_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-    
-    assertEquals(ImmutableList.of(17),
-        applyAggregator(MIN_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(17.0f),
-        applyAggregator(MIN_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(17.0),
-        applyAggregator(MIN_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(29),
-        applyAggregator(MIN_INTS, ImmutableList.of(29, 170, 1729)));
-    
-    assertEquals(ImmutableList.of(new BigInteger("17")),
+    assertEquals(ImmutableList.of(17L), applyAggregator(MIN_LONGS, ImmutableList.of(29L, 17L, 1729L)));
+
+    assertEquals(ImmutableList.of(17), applyAggregator(MIN_INTS, ImmutableList.of(29, 17, 1729)));
+
+    assertEquals(ImmutableList.of(17.0f), applyAggregator(MIN_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
+
+    assertEquals(ImmutableList.of(17.0), applyAggregator(MIN_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
+
+    assertEquals(ImmutableList.of(29), applyAggregator(MIN_INTS, ImmutableList.of(29, 170, 1729)));
+
+    assertEquals(
+        ImmutableList.of(new BigInteger("17")),
         applyAggregator(MIN_BIGINTS,
             ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
   }
 
   @Test
   public void testMaxN() {
-    assertEquals(ImmutableList.of(98, 1009), applyAggregator(new MaxNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
+    assertEquals(ImmutableList.of(98, 1009),
+        applyAggregator(new MaxNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
   }
 
   @Test
   public void testMinN() {
-    assertEquals(ImmutableList.of(17, 29), applyAggregator(new MinNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
+    assertEquals(ImmutableList.of(17, 29),
+        applyAggregator(new MinNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
   }
 
   @Test
   public void testFirstN() {
-    assertEquals(ImmutableList.of(17, 34), applyAggregator(new FirstNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
+    assertEquals(ImmutableList.of(17, 34),
+        applyAggregator(new FirstNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
   }
 
   @Test
   public void testLastN() {
-    assertEquals(ImmutableList.of(29, 1009), applyAggregator(new LastNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
+    assertEquals(ImmutableList.of(29, 1009),
+        applyAggregator(new LastNAggregator<Integer>(2), ImmutableList.of(17, 34, 98, 29, 1009)));
   }
-  
+
   @Test
   public void testPairs() {
     List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
-    Aggregator<Pair<Long, Double>> a = new PairAggregator<Long, Double>(
-        SUM_LONGS.create(), MIN_DOUBLES.create());
+    Aggregator<Pair<Long, Double>> a = new PairAggregator<Long, Double>(SUM_LONGS.create(), MIN_DOUBLES.create());
     assertEquals(Pair.of(1729L, -3.14), Iterables.getOnlyElement(applyAggregator(a, input)));
   }
-  
+
   @Test
   public void testPairsTwoLongs() {
     List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
-    Aggregator<Pair<Long, Long>> a = new PairAggregator<Long, Long>(
-        SUM_LONGS.create(), SUM_LONGS.create());
+    Aggregator<Pair<Long, Long>> a = new PairAggregator<Long, Long>(SUM_LONGS.create(), SUM_LONGS.create());
     assertEquals(Pair.of(1729L, 20L), Iterables.getOnlyElement(applyAggregator(a, input)));
   }
-  
+
   @Test
   public void testTrips() {
-    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(
-        Tuple3.of(17.29f, 12.2, 0.1), Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
-    Aggregator<Tuple3<Float, Double, Double>> a = new TripAggregator<Float, Double, Double>(
-        MAX_FLOATS.create(), MAX_DOUBLES.create(), MIN_DOUBLES.create());
-    assertEquals(Tuple3.of(17.29f, 14.5, -0.98),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
+    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(Tuple3.of(17.29f, 12.2, 0.1),
+        Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
+    Aggregator<Tuple3<Float, Double, Double>> a = new TripAggregator<Float, Double, Double>(MAX_FLOATS.create(),
+        MAX_DOUBLES.create(), MIN_DOUBLES.create());
+    assertEquals(Tuple3.of(17.29f, 14.5, -0.98), Iterables.getOnlyElement(applyAggregator(a, input)));
   }
-  
+
   @Test
   public void testQuads() {
-    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(
-        Tuple4.of(17.29f, 12.2, 0.1, 1), Tuple4.of(3.0f, 1.2, 3.14, 2),
-        Tuple4.of(-1.0f, 14.5, -0.98, 3));
-    Aggregator<Tuple4<Float, Double, Double, Integer>> a =
-        new QuadAggregator<Float, Double, Double, Integer>(MAX_FLOATS.create(),
-            MAX_DOUBLES.create(), MIN_DOUBLES.create(), SUM_INTS.create());
-    assertEquals(Tuple4.of(17.29f, 14.5, -0.98, 6),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
+    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(Tuple4.of(17.29f, 12.2, 0.1, 1),
+        Tuple4.of(3.0f, 1.2, 3.14, 2), Tuple4.of(-1.0f, 14.5, -0.98, 3));
+    Aggregator<Tuple4<Float, Double, Double, Integer>> a = new QuadAggregator<Float, Double, Double, Integer>(
+        MAX_FLOATS.create(), MAX_DOUBLES.create(), MIN_DOUBLES.create(), SUM_INTS.create());
+    assertEquals(Tuple4.of(17.29f, 14.5, -0.98, 6), Iterables.getOnlyElement(applyAggregator(a, input)));
   }
 
   @Test
   public void testTupleN() {
-    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L),
-        new TupleN(4, 17.0, 1, 9.7, 12L));
-    Aggregator<TupleN> a = new TupleNAggregator(MIN_INTS.create(), SUM_DOUBLES.create(),
-        MAX_INTS.create(), MIN_DOUBLES.create(), MAX_LONGS.create());
-    assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
+    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L), new TupleN(4, 17.0, 1, 9.7, 12L));
+    Aggregator<TupleN> a = new TupleNAggregator(MIN_INTS.create(), SUM_DOUBLES.create(), MAX_INTS.create(),
+        MIN_DOUBLES.create(), MAX_LONGS.create());
+    assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L), Iterables.getOnlyElement(applyAggregator(a, input)));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/FilterFnTest.java b/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
index a173bc5..9de086d 100644
--- a/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
+++ b/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
@@ -38,7 +38,7 @@ public class FilterFnTest {
       return false;
     }
   };
-  
+
   @Test
   public void testAnd() {
     assertTrue(FilterFn.and(TRUE).accept("foo"));
@@ -46,7 +46,7 @@ public class FilterFnTest {
     assertFalse(FilterFn.and(TRUE, FALSE).accept("foo"));
     assertFalse(FilterFn.and(FALSE, FALSE, FALSE).accept("foo"));
   }
-  
+
   @Test
   public void testOr() {
     assertFalse(FilterFn.or(FALSE).accept("foo"));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/PairTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/PairTest.java b/crunch/src/test/java/org/apache/crunch/PairTest.java
index eff183b..106413c 100644
--- a/crunch/src/test/java/org/apache/crunch/PairTest.java
+++ b/crunch/src/test/java/org/apache/crunch/PairTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.fail;
 import org.junit.Test;
 
 public class PairTest {
-  
+
   @Test
   public void testPairConstructor() {
     Pair<String, Integer> pair = new Pair<String, Integer>("brock", 45);
@@ -39,11 +39,11 @@ public class PairTest {
 
   protected void test(Pair<String, Integer> pair) {
     assertTrue(pair.size() == 2);
-    
+
     assertEquals("brock", pair.first());
     assertEquals(new Integer(45), pair.second());
     assertEquals(Pair.of("brock", 45), pair);
-    
+
     assertEquals("brock", pair.get(0));
     assertEquals(new Integer(45), pair.get(1));
 
@@ -54,7 +54,7 @@ public class PairTest {
       // expected
     }
   }
-  
+
   @Test
   public void testPairComparisons() {
     assertEquals(0, Pair.of(null, null).compareTo(Pair.of(null, null)));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/TupleTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/TupleTest.java b/crunch/src/test/java/org/apache/crunch/TupleTest.java
index f259acc..b07ec3f 100644
--- a/crunch/src/test/java/org/apache/crunch/TupleTest.java
+++ b/crunch/src/test/java/org/apache/crunch/TupleTest.java
@@ -22,9 +22,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.junit.Test;
-
 import org.apache.crunch.types.TupleFactory;
+import org.junit.Test;
 
 public class TupleTest {
   private String first = "foo";
@@ -32,7 +31,7 @@ public class TupleTest {
   private Double third = 64.2;
   private Boolean fourth = false;
   private Float fifth = 17.29f;
-  
+
   @Test
   public void testTuple3() {
     Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
@@ -50,7 +49,7 @@ public class TupleTest {
       // expected
     }
   }
-  
+
   @Test
   public void testTuple3Equality() {
     Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
@@ -59,11 +58,11 @@ public class TupleTest {
     assertFalse((new Tuple3(null, null, null)).equals(t));
     assertTrue((new Tuple3(first, null, null)).equals(new Tuple3(first, null, null)));
   }
-  
+
   @Test
   public void testTuple4() {
-    Tuple4<String, Integer, Double, Boolean> t = 
-      new Tuple4<String, Integer, Double, Boolean>(first, second, third, fourth);
+    Tuple4<String, Integer, Double, Boolean> t = new Tuple4<String, Integer, Double, Boolean>(first, second, third,
+        fourth);
     assertEquals(4, t.size());
     assertEquals(first, t.first());
     assertEquals(second, t.second());
@@ -83,13 +82,12 @@ public class TupleTest {
 
   @Test
   public void testTuple4Equality() {
-    Tuple4<String, Integer, Double, Boolean> t = 
-      new Tuple4<String, Integer, Double, Boolean>(first, second, third, fourth);
+    Tuple4<String, Integer, Double, Boolean> t = new Tuple4<String, Integer, Double, Boolean>(first, second, third,
+        fourth);
     assertFalse(t.equals(new Tuple3(first, second, third)));
     assertFalse(t.equals(new Tuple4(first, null, third, null)));
     assertFalse((new Tuple4(null, null, null, null)).equals(t));
-    assertTrue((new Tuple4(first, null, third, null)).equals(
-        new Tuple4(first, null, third, null)));
+    assertTrue((new Tuple4(first, null, third, null)).equals(new Tuple4(first, null, third, null)));
   }
 
   @Test
@@ -111,12 +109,11 @@ public class TupleTest {
 
   @Test
   public void testTupleNEquality() {
-	TupleN t = new TupleN(first, second, third, fourth, fifth);
-	assertTrue(t.equals(new TupleN(first, second, third, fourth, fifth)));
+    TupleN t = new TupleN(first, second, third, fourth, fifth);
+    assertTrue(t.equals(new TupleN(first, second, third, fourth, fifth)));
     assertFalse(t.equals(new TupleN(first, null, third, null)));
     assertFalse((new TupleN(null, null, null, null, null)).equals(t));
-    assertTrue((new TupleN(first, second, third, null, null)).equals(
-        new TupleN(first, second, third, null, null)));
+    assertTrue((new TupleN(first, second, third, null, null)).equals(new TupleN(first, second, third, null, null)));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java b/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
index 205809e..b5b2a1b 100644
--- a/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
+++ b/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
@@ -19,28 +19,26 @@ package org.apache.crunch.fn;
 
 import static org.junit.Assert.assertEquals;
 
-import org.junit.Test;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
+import org.junit.Test;
 
 @SuppressWarnings("serial")
 public class ExtractKeyFnTest {
 
-	protected static final MapFn<String, Integer> mapFn = new MapFn<String, Integer>() {
-		@Override
-		public Integer map(String input) {
-			return input.hashCode();
-		}
-	};
+  protected static final MapFn<String, Integer> mapFn = new MapFn<String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return input.hashCode();
+    }
+  };
 
-	protected static final ExtractKeyFn<Integer, String> one = new ExtractKeyFn<Integer, String>(
-			mapFn);
+  protected static final ExtractKeyFn<Integer, String> one = new ExtractKeyFn<Integer, String>(mapFn);
 
-	@Test
-	public void test() {
-		StoreLastEmitter<Pair<Integer, String>> emitter = StoreLastEmitter.create();
-		one.process("boza", emitter);
-		assertEquals(Pair.of("boza".hashCode(), "boza"), emitter.getLast());
-	}
+  @Test
+  public void test() {
+    StoreLastEmitter<Pair<Integer, String>> emitter = StoreLastEmitter.create();
+    one.process("boza", emitter);
+    assertEquals(Pair.of("boza".hashCode(), "boza"), emitter.getLast());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java b/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java
index 323f276..6b73700 100644
--- a/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java
+++ b/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java
@@ -19,28 +19,26 @@ package org.apache.crunch.fn;
 
 import static org.junit.Assert.assertEquals;
 
-import org.junit.Test;
-
 import org.apache.crunch.Pair;
-
+import org.junit.Test;
 
 @SuppressWarnings("serial")
 public class MapKeysTest {
-  
+
   protected static final MapKeysFn<String, Integer, Integer> one = new MapKeysFn<String, Integer, Integer>() {
     @Override
     public Integer map(String input) {
       return 1;
     }
   };
-  
+
   protected static final MapKeysFn<String, Integer, Integer> two = new MapKeysFn<String, Integer, Integer>() {
     @Override
     public Integer map(String input) {
       return 2;
     }
   };
-  
+
   @Test
   public void test() {
     StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java b/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java
index cf971a1..097b008 100644
--- a/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java
+++ b/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java
@@ -19,27 +19,26 @@ package org.apache.crunch.fn;
 
 import static org.junit.Assert.assertEquals;
 
-import org.junit.Test;
-
 import org.apache.crunch.Pair;
+import org.junit.Test;
 
 @SuppressWarnings("serial")
 public class MapValuesTest {
-  
+
   static final MapValuesFn<String, String, Integer> one = new MapValuesFn<String, String, Integer>() {
     @Override
     public Integer map(String input) {
       return 1;
     }
   };
-  
+
   static final MapValuesFn<String, String, Integer> two = new MapValuesFn<String, String, Integer>() {
     @Override
     public Integer map(String input) {
       return 2;
     }
   };
-  
+
   @Test
   public void test() {
     StoreLastEmitter<Pair<String, Integer>> emitter = StoreLastEmitter.create();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java b/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java
index 28e2459..bef6c85 100644
--- a/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java
+++ b/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java
@@ -19,28 +19,27 @@ package org.apache.crunch.fn;
 
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Test;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
+import org.junit.Test;
 
 @SuppressWarnings("serial")
 public class PairMapTest {
-  
+
   static final MapFn<String, Integer> one = new MapFn<String, Integer>() {
     @Override
     public Integer map(String input) {
       return 1;
     }
   };
-  
+
   static final MapFn<String, Integer> two = new MapFn<String, Integer>() {
     @Override
     public Integer map(String input) {
       return 2;
     }
   };
-  
+
   @Test
   public void testPairMap() {
     StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java b/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
index 2632957..cdd8754 100644
--- a/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
+++ b/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
@@ -26,15 +26,15 @@ class StoreLastEmitter<T> implements Emitter<T> {
   public void emit(T emitted) {
     last = emitted;
   }
-  
+
   public T getLast() {
     return last;
   }
-  
+
   @Override
   public void flush() {
   }
-  
+
   public static <T> StoreLastEmitter<T> create() {
     return new StoreLastEmitter<T>();
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
index 9d10e97..f010755 100644
--- a/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
@@ -25,13 +25,12 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Test;
 
 public class MRPipelineTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
index 96fbd7e..fd582bc 100644
--- a/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
@@ -21,50 +21,44 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
 
 public class DoCollectionImplTest {
-  
-  
 
   @Test
   public void testGetSizeInternal_NoScaleFactor() {
     runScaleTest(100L, 1.0f, 100L);
   }
-  
+
   @Test
   public void testGetSizeInternal_ScaleFactorBelowZero() {
     runScaleTest(100L, 0.5f, 50L);
   }
-  
+
   @Test
   public void testGetSizeInternal_ScaleFactorAboveZero() {
     runScaleTest(100L, 1.5f, 150L);
   }
-  
-  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize){
-    PCollectionImpl<String> parentCollection = new SizedPCollectionImpl(
-        "Sized collection", inputSize);
-    
-    DoCollectionImpl<String> doCollectionImpl = new DoCollectionImpl<String>(
-        "Scaled collection", parentCollection, new ScaledFunction(scaleFactor),
-        Writables.strings());
-
-    assertEquals(expectedScaledSize, doCollectionImpl.getSizeInternal()); 
+
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+    PCollectionImpl<String> parentCollection = new SizedPCollectionImpl("Sized collection", inputSize);
+
+    DoCollectionImpl<String> doCollectionImpl = new DoCollectionImpl<String>("Scaled collection", parentCollection,
+        new ScaledFunction(scaleFactor), Writables.strings());
+
+    assertEquals(expectedScaledSize, doCollectionImpl.getSizeInternal());
   }
-  
 
-  static class ScaledFunction extends DoFn<String, String>{
-    
+  static class ScaledFunction extends DoFn<String, String> {
+
     private float scaleFactor;
 
-    public ScaledFunction(float scaleFactor){
+    public ScaledFunction(float scaleFactor) {
       this.scaleFactor = scaleFactor;
     }
 
@@ -72,12 +66,12 @@ public class DoCollectionImplTest {
     public void process(String input, Emitter<String> emitter) {
       emitter.emit(input);
     }
-    
+
     @Override
     public float scaleFactor() {
       return scaleFactor;
     }
-    
+
   }
 
   static class SizedPCollectionImpl extends PCollectionImpl<String> {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
index d5a9722..89b9944 100644
--- a/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
@@ -25,64 +25,62 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import org.junit.Test;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.Pair;
+import org.junit.Test;
 
 public class DoTableImplTest {
 
-	@Test
-	public void testGetSizeInternal_NoScaleFactor() {
-		runScaleTest(100L, 1.0f, 100L);
-	}
-
-	@Test
-	public void testGetSizeInternal_ScaleFactorBelowZero() {
-		runScaleTest(100L, 0.5f, 50L);
-	}
-
-	@Test
-	public void testGetSizeInternal_ScaleFactorAboveZero() {
-		runScaleTest(100L, 1.5f, 150L);
-	}
-
-	private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
-		
-		@SuppressWarnings("unchecked")
-		PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class);
-		
-		when(parentCollection.getSize()).thenReturn(inputSize);
-
-		DoTableImpl<String, String> doTableImpl = new DoTableImpl<String, String>("Scalled table collection",
-				parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(),
-						strings()));
-
-		assertEquals(expectedScaledSize, doTableImpl.getSizeInternal());
-		
-		verify(parentCollection).getSize();
-		
-		verifyNoMoreInteractions(parentCollection);
-	}
-
-	static class TableScaledFunction extends DoFn<String, Pair<String, String>> {
-
-		private float scaleFactor;
-
-		public TableScaledFunction(float scaleFactor) {
-			this.scaleFactor = scaleFactor;
-		}
-
-		@Override
-		public float scaleFactor() {
-			return scaleFactor;
-		}
-
-		@Override
-		public void process(String input, Emitter<Pair<String, String>> emitter) {
-			emitter.emit(Pair.of(input, input));
-
-		}
-	}
+  @Test
+  public void testGetSizeInternal_NoScaleFactor() {
+    runScaleTest(100L, 1.0f, 100L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorBelowZero() {
+    runScaleTest(100L, 0.5f, 50L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorAboveZero() {
+    runScaleTest(100L, 1.5f, 150L);
+  }
+
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+
+    @SuppressWarnings("unchecked")
+    PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class);
+
+    when(parentCollection.getSize()).thenReturn(inputSize);
+
+    DoTableImpl<String, String> doTableImpl = new DoTableImpl<String, String>("Scalled table collection",
+        parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(), strings()));
+
+    assertEquals(expectedScaledSize, doTableImpl.getSizeInternal());
+
+    verify(parentCollection).getSize();
+
+    verifyNoMoreInteractions(parentCollection);
+  }
+
+  static class TableScaledFunction extends DoFn<String, Pair<String, String>> {
+
+    private float scaleFactor;
+
+    public TableScaledFunction(float scaleFactor) {
+      this.scaleFactor = scaleFactor;
+    }
+
+    @Override
+    public float scaleFactor() {
+      return scaleFactor;
+    }
+
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      emitter.emit(Pair.of(input, input));
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
index 0e0b9d8..7963c83 100644
--- a/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -19,9 +19,9 @@ package org.apache.crunch.impl.mr.plan;
 
 import static org.junit.Assert.assertEquals;
 
+import org.apache.crunch.types.writable.Writables;
 import org.junit.Test;
 
-import org.apache.crunch.types.writable.Writables;
 import com.google.common.collect.Lists;
 
 public class JobNameBuilderTest {
@@ -34,7 +34,7 @@ public class JobNameBuilderTest {
     JobNameBuilder jobNameBuilder = new JobNameBuilder(pipelineName);
     jobNameBuilder.visit(Lists.newArrayList(doNode));
     String jobName = jobNameBuilder.build();
-    
+
     assertEquals(String.format("%s: %s", pipelineName, nodeName), jobName);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java b/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
index a1dbb5f..5b0ea55 100644
--- a/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
+++ b/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
@@ -31,29 +31,29 @@ import org.junit.Test;
 
 public class SourceTargetHelperTest {
 
-	@Test
-	public void testGetNonexistentPathSize() throws Exception {
-		File tmp = File.createTempFile("pathsize", "");
-		Path tmpPath = new Path(tmp.getAbsolutePath());
-		tmp.delete();
-		FileSystem fs = FileSystem.getLocal(new Configuration());
-		assertEquals(-1L, SourceTargetHelper.getPathSize(fs, tmpPath));
-	}
-
-	@Test
-	public void testGetNonExistentPathSize_NonExistantPath() throws IOException {
-		FileSystem mockFs = new MockFileSystem();
-		assertEquals(-1L, SourceTargetHelper.getPathSize(mockFs, new Path("does/not/exist")));
-	}
-
-	/**
-	 * Mock FileSystem that returns null for {@link FileSystem#listStatus(Path)}.
-	 */
-	static class MockFileSystem extends LocalFileSystem {
-
-		@Override
-		public FileStatus[] listStatus(Path f) throws IOException {
-			return null;
-		}
-	}
+  @Test
+  public void testGetNonexistentPathSize() throws Exception {
+    File tmp = File.createTempFile("pathsize", "");
+    Path tmpPath = new Path(tmp.getAbsolutePath());
+    tmp.delete();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    assertEquals(-1L, SourceTargetHelper.getPathSize(fs, tmpPath));
+  }
+
+  @Test
+  public void testGetNonExistentPathSize_NonExistantPath() throws IOException {
+    FileSystem mockFs = new MockFileSystem();
+    assertEquals(-1L, SourceTargetHelper.getPathSize(mockFs, new Path("does/not/exist")));
+  }
+
+  /**
+   * Mock FileSystem that returns null for {@link FileSystem#listStatus(Path)}.
+   */
+  static class MockFileSystem extends LocalFileSystem {
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
index 885ed61..28ff5ba 100644
--- a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -32,6 +32,8 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,120 +41,112 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.crunch.test.Person;
-import org.apache.crunch.types.avro.Avros;
 import com.google.common.collect.Lists;
 
 public class AvroFileReaderFactoryTest {
 
-	private File avroFile;
-
-	@Before
-	public void setUp() throws IOException {
-		// InputSupplier<InputStream> inputStreamSupplier =
-		// newInputStreamSupplier(getResource("person.avro"));
-		avroFile = File.createTempFile("test", ".av");
-	}
-
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
-
-	private void populateGenericFile(List<GenericRecord> genericRecords,
-			Schema outputSchema) throws IOException {
-		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
-		GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
-				outputSchema);
-
-		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
-				genericDatumWriter);
-		dataFileWriter.create(outputSchema, outputStream);
-
-		for (GenericRecord record : genericRecords) {
-			dataFileWriter.append(record);
-		}
-
-		dataFileWriter.close();
-		outputStream.close();
-
-	}
-
-	@Test
-	public void testRead_GenericReader() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-		AvroFileReaderFactory<GenericData.Record> genericReader = new AvroFileReaderFactory<GenericData.Record>(
-				Avros.generics(Person.SCHEMA$), new Configuration());
-		Iterator<GenericData.Record> recordIterator = genericReader.read(
-				FileSystem.getLocal(new Configuration()), new Path(
-						this.avroFile.getAbsolutePath()));
-
-		GenericRecord genericRecord = recordIterator.next();
-		assertEquals(savedRecord, genericRecord);
-		assertFalse(recordIterator.hasNext());
-	}
-
-	@Test
-	public void testRead_SpecificReader() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-		AvroFileReaderFactory<Person> genericReader = new AvroFileReaderFactory<Person>(
-				Avros.records(Person.class), new Configuration());
-		Iterator<Person> recordIterator = genericReader.read(FileSystem
-				.getLocal(new Configuration()),
-				new Path(this.avroFile.getAbsolutePath()));
-
-		Person expectedPerson = new Person();
-		expectedPerson.setAge(42);
-		expectedPerson.setName("John Doe");
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		siblingNames.add("Jimmy");
-		siblingNames.add("Jane");
-		expectedPerson.setSiblingnames(siblingNames);
-
-		Person person = recordIterator.next();
-
-		assertEquals(expectedPerson, person);
-		assertFalse(recordIterator.hasNext());
-	}
-
-	@Test
-	public void testRead_ReflectReader() throws IOException {
-		Schema reflectSchema = ReflectData.get().getSchema(PojoPerson.class);
-		GenericRecord savedRecord = new GenericData.Record(reflectSchema);
-		savedRecord.put("name", "John Doe");
-		populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema);
-
-		AvroFileReaderFactory<PojoPerson> genericReader = new AvroFileReaderFactory<PojoPerson>(
-				Avros.reflects(PojoPerson.class), new Configuration());
-		Iterator<PojoPerson> recordIterator = genericReader.read(FileSystem
-				.getLocal(new Configuration()),
-				new Path(this.avroFile.getAbsolutePath()));
-
-		PojoPerson person = recordIterator.next();
-
-		assertEquals("John Doe", person.getName());
-		assertFalse(recordIterator.hasNext());
-	}
-
-	public static class PojoPerson {
-		private String name;
-
-		public String getName() {
-			return name;
-		}
-
-		public void setName(String name) {
-			this.name = name;
-		}
-	}
+  private File avroFile;
+
+  @Before
+  public void setUp() throws IOException {
+    // InputSupplier<InputStream> inputStreamSupplier =
+    // newInputStreamSupplier(getResource("person.avro"));
+    avroFile = File.createTempFile("test", ".av");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema outputSchema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(outputSchema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(outputSchema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+
+  }
+
+  @Test
+  public void testRead_GenericReader() throws IOException {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    AvroFileReaderFactory<GenericData.Record> genericReader = new AvroFileReaderFactory<GenericData.Record>(
+        Avros.generics(Person.SCHEMA$), new Configuration());
+    Iterator<GenericData.Record> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()),
+        new Path(this.avroFile.getAbsolutePath()));
+
+    GenericRecord genericRecord = recordIterator.next();
+    assertEquals(savedRecord, genericRecord);
+    assertFalse(recordIterator.hasNext());
+  }
+
+  @Test
+  public void testRead_SpecificReader() throws IOException {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    AvroFileReaderFactory<Person> genericReader = new AvroFileReaderFactory<Person>(Avros.records(Person.class),
+        new Configuration());
+    Iterator<Person> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path(
+        this.avroFile.getAbsolutePath()));
+
+    Person expectedPerson = new Person();
+    expectedPerson.setAge(42);
+    expectedPerson.setName("John Doe");
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.setSiblingnames(siblingNames);
+
+    Person person = recordIterator.next();
+
+    assertEquals(expectedPerson, person);
+    assertFalse(recordIterator.hasNext());
+  }
+
+  @Test
+  public void testRead_ReflectReader() throws IOException {
+    Schema reflectSchema = ReflectData.get().getSchema(PojoPerson.class);
+    GenericRecord savedRecord = new GenericData.Record(reflectSchema);
+    savedRecord.put("name", "John Doe");
+    populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema);
+
+    AvroFileReaderFactory<PojoPerson> genericReader = new AvroFileReaderFactory<PojoPerson>(
+        Avros.reflects(PojoPerson.class), new Configuration());
+    Iterator<PojoPerson> recordIterator = genericReader.read(FileSystem.getLocal(new Configuration()), new Path(
+        this.avroFile.getAbsolutePath()));
+
+    PojoPerson person = recordIterator.next();
+
+    assertEquals("John Doe", person.getName());
+    assertFalse(recordIterator.hasNext());
+  }
+
+  public static class PojoPerson {
+    private String name;
+
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
index 9529d45..ceef2b2 100644
--- a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
@@ -55,8 +55,8 @@ public class AvroFileSourceTest {
   @Test
   public void testConfigureJob_SpecificData() throws IOException {
     AvroType<Person> avroSpecificType = Avros.records(Person.class);
-    AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(new Path(
-        tempFile.getAbsolutePath()), avroSpecificType);
+    AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(new Path(tempFile.getAbsolutePath()),
+        avroSpecificType);
 
     personFileSource.configureSource(job, -1);
 
@@ -67,8 +67,8 @@ public class AvroFileSourceTest {
   @Test
   public void testConfigureJob_GenericData() throws IOException {
     AvroType<Record> avroGenericType = Avros.generics(Person.SCHEMA$);
-    AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(new Path(
-        tempFile.getAbsolutePath()), avroGenericType);
+    AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(new Path(tempFile.getAbsolutePath()),
+        avroGenericType);
 
     personFileSource.configureSource(job, -1);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java b/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
index eb50662..0dfed32 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
@@ -23,77 +23,76 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
+import org.apache.crunch.lib.join.JoinUtils.AvroIndexedRecordPartitioner;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.crunch.lib.join.JoinUtils.AvroIndexedRecordPartitioner;
-
 public class AvroIndexedRecordPartitionerTest {
 
-	private AvroIndexedRecordPartitioner avroPartitioner;
-	
-	@Before
-	public void setUp(){
-		avroPartitioner = new AvroIndexedRecordPartitioner();
-	}
-	
-	@Test
-	public void testGetPartition() {
-		IndexedRecord indexedRecord = new MockIndexedRecord(3);
-		AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
-		
-		assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
-		assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_NegativeHashValue(){
-		IndexedRecord indexedRecord = new MockIndexedRecord(-3);
-		AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
-		
-		assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
-		assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_IntegerMinValue(){
-		IndexedRecord indexedRecord = new MockIndexedRecord(Integer.MIN_VALUE);
-		AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
-		
-		assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), Integer.MAX_VALUE));
-	}
-	
-	/**
-	 * Mock implementation of IndexedRecord to give us control over the hashCode.
-	 */
-	static class MockIndexedRecord implements IndexedRecord {
-		
-		private Integer value;
-		
-		public MockIndexedRecord(Integer value){
-			this.value = value;
-		}
-		
-		@Override
-		public int hashCode() {
-			return value.hashCode();
-		}
-
-		@Override
-		public Schema getSchema() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object get(int arg0) {
-			return this.value;
-		}
-
-		@Override
-		public void put(int arg0, Object arg1) {
-			throw new UnsupportedOperationException();
-		}
-		
-	}
+  private AvroIndexedRecordPartitioner avroPartitioner;
+
+  @Before
+  public void setUp() {
+    avroPartitioner = new AvroIndexedRecordPartitioner();
+  }
+
+  @Test
+  public void testGetPartition() {
+    IndexedRecord indexedRecord = new MockIndexedRecord(3);
+    AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
+
+    assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
+    assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
+  }
+
+  @Test
+  public void testGetPartition_NegativeHashValue() {
+    IndexedRecord indexedRecord = new MockIndexedRecord(-3);
+    AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
+
+    assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
+    assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
+  }
+
+  @Test
+  public void testGetPartition_IntegerMinValue() {
+    IndexedRecord indexedRecord = new MockIndexedRecord(Integer.MIN_VALUE);
+    AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
+
+    assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), Integer.MAX_VALUE));
+  }
+
+  /**
+   * Mock implementation of IndexedRecord to give us control over the hashCode.
+   */
+  static class MockIndexedRecord implements IndexedRecord {
+
+    private Integer value;
+
+    public MockIndexedRecord(Integer value) {
+      this.value = value;
+    }
+
+    @Override
+    public int hashCode() {
+      return value.hashCode();
+    }
+
+    @Override
+    public Schema getSchema() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object get(int arg0) {
+      return this.value;
+    }
+
+    @Override
+    public void put(int arg0, Object arg1) {
+      throw new UnsupportedOperationException();
+    }
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
index 442a252..eba7429 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
@@ -23,20 +23,20 @@ import static org.junit.Assert.assertTrue;
 import java.util.HashSet;
 import java.util.Iterator;
 
-import org.junit.Test;
-
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 
 public class CartesianTest {
 
   @Test
   public void testCartesianCollection() {
-    ImmutableList<ImmutableList<Integer>> testCases = ImmutableList.of(
-        ImmutableList.of(1, 2, 3, 4, 5), ImmutableList.<Integer>of(1, 2, 3), ImmutableList.<Integer>of());
+    ImmutableList<ImmutableList<Integer>> testCases = ImmutableList.of(ImmutableList.of(1, 2, 3, 4, 5),
+        ImmutableList.<Integer> of(1, 2, 3), ImmutableList.<Integer> of());
 
     for (int t1 = 0; t1 < testCases.size(); t1++) {
       ImmutableList<Integer> testCase1 = testCases.get(t1);
@@ -46,9 +46,9 @@ public class CartesianTest {
         PCollection<Integer> X = MemPipeline.typedCollectionOf(Writables.ints(), testCase1);
         PCollection<Integer> Y = MemPipeline.typedCollectionOf(Writables.ints(), testCase2);
 
-        PCollection<Pair<Integer,Integer>> cross = Cartesian.cross(X, Y);
+        PCollection<Pair<Integer, Integer>> cross = Cartesian.cross(X, Y);
         HashSet<Pair<Integer, Integer>> crossSet = new HashSet<Pair<Integer, Integer>>();
-        for (Iterator<Pair<Integer, Integer>> i = cross.materialize().iterator(); i.hasNext(); ) {
+        for (Iterator<Pair<Integer, Integer>> i = cross.materialize().iterator(); i.hasNext();) {
           crossSet.add(i.next());
         }
         assertEquals(crossSet.size(), testCase1.size() * testCase2.size());
@@ -61,5 +61,5 @@ public class CartesianTest {
       }
     }
   }
-	
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
index 7e75d44..0f75fb6 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
@@ -21,17 +21,17 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
+import org.apache.crunch.impl.mem.MemPipeline;
 import org.junit.Test;
 
-import org.apache.crunch.impl.mem.MemPipeline;
 import com.google.common.collect.ImmutableList;
 
 public class SampleTest {
   @Test
   public void testSampler() {
-	Iterable<Integer> sample = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-	    .sample(0.2, 123998).materialize();
-	List<Integer> sampleValues = ImmutableList.copyOf(sample);
-	assertEquals(ImmutableList.of(6, 7), sampleValues);
+    Iterable<Integer> sample = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).sample(0.2, 123998)
+        .materialize();
+    List<Integer> sampleValues = ImmutableList.copyOf(sample);
+    assertEquals(ImmutableList.of(6, 7), sampleValues);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
index 51aa691..35ccc11 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -19,52 +19,50 @@ package org.apache.crunch.lib;
 
 import static org.junit.Assert.assertEquals;
 
+import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
+import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
-import org.apache.crunch.types.writable.TupleWritable;
-
 public class TupleWritablePartitionerTest {
 
-	private TupleWritablePartitioner tupleWritableParitioner;
-	
-	@Before
-	public void setUp(){
-		tupleWritableParitioner = new TupleWritablePartitioner();
-	}
-	
-	@Test
-	public void testGetPartition() {
-		IntWritable intWritable = new IntWritable(3);
-		TupleWritable key = new TupleWritable(new Writable[]{intWritable});
-		assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
-		assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_NegativeHashValue(){
-		IntWritable intWritable = new IntWritable(-3);
-		// Sanity check, if this doesn't work then the premise of this test is wrong
-		assertEquals(-3, intWritable.hashCode());
-		
-		TupleWritable key = new TupleWritable(new Writable[]{intWritable});
-		assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
-		assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_IntegerMinValue(){
-		IntWritable intWritable = new IntWritable(Integer.MIN_VALUE);
-		// Sanity check, if this doesn't work then the premise of this test is wrong
-		assertEquals(Integer.MIN_VALUE, intWritable.hashCode());
-		
-		
-		TupleWritable key = new TupleWritable(new Writable[]{intWritable});
-		assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
-	}
+  private TupleWritablePartitioner tupleWritableParitioner;
+
+  @Before
+  public void setUp() {
+    tupleWritableParitioner = new TupleWritablePartitioner();
+  }
+
+  @Test
+  public void testGetPartition() {
+    IntWritable intWritable = new IntWritable(3);
+    TupleWritable key = new TupleWritable(new Writable[] { intWritable });
+    assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
+  }
+
+  @Test
+  public void testGetPartition_NegativeHashValue() {
+    IntWritable intWritable = new IntWritable(-3);
+    // Sanity check, if this doesn't work then the premise of this test is wrong
+    assertEquals(-3, intWritable.hashCode());
+
+    TupleWritable key = new TupleWritable(new Writable[] { intWritable });
+    assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
+  }
+
+  @Test
+  public void testGetPartition_IntegerMinValue() {
+    IntWritable intWritable = new IntWritable(Integer.MIN_VALUE);
+    // Sanity check, if this doesn't work then the premise of this test is wrong
+    assertEquals(Integer.MIN_VALUE, intWritable.hashCode());
+
+    TupleWritable key = new TupleWritable(new Writable[] { intWritable });
+    assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
index 32a15f2..3df7657 100644
--- a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
+++ b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
@@ -19,34 +19,38 @@ package org.apache.crunch.test;
 
 import static org.junit.Assert.assertEquals;
 
-import org.junit.After;
-import org.junit.Test;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.junit.After;
+import org.junit.Test;
 
 /**
  * A test to verify using counters inside of a unit test works. :)
  */
 public class CountersTest {
 
-  public enum CT { ONE, TWO, THREE };
-  
+  public enum CT {
+    ONE,
+    TWO,
+    THREE
+  };
+
   @After
   public void after() {
-	  TestCounters.clearCounters();
+    TestCounters.clearCounters();
   }
-  
+
   public static class CTFn extends DoFn<String, String> {
     @Override
     public void process(String input, Emitter<String> emitter) {
       getCounter(CT.ONE).increment(1);
       getCounter(CT.TWO).increment(4);
       getCounter(CT.THREE).increment(7);
-    }    
+    }
   }
-  
-  @Test public void test() {
+
+  @Test
+  public void test() {
     CTFn fn = new CTFn();
     fn.process("foo", null);
     fn.process("bar", null);
@@ -54,8 +58,9 @@ public class CountersTest {
     assertEquals(8L, TestCounters.getCounter(CT.TWO).getValue());
     assertEquals(14L, TestCounters.getCounter(CT.THREE).getValue());
   }
-  
-  @Test public void secondTest() {
+
+  @Test
+  public void secondTest() {
     CTFn fn = new CTFn();
     fn.process("foo", null);
     fn.process("bar", null);


Mime
View raw message