incubator-crunch-commits mailing list archives

From: gr...@apache.org
Subject: [1/10] git commit: Format all sources according to formatting profile
Date: Sat, 14 Jul 2012 18:14:55 GMT
Updated Branches:
  refs/heads/master 36a5ae37a -> 076837116


Format all sources according to formatting profile

Also update the formatting profile to use a line length of 120
instead of 100.
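
The profile file itself (res/crunch-formatting-styles.xml, touched in the
diffstat below) is not reproduced in this message. Assuming it is a standard
Eclipse JDT formatter profile, the line-length change would amount to roughly
the following sketch; the setting ids are real Eclipse formatter keys, while
the profile name and version numbers are illustrative placeholders:

    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <profiles version="11">
      <profile kind="CodeFormatterProfile" name="Apache Crunch" version="11">
        <!-- maximum line width for code, raised from 100 to 120 -->
        <setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>
        <!-- maximum width for comments, kept in step with the code width -->
        <setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="120"/>
      </profile>
    </profiles>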


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/07683711
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/07683711
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/07683711

Branch: refs/heads/master
Commit: 076837116508d48d291dcbe37e5c28ee0ad1513c
Parents: 36a5ae3
Author: Gabriel Reid <gabriel.reid@gmail.com>
Authored: Sat Jul 14 20:11:13 2012 +0200
Committer: Gabriel Reid <gabriel.reid@gmail.com>
Committed: Sat Jul 14 20:11:13 2012 +0200

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/CollectionsIT.java   |   39 +-
 crunch/src/it/java/org/apache/crunch/MapsIT.java   |   88 ++--
 .../it/java/org/apache/crunch/MaterializeIT.java   |  139 +++---
 .../java/org/apache/crunch/MaterializeToMapIT.java |   42 +-
 .../java/org/apache/crunch/MultipleOutputIT.java   |   89 ++--
 .../org/apache/crunch/PCollectionGetSizeIT.java    |  169 ++++----
 .../java/org/apache/crunch/PTableKeyValueIT.java   |  124 +++---
 .../src/it/java/org/apache/crunch/PageRankIT.java  |  178 ++++----
 .../it/java/org/apache/crunch/TermFrequencyIT.java |   77 ++--
 .../src/it/java/org/apache/crunch/TextPairIT.java  |   21 +-
 crunch/src/it/java/org/apache/crunch/TfIdfIT.java  |  165 ++++----
 .../org/apache/crunch/TupleNClassCastBugIT.java    |   10 +-
 .../java/org/apache/crunch/WordCountHBaseIT.java   |   67 ++--
 .../src/it/java/org/apache/crunch/WordCountIT.java |   14 +-
 .../crunch/impl/mem/MemPipelineFileWritingIT.java  |    4 +-
 .../crunch/impl/mr/collect/UnionCollectionIT.java  |  165 ++++----
 .../apache/crunch/io/CompositePathIterableIT.java  |   47 +-
 .../crunch/io/avro/AvroFileSourceTargetIT.java     |  200 ++++-----
 .../org/apache/crunch/io/avro/AvroReflectIT.java   |  153 +++---
 .../it/java/org/apache/crunch/lib/AggregateIT.java |   65 ++--
 .../java/org/apache/crunch/lib/AvroTypeSortIT.java |  148 +++---
 .../it/java/org/apache/crunch/lib/CogroupIT.java   |   22 +-
 .../src/it/java/org/apache/crunch/lib/SetIT.java   |   46 +-
 .../src/it/java/org/apache/crunch/lib/SortIT.java  |   98 ++---
 .../apache/crunch/lib/SpecificAvroGroupByIT.java   |  122 +++---
 .../org/apache/crunch/lib/join/JoinTester.java     |   33 +-
 .../org/apache/crunch/lib/join/MapsideJoinIT.java  |   21 +-
 .../crunch/lib/join/MultiAvroSchemaJoinIT.java     |   44 +-
 .../src/main/java/org/apache/crunch/CombineFn.java |  365 ++++++++-------
 crunch/src/main/java/org/apache/crunch/DoFn.java   |   84 ++--
 .../src/main/java/org/apache/crunch/Emitter.java   |    8 +-
 .../src/main/java/org/apache/crunch/FilterFn.java  |   54 ++--
 .../java/org/apache/crunch/GroupingOptions.java    |   30 +-
 crunch/src/main/java/org/apache/crunch/MapFn.java  |    8 +-
 .../main/java/org/apache/crunch/PCollection.java   |  113 +++--
 .../main/java/org/apache/crunch/PGroupedTable.java |    5 +-
 crunch/src/main/java/org/apache/crunch/PTable.java |   62 ++--
 crunch/src/main/java/org/apache/crunch/Pair.java   |   16 +-
 .../src/main/java/org/apache/crunch/Pipeline.java  |   54 ++-
 .../java/org/apache/crunch/PipelineResult.java     |   29 +-
 crunch/src/main/java/org/apache/crunch/Source.java |   19 +-
 .../main/java/org/apache/crunch/SourceTarget.java  |    7 +-
 .../main/java/org/apache/crunch/TableSource.java   |    2 +-
 crunch/src/main/java/org/apache/crunch/Target.java |    4 +-
 crunch/src/main/java/org/apache/crunch/Tuple.java  |    6 +-
 crunch/src/main/java/org/apache/crunch/Tuple3.java |   16 +-
 crunch/src/main/java/org/apache/crunch/Tuple4.java |   21 +-
 crunch/src/main/java/org/apache/crunch/TupleN.java |   14 +-
 .../java/org/apache/crunch/fn/CompositeMapFn.java  |   17 +-
 .../java/org/apache/crunch/fn/ExtractKeyFn.java    |   12 +-
 .../main/java/org/apache/crunch/fn/IdentityFn.java |    2 +-
 .../main/java/org/apache/crunch/fn/MapKeysFn.java  |    2 +-
 .../java/org/apache/crunch/fn/MapValuesFn.java     |    2 +-
 .../main/java/org/apache/crunch/fn/PairMapFn.java  |    9 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java    |   49 +-
 .../crunch/impl/mem/collect/MemCollection.java     |   35 +-
 .../crunch/impl/mem/collect/MemGroupedTable.java   |   26 +-
 .../apache/crunch/impl/mem/collect/MemTable.java   |   25 +-
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |   31 +-
 .../crunch/impl/mr/collect/DoCollectionImpl.java   |    6 +-
 .../apache/crunch/impl/mr/collect/DoTableImpl.java |    9 +-
 .../crunch/impl/mr/collect/InputCollection.java    |    8 +-
 .../apache/crunch/impl/mr/collect/InputTable.java  |    7 +-
 .../crunch/impl/mr/collect/PCollectionImpl.java    |    5 +-
 .../crunch/impl/mr/collect/PGroupedTableImpl.java  |   22 +-
 .../apache/crunch/impl/mr/collect/PTableBase.java  |   39 +-
 .../crunch/impl/mr/collect/UnionCollection.java    |   13 +-
 .../apache/crunch/impl/mr/collect/UnionTable.java  |   16 +-
 .../crunch/impl/mr/emit/IntermediateEmitter.java   |    1 +
 .../crunch/impl/mr/emit/MultipleOutputEmitter.java |    9 +-
 .../apache/crunch/impl/mr/emit/OutputEmitter.java  |    6 +-
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |   23 +-
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |    2 +-
 .../org/apache/crunch/impl/mr/plan/DoNode.java     |   59 +--
 .../apache/crunch/impl/mr/plan/JobNameBuilder.java |   23 +-
 .../apache/crunch/impl/mr/plan/JobPrototype.java   |   52 +--
 .../crunch/impl/mr/plan/MSCROutputHandler.java     |   16 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |   84 ++--
 .../org/apache/crunch/impl/mr/plan/NodePath.java   |    7 +-
 .../crunch/impl/mr/plan/PlanningParameters.java    |    7 +-
 .../apache/crunch/impl/mr/run/CrunchCombiner.java  |    2 +-
 .../crunch/impl/mr/run/CrunchInputFormat.java      |   22 +-
 .../crunch/impl/mr/run/CrunchInputSplit.java       |   10 +-
 .../apache/crunch/impl/mr/run/CrunchInputs.java    |   19 +-
 .../apache/crunch/impl/mr/run/CrunchMapper.java    |    7 +-
 .../crunch/impl/mr/run/CrunchRecordReader.java     |   20 +-
 .../apache/crunch/impl/mr/run/CrunchReducer.java   |    9 +-
 .../crunch/impl/mr/run/CrunchRuntimeException.java |    8 +-
 .../crunch/impl/mr/run/CrunchTaskContext.java      |   13 +-
 .../org/apache/crunch/impl/mr/run/NodeContext.java |    4 +-
 .../java/org/apache/crunch/impl/mr/run/RTNode.java |   28 +-
 .../crunch/impl/mr/run/RuntimeParameters.java      |    9 +-
 .../impl/mr/run/TaskAttemptContextFactory.java     |    8 +-
 crunch/src/main/java/org/apache/crunch/io/At.java  |   57 ++--
 .../apache/crunch/io/CompositePathIterable.java    |   70 ++--
 .../src/main/java/org/apache/crunch/io/From.java   |   77 ++--
 .../java/org/apache/crunch/io/MapReduceTarget.java |    5 +-
 .../java/org/apache/crunch/io/PathTargetImpl.java  |   33 +-
 .../java/org/apache/crunch/io/ReadableSource.java  |    3 +-
 .../org/apache/crunch/io/ReadableSourceTarget.java |    9 +-
 .../org/apache/crunch/io/SourceTargetHelper.java   |   54 ++--
 crunch/src/main/java/org/apache/crunch/io/To.java  |   45 +-
 .../org/apache/crunch/io/avro/AvroFileSource.java  |    3 +-
 .../crunch/io/avro/AvroFileSourceTarget.java       |    7 +-
 .../org/apache/crunch/io/avro/AvroFileTarget.java  |   21 +-
 .../apache/crunch/io/hbase/HBaseSourceTarget.java  |   29 +-
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |   33 +-
 .../org/apache/crunch/io/impl/FileSourceImpl.java  |   49 +--
 .../apache/crunch/io/impl/FileTableSourceImpl.java |   17 +-
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |   48 +-
 .../org/apache/crunch/io/impl/InputBundle.java     |   24 +-
 .../io/impl/ReadableSourcePathTargetImpl.java      |   12 +-
 .../crunch/io/impl/ReadableSourceTargetImpl.java   |   12 +-
 .../crunch/io/impl/SourcePathTargetImpl.java       |   19 +-
 .../apache/crunch/io/impl/SourceTargetImpl.java    |   43 +-
 .../crunch/io/impl/TableSourcePathTargetImpl.java  |    9 +-
 .../crunch/io/impl/TableSourceTargetImpl.java      |    9 +-
 .../org/apache/crunch/io/seq/SeqFileHelper.java    |   14 +-
 .../apache/crunch/io/seq/SeqFileReaderFactory.java |   82 ++--
 .../org/apache/crunch/io/seq/SeqFileSource.java    |    9 +-
 .../apache/crunch/io/seq/SeqFileSourceTarget.java  |    7 +-
 .../crunch/io/seq/SeqFileTableReaderFactory.java   |   62 ++--
 .../apache/crunch/io/seq/SeqFileTableSource.java   |   15 +-
 .../crunch/io/seq/SeqFileTableSourceTarget.java    |   14 +-
 .../org/apache/crunch/io/seq/SeqFileTarget.java    |    9 +-
 .../crunch/io/text/BZip2TextInputFormat.java       |   43 +-
 .../apache/crunch/io/text/CBZip2InputStream.java   |  301 +++++-------
 .../crunch/io/text/TextFileReaderFactory.java      |   71 ++--
 .../org/apache/crunch/io/text/TextFileSource.java  |   58 ++--
 .../crunch/io/text/TextFileSourceTarget.java       |    9 +-
 .../org/apache/crunch/io/text/TextFileTarget.java  |    9 +-
 .../main/java/org/apache/crunch/lib/Aggregate.java |  182 ++++----
 .../main/java/org/apache/crunch/lib/Cartesian.java |  232 +++++-----
 .../main/java/org/apache/crunch/lib/Cogroup.java   |   22 +-
 .../src/main/java/org/apache/crunch/lib/Join.java  |  144 ++++---
 .../main/java/org/apache/crunch/lib/PTables.java   |   12 +-
 .../main/java/org/apache/crunch/lib/Sample.java    |    9 +-
 .../src/main/java/org/apache/crunch/lib/Set.java   |   55 +--
 .../src/main/java/org/apache/crunch/lib/Sort.java  |  294 ++++++-------
 .../apache/crunch/lib/join/FullOuterJoinFn.java    |   21 +-
 .../org/apache/crunch/lib/join/InnerJoinFn.java    |   21 +-
 .../java/org/apache/crunch/lib/join/JoinFn.java    |   45 +-
 .../java/org/apache/crunch/lib/join/JoinUtils.java |   31 +-
 .../apache/crunch/lib/join/LeftOuterJoinFn.java    |   21 +-
 .../org/apache/crunch/lib/join/MapsideJoin.java    |   21 +-
 .../apache/crunch/lib/join/RightOuterJoinFn.java   |   19 +-
 .../crunch/materialize/MaterializableIterable.java |   27 +-
 .../crunch/materialize/MaterializableMap.java      |    8 +-
 .../java/org/apache/crunch/test/FileHelper.java    |   10 +-
 .../org/apache/crunch/test/InMemoryEmitter.java    |   16 +-
 .../java/org/apache/crunch/test/TestCounters.java  |   10 +-
 .../java/org/apache/crunch/tool/CrunchTool.java    |   61 ++--
 .../java/org/apache/crunch/types/Converter.java    |   14 +-
 .../org/apache/crunch/types/PGroupedTableType.java |   29 +-
 .../java/org/apache/crunch/types/PTableType.java   |    6 +-
 .../main/java/org/apache/crunch/types/PType.java   |    3 +-
 .../java/org/apache/crunch/types/PTypeFamily.java  |   22 +-
 .../java/org/apache/crunch/types/PTypeUtils.java   |   14 +-
 .../java/org/apache/crunch/types/TupleFactory.java |   21 +-
 .../apache/crunch/types/avro/AvroDeepCopier.java   |    4 +-
 .../crunch/types/avro/AvroGroupedTableType.java    |    3 +-
 .../apache/crunch/types/avro/AvroInputFormat.java  |   34 +-
 .../apache/crunch/types/avro/AvroKeyConverter.java |   10 +-
 .../apache/crunch/types/avro/AvroOutputFormat.java |   21 +-
 .../crunch/types/avro/AvroPairConverter.java       |   21 +-
 .../apache/crunch/types/avro/AvroRecordReader.java |  131 +++---
 .../apache/crunch/types/avro/AvroTableType.java    |   10 +-
 .../org/apache/crunch/types/avro/AvroType.java     |    6 +-
 .../apache/crunch/types/avro/AvroTypeFamily.java   |    6 +-
 .../crunch/types/avro/AvroUtf8InputFormat.java     |   20 +-
 .../java/org/apache/crunch/types/avro/Avros.java   |   64 ++--
 .../crunch/types/avro/ReflectDataFactory.java      |    8 +-
 .../crunch/types/avro/SafeAvroSerialization.java   |   55 +--
 .../types/writable/GenericArrayWritable.java       |   27 +-
 .../crunch/types/writable/TupleWritable.java       |    5 +-
 .../types/writable/WritableGroupedTableType.java   |   21 +-
 .../types/writable/WritablePairConverter.java      |    6 +-
 .../crunch/types/writable/WritableTableType.java   |   50 +--
 .../apache/crunch/types/writable/WritableType.java |   43 +-
 .../crunch/types/writable/WritableTypeFamily.java  |   28 +-
 .../types/writable/WritableValueConverter.java     |    9 +-
 .../apache/crunch/types/writable/Writables.java    |  166 +++----
 .../main/java/org/apache/crunch/util/Collects.java |    7 +-
 .../java/org/apache/crunch/util/DistCache.java     |  164 ++++---
 .../main/java/org/apache/crunch/util/PTypes.java   |  114 +++---
 .../main/java/org/apache/crunch/util/Protos.java   |   45 +-
 .../main/java/org/apache/crunch/util/Tuples.java   |   51 +-
 .../lib/jobcontrol/CrunchControlledJob.java        |   19 +-
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |   30 +-
 .../lib/output/CrunchMultipleOutputs.java          |  296 ++++++------
 .../test/java/org/apache/crunch/CombineFnTest.java |  152 +++----
 .../test/java/org/apache/crunch/FilterFnTest.java  |    4 +-
 .../src/test/java/org/apache/crunch/PairTest.java  |    8 +-
 .../src/test/java/org/apache/crunch/TupleTest.java |   27 +-
 .../org/apache/crunch/fn/ExtractKeyFnTest.java     |   30 +-
 .../java/org/apache/crunch/fn/MapKeysTest.java     |   10 +-
 .../java/org/apache/crunch/fn/MapValuesTest.java   |    9 +-
 .../java/org/apache/crunch/fn/PairMapTest.java     |    9 +-
 .../org/apache/crunch/fn/StoreLastEmitter.java     |    6 +-
 .../org/apache/crunch/impl/mr/MRPipelineTest.java  |    5 +-
 .../impl/mr/collect/DoCollectionImplTest.java      |   38 +-
 .../crunch/impl/mr/collect/DoTableImplTest.java    |  106 ++---
 .../crunch/impl/mr/plan/JobNameBuilderTest.java    |    4 +-
 .../apache/crunch/io/SourceTargetHelperTest.java   |   50 +-
 .../crunch/io/avro/AvroFileReaderFactoryTest.java  |  218 +++++-----
 .../apache/crunch/io/avro/AvroFileSourceTest.java  |    8 +-
 .../lib/AvroIndexedRecordPartitionerTest.java      |  133 +++---
 .../java/org/apache/crunch/lib/CartesianTest.java  |   14 +-
 .../java/org/apache/crunch/lib/SampleTest.java     |   10 +-
 .../crunch/lib/TupleWritablePartitionerTest.java   |   76 ++--
 .../java/org/apache/crunch/test/CountersTest.java  |   29 +-
 .../test/java/org/apache/crunch/test/Employee.java |  115 +++--
 .../test/java/org/apache/crunch/test/Person.java   |  116 +++--
 .../org/apache/crunch/types/PTypeUtilsTest.java    |   18 +-
 .../crunch/types/avro/AvroDeepCopierTest.java      |   14 +-
 .../types/avro/AvroGroupedTableTypeTest.java       |    8 +-
 .../crunch/types/avro/AvroTableTypeTest.java       |    3 +-
 .../org/apache/crunch/types/avro/AvrosTest.java    |   20 +-
 .../writable/WritableGroupedTableTypeTest.java     |   14 +-
 .../types/writable/WritableTableTypeTest.java      |    6 +-
 .../crunch/types/writable/WritablesTest.java       |  108 ++---
 .../java/org/apache/crunch/util/DistCacheTest.java |   70 ++--
 .../apache/crunch/examples/AverageBytesByIP.java   |   62 ++--
 .../org/apache/crunch/examples/TotalBytesByIP.java |   44 +-
 .../java/org/apache/crunch/examples/WordCount.java |   10 +-
 res/crunch-formatting-styles.xml                   |    4 +-
 226 files changed, 4831 insertions(+), 5052 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
index d66624e..c6a9c77 100644
--- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
@@ -22,57 +22,56 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.Collection;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 @SuppressWarnings("serial")
 public class CollectionsIT {
-  
+
   public static class AggregateStringListFn implements CombineFn.Aggregator<Collection<String>> {
     private final Collection<String> rtn = Lists.newArrayList();
-    
+
     @Override
     public void reset() {
       rtn.clear();
     }
-    
+
     @Override
     public void update(Collection<String> values) {
       rtn.addAll(values);
-    }      
-    
+    }
+
     @Override
     public Iterable<Collection<String>> results() {
       return ImmutableList.of(rtn);
     }
   }
-  
+
   public static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
-     
+
     return lines.parallelDo(new DoFn<String, Pair<String, Collection<String>>>() {
       @Override
       public void process(String line, Emitter<Pair<String, Collection<String>>> emitter) {
         for (String word : line.split("\\s+")) {
           Collection<String> characters = Lists.newArrayList();
-          for(char c : word.toCharArray()) {
+          for (char c : word.toCharArray()) {
             characters.add(String.valueOf(c));
           }
           emitter.emit(Pair.of(word, characters));
         }
       }
-    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings())))
-    .groupByKey()
-    .combineValues(CombineFn.<String, Collection<String>>aggregator(new AggregateStringListFn()));
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings()))).groupByKey()
+        .combineValues(CombineFn.<String, Collection<String>> aggregator(new AggregateStringListFn()));
   }
-  
+
   @Test
   public void testWritables() throws IOException {
     run(new MRPipeline(CollectionsIT.class), WritableTypeFamily.getInstance());
@@ -92,21 +91,21 @@ public class CollectionsIT {
   public void testInMemoryAvro() throws IOException {
     run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
   }
-  
+
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-	String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+
     PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
     Iterable<Pair<String, Collection<String>>> lines = listOfCharcters(shakespeare, typeFamily).materialize();
-    
+
     boolean passed = false;
     for (Pair<String, Collection<String>> line : lines) {
-      if(line.first().startsWith("yellow")) {
+      if (line.first().startsWith("yellow")) {
         passed = true;
         break;
       }
     }
     pipeline.done();
     assertTrue(passed);
-  }  
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/MapsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MapsIT.java b/crunch/src/it/java/org/apache/crunch/MapsIT.java
index cedb9a3..b1c1298 100644
--- a/crunch/src/it/java/org/apache/crunch/MapsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MapsIT.java
@@ -19,13 +19,13 @@ package org.apache.crunch;
 
 import java.util.Map;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
@@ -33,59 +33,57 @@ public class MapsIT {
 
   @Test
   public void testWritables() throws Exception {
-	run(WritableTypeFamily.getInstance());
+    run(WritableTypeFamily.getInstance());
   }
-  
+
   @Test
   public void testAvros() throws Exception {
-	run(AvroTypeFamily.getInstance());
+    run(AvroTypeFamily.getInstance());
   }
-  
+
   public static void run(PTypeFamily typeFamily) throws Exception {
-	Pipeline pipeline = new MRPipeline(MapsIT.class);
+    Pipeline pipeline = new MRPipeline(MapsIT.class);
     String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
     PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Iterable<Pair<String, Map<String, Long>>> output = shakespeare.parallelDo(
-      new DoFn<String, Pair<String, Map<String, Long>>>() {
-	    @Override
-	    public void process(String input,
-		    Emitter<Pair<String, Map<String, Long>>> emitter) {
-		  String last = null;
-		  for (String word : input.toLowerCase().split("\\W+")) {
-		    if (!word.isEmpty()) {
-			  String firstChar = word.substring(0, 1);
-		      if (last != null) {
-		    	Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
-			    emitter.emit(Pair.of(last, cc));
-		      }
-		      last = firstChar;
-		    }
-		  }
-	    }
-      }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs())))
-      .groupByKey()
-      .combineValues(new CombineFn<String, Map<String, Long>>() {
-	    @Override
-	    public void process(Pair<String, Iterable<Map<String, Long>>> input,
-		    Emitter<Pair<String, Map<String, Long>>> emitter) {
-		  Map<String, Long> agg = Maps.newHashMap();
-		  for (Map<String, Long> in : input.second()) {
-		    for (Map.Entry<String, Long> e : in.entrySet()) {
-			  if (!agg.containsKey(e.getKey())) {
-			    agg.put(e.getKey(), e.getValue());
-			  } else {
-			    agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
-			  }
-		    }
-		  }
-		  emitter.emit(Pair.of(input.first(), agg));
-	    }
-	  }).materialize();
+    Iterable<Pair<String, Map<String, Long>>> output = shakespeare
+        .parallelDo(new DoFn<String, Pair<String, Map<String, Long>>>() {
+          @Override
+          public void process(String input, Emitter<Pair<String, Map<String, Long>>> emitter) {
+            String last = null;
+            for (String word : input.toLowerCase().split("\\W+")) {
+              if (!word.isEmpty()) {
+                String firstChar = word.substring(0, 1);
+                if (last != null) {
+                  Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
+                  emitter.emit(Pair.of(last, cc));
+                }
+                last = firstChar;
+              }
+            }
+          }
+        }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs()))).groupByKey()
+        .combineValues(new CombineFn<String, Map<String, Long>>() {
+          @Override
+          public void process(Pair<String, Iterable<Map<String, Long>>> input,
+              Emitter<Pair<String, Map<String, Long>>> emitter) {
+            Map<String, Long> agg = Maps.newHashMap();
+            for (Map<String, Long> in : input.second()) {
+              for (Map.Entry<String, Long> e : in.entrySet()) {
+                if (!agg.containsKey(e.getKey())) {
+                  agg.put(e.getKey(), e.getValue());
+                } else {
+                  agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
+                }
+              }
+            }
+            emitter.emit(Pair.of(input.first(), agg));
+          }
+        }).materialize();
     boolean passed = false;
     for (Pair<String, Map<String, Long>> v : output) {
       if (v.first() == "k" && v.second().get("n") == 8L) {
-    	passed = true;
-    	break;
+        passed = true;
+        break;
       }
     }
     pipeline.done();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
index 95e239d..3f30e70 100644
--- a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -23,85 +23,82 @@ import static junit.framework.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 public class MaterializeIT {
 
-	/** Filter that rejects everything. */
-	@SuppressWarnings("serial")
-	private static class FalseFilterFn extends FilterFn<String> {
-
-		@Override
-		public boolean accept(final String input) {
-			return false;
-		}
-	}
-
-	@Test
-	public void testMaterializeInput_Writables() throws IOException {
-		runMaterializeInput(new MRPipeline(MaterializeIT.class), WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeInput_Avro() throws IOException {
-		runMaterializeInput(new MRPipeline(MaterializeIT.class), AvroTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeInput_InMemoryWritables() throws IOException {
-		runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeInput_InMemoryAvro() throws IOException {
-		runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_Writables() throws IOException {
-		runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class),
-				WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_Avro() throws IOException {
-		runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class),
-				AvroTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
-		runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
-		runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-	}
-
-	public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-		List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
-		String inputPath = FileHelper.createTempCopyOf("set1.txt");
-
-		PCollection<String> lines = pipeline.readTextFile(inputPath);
-		assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
-		pipeline.done();
-	}
-
-	public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
-			throws IOException {
-		String inputPath = FileHelper.createTempCopyOf("set1.txt");
-		PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FalseFilterFn());
-
-		assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
-		pipeline.done();
-	}
+  /** Filter that rejects everything. */
+  @SuppressWarnings("serial")
+  private static class FalseFilterFn extends FilterFn<String> {
+
+    @Override
+    public boolean accept(final String input) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testMaterializeInput_Writables() throws IOException {
+    runMaterializeInput(new MRPipeline(MaterializeIT.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeInput_Avro() throws IOException {
+    runMaterializeInput(new MRPipeline(MaterializeIT.class), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeInput_InMemoryWritables() throws IOException {
+    runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeInput_InMemoryAvro() throws IOException {
+    runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_Writables() throws IOException {
+    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_Avro() throws IOException {
+    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeIT.class), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
+    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
+    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+
+  public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
+    String inputPath = FileHelper.createTempCopyOf("set1.txt");
+
+    PCollection<String> lines = pipeline.readTextFile(inputPath);
+    assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
+    pipeline.done();
+  }
+
+  public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("set1.txt");
+    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FalseFilterFn());
+
+    assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
+    pipeline.done();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java b/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
index 6215dd9..db7449a 100644
--- a/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MaterializeToMapIT.java
@@ -22,55 +22,55 @@ import static junit.framework.Assert.assertTrue;
 import java.io.IOException;
 import java.util.Map;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 
 public class MaterializeToMapIT {
-  
-  static final ImmutableList<Pair<Integer,String>> kvPairs = 
-      ImmutableList.of(
-          Pair.of(0, "a"),
-          Pair.of(1, "b"),
-          Pair.of(2, "c"),
-          Pair.of(3, "e"));
-  
-  public void assertMatches(Map<Integer,String> m) {
+
+  static final ImmutableList<Pair<Integer, String>> kvPairs = ImmutableList.of(Pair.of(0, "a"), Pair.of(1, "b"),
+      Pair.of(2, "c"), Pair.of(3, "e"));
+
+  public void assertMatches(Map<Integer, String> m) {
     for (Integer k : m.keySet()) {
       System.out.println(k + " " + kvPairs.get(k).second() + " " + m.get(k));
       assertTrue(kvPairs.get(k).second().equals(m.get(k)));
     }
   }
-  
+
   @Test
   public void testMemMaterializeToMap() {
     assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
   }
-  
-  private static class Set1Mapper extends MapFn<String,Pair<Integer,String>> {
+
+  private static class Set1Mapper extends MapFn<String, Pair<Integer, String>> {
     @Override
     public Pair<Integer, String> map(String input) {
-      
+
       int k = -1;
-      if (input.equals("a")) k = 0;
-      else if (input.equals("b")) k = 1;
-      else if (input.equals("c")) k = 2;
-      else if (input.equals("e")) k = 3;
+      if (input.equals("a"))
+        k = 0;
+      else if (input.equals("b"))
+        k = 1;
+      else if (input.equals("c"))
+        k = 2;
+      else if (input.equals("e"))
+        k = 3;
       return Pair.of(k, input);
     }
   }
-  
+
   @Test
   public void testMRMaterializeToMap() throws IOException {
     Pipeline p = new MRPipeline(MaterializeToMapIT.class);
     String inputFile = FileHelper.createTempCopyOf("set1.txt");
     PCollection<String> c = p.readTextFile(inputFile);
     PTypeFamily tf = c.getTypeFamily();
-    PTable<Integer,String> t = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
+    PTable<Integer, String> t = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
     Map<Integer, String> m = t.materializeToMap();
     assertMatches(m);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
index 63120e5..7f50b54 100644
--- a/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -25,48 +25,49 @@ import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
+
 import com.google.common.io.Files;
 
 public class MultipleOutputIT {
-  
+
   public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-    return words.parallelDo("even", new FilterFn<String>(){
+    return words.parallelDo("even", new FilterFn<String>() {
 
-        @Override
-        public boolean accept(String input) {
-            return input.length() % 2 == 0;
-        }}, typeFamily.strings());
+      @Override
+      public boolean accept(String input) {
+        return input.length() % 2 == 0;
+      }
+    }, typeFamily.strings());
   }
-  
+
   public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-      return words.parallelDo("odd", new FilterFn<String>(){
-
-        @Override
-        public boolean accept(String input) {
-            return input.length() % 2 != 0;
-        }}, typeFamily.strings());
-       
-    }
-  
+    return words.parallelDo("odd", new FilterFn<String>() {
+
+      @Override
+      public boolean accept(String input) {
+        return input.length() % 2 != 0;
+      }
+    }, typeFamily.strings());
+
+  }
+
   public static PTable<String, Long> substr(PTable<String, Long> ptable) {
-	return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-	  public void process(Pair<String, Long> input,
-		  Emitter<Pair<String, Long>> emitter) {
-		if (input.first().length() > 0) {
-		  emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
-		}
-	  }      
+    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
+      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
+        if (input.first().length() > 0) {
+          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
+        }
+      }
     }, ptable.getPTableType());
   }
-  
+
   @Test
   public void testWritables() throws IOException {
     run(new MRPipeline(MultipleOutputIT.class), WritableTypeFamily.getInstance());
@@ -76,33 +77,31 @@ public class MultipleOutputIT {
   public void testAvro() throws IOException {
     run(new MRPipeline(MultipleOutputIT.class), AvroTypeFamily.getInstance());
   }
- 
-  
+
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-	String inputPath = FileHelper.createTempCopyOf("letters.txt");
-	File outputEven = FileHelper.createOutputPath();
-	File outputOdd = FileHelper.createOutputPath();
-	String outputPathEven = outputEven.getAbsolutePath();
-	String outputPathOdd = outputOdd.getAbsolutePath();
-	
-    PCollection<String> words = pipeline.read(
-         At.textFile(inputPath, typeFamily.strings()));
-    
+    String inputPath = FileHelper.createTempCopyOf("letters.txt");
+    File outputEven = FileHelper.createOutputPath();
+    File outputOdd = FileHelper.createOutputPath();
+    String outputPathEven = outputEven.getAbsolutePath();
+    String outputPathOdd = outputOdd.getAbsolutePath();
+
+    PCollection<String> words = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
+
     PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
     PCollection<String> oddCountWords = oddCountLetters(words, typeFamily);
     pipeline.writeTextFile(evenCountWords, outputPathEven);
     pipeline.writeTextFile(oddCountWords, outputPathOdd);
-    
+
     pipeline.done();
-   
+
     checkFileContents(outputPathEven, Arrays.asList("bb"));
     checkFileContents(outputPathOdd, Arrays.asList("a"));
-   
-	outputEven.deleteOnExit();
-	outputOdd.deleteOnExit();
-  }  
-  
-  private void checkFileContents(String filePath, List<String> expected) throws IOException{
+
+    outputEven.deleteOnExit();
+    outputOdd.deleteOnExit();
+  }
+
+  private void checkFileContents(String filePath, List<String> expected) throws IOException {
     File outputFile = new File(filePath, "part-m-00000");
     List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
     assertEquals(expected, lines);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
index 14a3b3f..a6571bb 100644
--- a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
@@ -17,138 +17,137 @@
  */
 package org.apache.crunch;
 
+import static com.google.common.collect.Lists.newArrayList;
 import static org.apache.crunch.io.At.sequenceFile;
 import static org.apache.crunch.io.At.textFile;
 import static org.apache.crunch.types.writable.Writables.strings;
-import static com.google.common.collect.Lists.newArrayList;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
 
 public class PCollectionGetSizeIT {
 
-    private String emptyInputPath;
-    private String nonEmptyInputPath;
-    private String outputPath;
+  private String emptyInputPath;
+  private String nonEmptyInputPath;
+  private String outputPath;
 
-    /** Filter that rejects everything. */
-    @SuppressWarnings("serial")
-    private static class FalseFilterFn extends FilterFn<String> {
+  /** Filter that rejects everything. */
+  @SuppressWarnings("serial")
+  private static class FalseFilterFn extends FilterFn<String> {
 
-        @Override
-        public boolean accept(final String input) {
-            return false;
-        }
+    @Override
+    public boolean accept(final String input) {
+      return false;
     }
+  }
 
-    @Before
-    public void setUp() throws IOException {
-        emptyInputPath = FileHelper.createTempCopyOf("emptyTextFile.txt");
-        nonEmptyInputPath = FileHelper.createTempCopyOf("set1.txt");
-        outputPath = FileHelper.createOutputPath().getAbsolutePath();
-    }
+  @Before
+  public void setUp() throws IOException {
+    emptyInputPath = FileHelper.createTempCopyOf("emptyTextFile.txt");
+    nonEmptyInputPath = FileHelper.createTempCopyOf("set1.txt");
+    outputPath = FileHelper.createOutputPath().getAbsolutePath();
+  }
 
-    @Test
-    public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
-        testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass()));
-    }
+  @Test
+  public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
+    testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass()));
+  }
 
-    @Test
-    public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
-        testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
-    }
+  @Test
+  public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
+    testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
+  }
 
-    private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
+  private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
 
-        assertThat(pipeline.read(textFile(emptyInputPath)).getSize(), is(0L));
-    }
+    assertThat(pipeline.read(textFile(emptyInputPath)).getSize(), is(0L));
+  }
 
-    @Test
-    public void testMaterializeEmptyInput_MRPipeline() throws IOException {
-        testMaterializeEmptyInput(new MRPipeline(this.getClass()));
-    }
+  @Test
+  public void testMaterializeEmptyInput_MRPipeline() throws IOException {
+    testMaterializeEmptyInput(new MRPipeline(this.getClass()));
+  }
 
-    @Test
-    public void testMaterializeEmptyImput_MemPipeline() throws IOException {
-        testMaterializeEmptyInput(MemPipeline.getInstance());
-    }
+  @Test
+  public void testMaterializeEmptyImput_MemPipeline() throws IOException {
+    testMaterializeEmptyInput(MemPipeline.getInstance());
+  }
 
-    private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
-        assertThat(newArrayList(pipeline.readTextFile(emptyInputPath).materialize().iterator()).size(), is(0));
-    }
+  private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
+    assertThat(newArrayList(pipeline.readTextFile(emptyInputPath).materialize().iterator()).size(), is(0));
+  }
 
-    @Test
-    public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
+  @Test
+  public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
 
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
+    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
 
-        assertThat(emptyIntermediate.getSize(), is(0L));
-    }
+    assertThat(emptyIntermediate.getSize(), is(0L));
+  }
 
-    @Test
-    @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
-    public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
+  @Test
+  @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
+  public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
 
-        PCollection<String> data = new MRPipeline(this.getClass()).readTextFile(nonEmptyInputPath);
+    PCollection<String> data = new MRPipeline(this.getClass()).readTextFile(nonEmptyInputPath);
 
-        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
+    PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
 
-        assertThat(emptyPCollection.getSize(), is(0L));
-    }
+    assertThat(emptyPCollection.getSize(), is(0L));
+  }
 
-    @Test
-    public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
+  @Test
+  public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
 
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
+    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
 
-        assertThat(emptyIntermediate.getSize(), is(0L));
-    }
+    assertThat(emptyIntermediate.getSize(), is(0L));
+  }
 
-    @Test
-    public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
+  @Test
+  public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
 
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
+    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
 
-        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-    }
+    assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
+  }
 
-    @Test
-    public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
+  @Test
+  public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
 
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
+    PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
 
-        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-    }
+    assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
+  }
 
-    private PCollection<String> createPesistentEmptyIntermediate(Pipeline pipeline) {
+  private PCollection<String> createPesistentEmptyIntermediate(Pipeline pipeline) {
 
-        PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
+    PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
 
-        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
+    PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
 
-        emptyPCollection.write(sequenceFile(outputPath, strings()));
+    emptyPCollection.write(sequenceFile(outputPath, strings()));
 
-        pipeline.run();
+    pipeline.run();
 
-        return pipeline.read(sequenceFile(outputPath, strings()));
-    }
+    return pipeline.read(sequenceFile(outputPath, strings()));
+  }
 
-    @Test(expected = IllegalStateException.class)
-    public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
-        new MRPipeline(this.getClass()).readTextFile("non_existing.file").getSize();
-    }
+  @Test(expected = IllegalStateException.class)
+  public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
+    new MRPipeline(this.getClass()).readTextFile("non_existing.file").getSize();
+  }
 
-    @Test(expected = IllegalStateException.class)
-    public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
-        MemPipeline.getInstance().readTextFile("non_existing.file").getSize();
-    }
+  @Test(expected = IllegalStateException.class)
+  public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
+    MemPipeline.getInstance().readTextFile("non_existing.file").getSize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java b/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
index 197dfa9..de370b0 100644
--- a/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PTableKeyValueIT.java
@@ -25,6 +25,12 @@ import java.util.Collection;
 
 import junit.framework.Assert;
 
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,76 +38,62 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
 import com.google.common.collect.Lists;
 
 @RunWith(value = Parameterized.class)
 public class PTableKeyValueIT implements Serializable {
 
-	private static final long serialVersionUID = 4374227704751746689L;
-
-	private transient PTypeFamily typeFamily;
-	private transient MRPipeline pipeline;
-	private transient String inputFile;
-
-	@Before
-	public void setUp() throws IOException {
-		pipeline = new MRPipeline(PTableKeyValueIT.class);
-		inputFile = FileHelper.createTempCopyOf("set1.txt");
-	}
-
-	@After
-	public void tearDown() {
-		pipeline.done();
-	}
-
-	public PTableKeyValueIT(PTypeFamily typeFamily) {
-		this.typeFamily = typeFamily;
-	}
-
-	@Parameters
-	public static Collection<Object[]> data() {
-		Object[][] data = new Object[][] {
-				{ WritableTypeFamily.getInstance() },
-				{ AvroTypeFamily.getInstance() } };
-		return Arrays.asList(data);
-	}
-
-	@Test
-	public void testKeysAndValues() throws Exception {
-
-		PCollection<String> collection = pipeline.read(At.textFile(inputFile,
-				typeFamily.strings()));
-
-		PTable<String, String> table = collection.parallelDo(
-				new DoFn<String, Pair<String, String>>() {
-
-					@Override
-					public void process(String input,
-							Emitter<Pair<String, String>> emitter) {
-						emitter.emit(Pair.of(input.toUpperCase(), input));
-
-					}
-				}, typeFamily.tableOf(typeFamily.strings(),
-						typeFamily.strings()));
-
-		PCollection<String> keys = table.keys();
-		PCollection<String> values = table.values();
-
-		ArrayList<String> keyList = Lists.newArrayList(keys.materialize()
-				.iterator());
-		ArrayList<String> valueList = Lists.newArrayList(values.materialize()
-				.iterator());
-
-		Assert.assertEquals(keyList.size(), valueList.size());
-		for (int i = 0; i < keyList.size(); i++) {
-			Assert.assertEquals(keyList.get(i), valueList.get(i).toUpperCase());
-		}
-	}
+  private static final long serialVersionUID = 4374227704751746689L;
+
+  private transient PTypeFamily typeFamily;
+  private transient MRPipeline pipeline;
+  private transient String inputFile;
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new MRPipeline(PTableKeyValueIT.class);
+    inputFile = FileHelper.createTempCopyOf("set1.txt");
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  public PTableKeyValueIT(PTypeFamily typeFamily) {
+    this.typeFamily = typeFamily;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance() }, { AvroTypeFamily.getInstance() } };
+    return Arrays.asList(data);
+  }
+
+  @Test
+  public void testKeysAndValues() throws Exception {
+
+    PCollection<String> collection = pipeline.read(At.textFile(inputFile, typeFamily.strings()));
+
+    PTable<String, String> table = collection.parallelDo(new DoFn<String, Pair<String, String>>() {
+
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        emitter.emit(Pair.of(input.toUpperCase(), input));
+
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
+    PCollection<String> keys = table.keys();
+    PCollection<String> values = table.values();
+
+    ArrayList<String> keyList = Lists.newArrayList(keys.materialize().iterator());
+    ArrayList<String> valueList = Lists.newArrayList(values.materialize().iterator());
+
+    Assert.assertEquals(keyList.size(), valueList.size());
+    for (int i = 0; i < keyList.size(); i++) {
+      Assert.assertEquals(keyList.get(i), valueList.get(i).toUpperCase());
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
index 8eaa49b..98b6c25 100644
--- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PageRankIT.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
 import java.util.Collection;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
@@ -34,102 +32,110 @@ import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.util.PTypes;
+import org.junit.Test;
+
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 public class PageRankIT {
 
   public static class PageRankData {
-	public float score;
-	public float lastScore;
-	public List<String> urls;
-	
-	public PageRankData() { }
-	
-	public PageRankData(float score, float lastScore, Iterable<String> urls) {
-	  this.score = score;
-	  this.lastScore = lastScore;
-	  this.urls = Lists.newArrayList(urls);
-	}
-	
-	public PageRankData next(float newScore) {
-	  return new PageRankData(newScore, score, urls);
-	}
-	
-	public float propagatedScore() {
-	  return score / urls.size();
-	}
-	
-	@Override
-	public String toString() {
-	  return score + " " + lastScore + " " + urls;
-	}
+    public float score;
+    public float lastScore;
+    public List<String> urls;
+
+    public PageRankData() {
+    }
+
+    public PageRankData(float score, float lastScore, Iterable<String> urls) {
+      this.score = score;
+      this.lastScore = lastScore;
+      this.urls = Lists.newArrayList(urls);
+    }
+
+    public PageRankData next(float newScore) {
+      return new PageRankData(newScore, score, urls);
+    }
+
+    public float propagatedScore() {
+      return score / urls.size();
+    }
+
+    @Override
+    public String toString() {
+      return score + " " + lastScore + " " + urls;
+    }
   }
-  
-  @Test public void testAvroReflect() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    run(new MRPipeline(PageRankIT.class), prType, tf);	
+
+  @Test
+  public void testAvroReflect() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
+    run(new MRPipeline(PageRankIT.class), prType, tf);
   }
-  
-  @Test public void testAvroMReflectInMemory() throws Exception {
+
+  @Test
+  public void testAvroMReflectInMemory() throws Exception {
     PTypeFamily tf = AvroTypeFamily.getInstance();
     PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    run(MemPipeline.getInstance(), prType, tf);        
+    run(MemPipeline.getInstance(), prType, tf);
   }
-  
-  @Test public void testAvroJSON() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+
+  @Test
+  public void testAvroJSON() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
     run(new MRPipeline(PageRankIT.class), prType, tf);
   }
 
-  @Test public void testAvroBSON() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
+  @Test
+  public void testAvroBSON() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
     run(new MRPipeline(PageRankIT.class), prType, tf);
   }
-  
-  @Test public void testWritablesJSON() throws Exception {
-	PTypeFamily tf = WritableTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+
+  @Test
+  public void testWritablesJSON() throws Exception {
+    PTypeFamily tf = WritableTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
     run(new MRPipeline(PageRankIT.class), prType, tf);
   }
 
-  @Test public void testWritablesBSON() throws Exception {
-	PTypeFamily tf = WritableTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
+  @Test
+  public void testWritablesBSON() throws Exception {
+    PTypeFamily tf = WritableTypeFamily.getInstance();
+    PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
     run(new MRPipeline(PageRankIT.class), prType, tf);
   }
-  
+
   public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
     PTypeFamily ptf = input.getTypeFamily();
-    PTable<String, Float> outbound = input.parallelDo(
-        new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+    PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+      @Override
+      public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+        PageRankData prd = input.second();
+        for (String link : prd.urls) {
+          emitter.emit(Pair.of(link, prd.propagatedScore()));
+        }
+      }
+    }, ptf.tableOf(ptf.strings(), ptf.floats()));
+
+    return input.cogroup(outbound).parallelDo(
+        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
           @Override
-          public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
-            PageRankData prd = input.second();
-            for (String link : prd.urls) {
-              emitter.emit(Pair.of(link, prd.propagatedScore()));
+          public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
+            PageRankData prd = Iterables.getOnlyElement(input.second().first());
+            Collection<Float> propagatedScores = input.second().second();
+            float sum = 0.0f;
+            for (Float s : propagatedScores) {
+              sum += s;
             }
+            return Pair.of(input.first(), prd.next(d + (1.0f - d) * sum));
           }
-        }, ptf.tableOf(ptf.strings(), ptf.floats()));
-    
-    return input.cogroup(outbound).parallelDo(
-        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
-              @Override
-              public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
-                PageRankData prd = Iterables.getOnlyElement(input.second().first());
-                Collection<Float> propagatedScores = input.second().second();
-                float sum = 0.0f;
-                for (Float s : propagatedScores) {
-                  sum += s;
-                }
-                return Pair.of(input.first(), prd.next(d + (1.0f - d)*sum));
-              }
-            }, input.getPTableType());
+        }, input.getPTableType());
   }
-  
+
   public static void run(Pipeline pipeline, PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
     String urlInput = FileHelper.createTempCopyOf("urls.txt");
     PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
@@ -139,27 +145,25 @@ public class PageRankIT {
             String[] urls = input.split("\\t");
             return Pair.of(urls[0], urls[1]);
           }
-        }, ptf.tableOf(ptf.strings(), ptf.strings()))
-        .groupByKey()
+        }, ptf.tableOf(ptf.strings(), ptf.strings())).groupByKey()
         .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
-              @Override
-              public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
-                return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
-              }
-            }, ptf.tableOf(ptf.strings(), prType));
-    
+          @Override
+          public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
+            return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
+          }
+        }, ptf.tableOf(ptf.strings(), prType));
+
     Float delta = 1.0f;
     while (delta > 0.01) {
       scores = pageRank(scores, 0.5f);
       scores.materialize().iterator(); // force the write
-      delta = Iterables.getFirst(Aggregate.max(
-          scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
-            @Override
-            public Float map(Pair<String, PageRankData> input) {
-              PageRankData prd = input.second();
-              return Math.abs(prd.score - prd.lastScore);
-            }
-          }, ptf.floats())).materialize(), null);
+      delta = Iterables.getFirst(Aggregate.max(scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
+        @Override
+        public Float map(Pair<String, PageRankData> input) {
+          PageRankData prd = input.second();
+          return Math.abs(prd.score - prd.lastScore);
+        }
+      }, ptf.floats())).materialize(), null);
     }
     assertEquals(0.0048, delta, 0.001);
   }

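The pageRank helper above performs one synchronous PageRank step: each page splits its current score evenly across its outbound links (propagatedScore), and after the cogroup every page's new score becomes d + (1 - d) * (sum of the scores propagated to it), with the old score kept in lastScore so the driver loop can test convergence. A minimal plain-Java sketch of the same update over an in-memory link map; the class and method names below are illustrative and not part of Crunch:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PageRankStepSketch {
      // One synchronous update: next(p) = d + (1 - d) * sum of shares sent to p.
      static Map<String, Float> step(Map<String, List<String>> links, Map<String, Float> scores, float d) {
        Map<String, Float> incoming = new HashMap<String, Float>();
        for (Map.Entry<String, List<String>> e : links.entrySet()) {
          // A page's score is split evenly across its outbound links.
          float share = scores.get(e.getKey()) / e.getValue().size();
          for (String target : e.getValue()) {
            Float soFar = incoming.get(target);
            incoming.put(target, soFar == null ? share : soFar + share);
          }
        }
        Map<String, Float> next = new HashMap<String, Float>();
        for (String page : scores.keySet()) {
          Float sum = incoming.get(page);
          next.put(page, d + (1.0f - d) * (sum == null ? 0.0f : sum));
        }
        return next;
      }
    }

The run method drives this with d = 0.5 and loops until the largest per-page |score - lastScore| falls to 0.01 or below.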
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java b/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
index d3c877b..e0d0b47 100644
--- a/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TermFrequencyIT.java
@@ -24,8 +24,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
@@ -34,89 +32,88 @@ import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
 
 @SuppressWarnings("serial")
-public class TermFrequencyIT implements Serializable {  
-  
+public class TermFrequencyIT implements Serializable {
+
   @Test
   public void testTermFrequencyWithNoTransform() throws IOException {
     run(new MRPipeline(TermFrequencyIT.class), WritableTypeFamily.getInstance(), false);
   }
-  
+
   @Test
   public void testTermFrequencyWithTransform() throws IOException {
     run(new MRPipeline(TermFrequencyIT.class), WritableTypeFamily.getInstance(), true);
   }
-  
+
   @Test
   public void testTermFrequencyNoTransformInMemory() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), false);  
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), false);
   }
 
   @Test
   public void testTermFrequencyWithTransformInMemory() throws IOException {
     run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), true);
   }
-  
 
   public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean transformTF) throws IOException {
     String input = FileHelper.createTempCopyOf("docs.txt");
-    
+
     File transformedOutput = FileHelper.createOutputPath();
     File tfOutput = FileHelper.createOutputPath();
-    
+
     PCollection<String> docs = pipeline.readTextFile(input);
-    
+
     PTypeFamily ptf = docs.getTypeFamily();
-    
+
     /*
-     * Input: String
-     * Input title  text
+     * Input: String Input title text
      * 
-     * Output: PTable<Pair<String, String>, Long> 
-     * Pair<Pair<word, title>, count in title>
+     * Output: PTable<Pair<String, String>, Long> Pair<Pair<word, title>, count
+     * in title>
      */
     PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
         new DoFn<String, Pair<String, String>>() {
-      @Override
-      public void process(String doc, Emitter<Pair<String, String>> emitter) {
-        String[] kv = doc.split("\t");
-        String title = kv[0];
-        String text = kv[1];
-        for (String word : text.split("\\W+")) {
-          if(word.length() > 0) {
-            Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
-            emitter.emit(pair);
+          @Override
+          public void process(String doc, Emitter<Pair<String, String>> emitter) {
+            String[] kv = doc.split("\t");
+            String title = kv[0];
+            String text = kv[1];
+            for (String word : text.split("\\W+")) {
+              if (word.length() > 0) {
+                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+                emitter.emit(pair);
+              }
+            }
           }
-        }
-      }
-    }, ptf.pairs(ptf.strings(), ptf.strings())));
-    
-    if(transformTF) {
+        }, ptf.pairs(ptf.strings(), ptf.strings())));
+
+    if (transformTF) {
       /*
-       * Input: Pair<Pair<String, String>, Long>
-       * Pair<Pair<word, title>, count in title>
+       * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count
+       * in title>
        * 
-       * Output: PTable<String, Pair<String, Long>>
-       * PTable<word, Pair<title, count in title>>
+       * Output: PTable<String, Pair<String, Long>> PTable<word, Pair<title,
+       * count in title>>
        */
       PTable<String, Pair<String, Long>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
           new MapFn<Pair<Pair<String, String>, Long>, Pair<String, Pair<String, Long>>>() {
             @Override
             public Pair<String, Pair<String, Long>> map(Pair<Pair<String, String>, Long> input) {
-              Pair<String, String> wordDocumentPair = input.first();            
+              Pair<String, String> wordDocumentPair = input.first();
               return Pair.of(wordDocumentPair.first(), Pair.of(wordDocumentPair.second(), input.second()));
             }
-        }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), ptf.longs())));
-      
+          }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), ptf.longs())));
+
       pipeline.writeTextFile(wordDocumentCountPair, transformedOutput.getAbsolutePath());
     }
-    
+
     SourceTarget<String> st = At.textFile(tfOutput.getAbsolutePath());
     pipeline.write(tf, st);
-    
+
     pipeline.run();
-    
+
     // test the case we should see
     Iterable<String> lines = ((ReadableSourceTarget<String>) st).read(pipeline.getConfiguration());
     boolean passed = false;

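The "term document frequency" DoFn above splits each line into title and text on the tab character, tokenizes the text on non-word characters, lowercases each token, and emits one (word, title) pair per occurrence; Aggregate.count then yields the per-document term frequencies. The same tokenization and counting in plain Java, as an illustrative sketch rather than code from the test (a composite string key stands in for Pair):

    import java.util.HashMap;
    import java.util.Map;

    public class TermFrequencySketch {
      // Count occurrences per (word, title) in one "title<TAB>text" line.
      static Map<String, Long> termFrequency(String doc) {
        String[] kv = doc.split("\t");
        String title = kv[0];
        String text = kv[1];
        Map<String, Long> counts = new HashMap<String, Long>();
        for (String word : text.split("\\W+")) {
          if (word.length() > 0) {
            String key = word.toLowerCase() + "\t" + title;
            Long soFar = counts.get(key);
            counts.put(key, soFar == null ? 1L : soFar + 1L);
          }
        }
        return counts;
      }
    }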
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/TextPairIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TextPairIT.java b/crunch/src/it/java/org/apache/crunch/TextPairIT.java
index a42b6d5..7694bad 100644
--- a/crunch/src/it/java/org/apache/crunch/TextPairIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TextPairIT.java
@@ -21,27 +21,26 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
 
-public class TextPairIT  {
+public class TextPairIT {
 
   @Test
   public void testWritables() throws IOException {
     run(new MRPipeline(TextPairIT.class));
   }
-  
+
   private static final String CANARY = "Writables.STRING_TO_TEXT";
-  
+
   public static PCollection<Pair<String, String>> wordDuplicate(PCollection<String> words) {
     return words.parallelDo("my word duplicator", new DoFn<String, Pair<String, String>>() {
       public void process(String line, Emitter<Pair<String, String>> emitter) {
         for (String word : line.split("\\W+")) {
-          if(word.length() > 0) {
+          if (word.length() > 0) {
             Pair<String, String> pair = Pair.of(CANARY, word);
             emitter.emit(pair);
           }
@@ -49,12 +48,12 @@ public class TextPairIT  {
       }
     }, Writables.pairs(Writables.strings(), Writables.strings()));
   }
-  
+
   public void run(Pipeline pipeline) throws IOException {
     String input = FileHelper.createTempCopyOf("shakes.txt");
-        
+
     PCollection<String> shakespeare = pipeline.read(From.textFile(input));
-    Iterable<Pair<String, String>> lines = pipeline.materialize(wordDuplicate(shakespeare));    
+    Iterable<Pair<String, String>> lines = pipeline.materialize(wordDuplicate(shakespeare));
     boolean passed = false;
     for (Pair<String, String> line : lines) {
       if (line.first().contains(CANARY)) {
@@ -62,8 +61,8 @@ public class TextPairIT  {
         break;
       }
     }
-    
+
     pipeline.done();
     assertTrue(passed);
-  }  
+  }
 }

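TextPairIT is a canary test: every pair emitted by wordDuplicate carries the literal CANARY string in its first slot, so finding that literal in the materialized output proves the Writables string round trip preserved the data. Reduced to its essence, as an illustrative sketch:

    public class CanarySketch {
      static final String CANARY = "Writables.STRING_TO_TEXT";

      // True if the sentinel survived the write/read round trip.
      static boolean sawCanary(Iterable<String> firstFields) {
        for (String field : firstFields) {
          if (field.contains(CANARY)) {
            return true;
          }
        }
        return false;
      }
    }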
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
index 2472449..f48de8c 100644
--- a/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TfIdfIT.java
@@ -28,9 +28,6 @@ import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
 import org.apache.crunch.fn.MapKeysFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.seq.SeqFileSourceTarget;
@@ -38,14 +35,17 @@ import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Join;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 @SuppressWarnings("serial")
-public class TfIdfIT implements Serializable {  
+public class TfIdfIT implements Serializable {
   // total number of documents, should calculate
   protected static final double N = 2;
-  
+
   @Test
   public void testWritablesSingleRun() throws IOException {
     run(new MRPipeline(TfIdfIT.class), WritableTypeFamily.getInstance(), true);
@@ -59,140 +59,143 @@ public class TfIdfIT implements Serializable {
   /**
    * This method should generate a TF-IDF score for the input.
    */
-  public PTable<String, Collection<Pair<String, Double>>>  generateTFIDF(PCollection<String> docs,
-      Path termFreqPath, PTypeFamily ptf) throws IOException {    
-    
+  public PTable<String, Collection<Pair<String, Double>>> generateTFIDF(PCollection<String> docs, Path termFreqPath,
+      PTypeFamily ptf) throws IOException {
+
     /*
-     * Input: String
-     * Input title  text
+     * Input: String Input title text
      * 
-     * Output: PTable<Pair<String, String>, Long> 
-     * Pair<Pair<word, title>, count in title>
+     * Output: PTable<Pair<String, String>, Long> Pair<Pair<word, title>, count
+     * in title>
      */
     PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
         new DoFn<String, Pair<String, String>>() {
-      @Override
-      public void process(String doc, Emitter<Pair<String, String>> emitter) {
-        String[] kv = doc.split("\t");
-        String title = kv[0];
-        String text = kv[1];
-        for (String word : text.split("\\W+")) {
-          if(word.length() > 0) {
-            Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
-            emitter.emit(pair);
+          @Override
+          public void process(String doc, Emitter<Pair<String, String>> emitter) {
+            String[] kv = doc.split("\t");
+            String title = kv[0];
+            String text = kv[1];
+            for (String word : text.split("\\W+")) {
+              if (word.length() > 0) {
+                Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+                emitter.emit(pair);
+              }
+            }
           }
-        }
-      }
-    }, ptf.pairs(ptf.strings(), ptf.strings())));
-    
+        }, ptf.pairs(ptf.strings(), ptf.strings())));
+
     tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
-    
+
     /*
-     * Input: Pair<Pair<String, String>, Long>
-     * Pair<Pair<word, title>, count in title>
+     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
+     * title>
      * 
-     * Output: PTable<String, Long>
-     * PTable<word, # of docs containing word>
+     * Output: PTable<String, Long> PTable<word, # of docs containing word>
      */
-    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",  
+    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",
         new DoFn<Pair<Pair<String, String>, Long>, String>() {
-      @Override
-      public void process(Pair<Pair<String, String>, Long> input,
-          Emitter<String> emitter) {
-        emitter.emit(input.first().first());
-      }
-    }, ptf.strings()));
-    
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input, Emitter<String> emitter) {
+            emitter.emit(input.first().first());
+          }
+        }, ptf.strings()));
+
     /*
-     * Input: Pair<Pair<String, String>, Long>
-     * Pair<Pair<word, title>, count in title>
+     * Input: Pair<Pair<String, String>, Long> Pair<Pair<word, title>, count in
+     * title>
      * 
-     * Output: PTable<String, Pair<String, Long>>
-     * PTable<word, Pair<title, count in title>>
+     * Output: PTable<String, Pair<String, Long>> PTable<word, Pair<title, count
+     * in title>>
      */
-    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
+    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo(
+        "transform wordDocumentPairCount",
         new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
           Collection<Pair<String, Long>> buffer;
           String key;
+
           @Override
           public void process(Pair<Pair<String, String>, Long> input,
-        	  Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+              Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
             Pair<String, String> wordDocumentPair = input.first();
-            if(!wordDocumentPair.first().equals(key)) {
+            if (!wordDocumentPair.first().equals(key)) {
               flush(emitter);
               key = wordDocumentPair.first();
               buffer = Lists.newArrayList();
             }
-            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));            
+            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));
           }
+
           protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            if(buffer != null) {
+            if (buffer != null) {
               emitter.emit(Pair.of(key, buffer));
               buffer = null;
             }
           }
+
           @Override
           public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
             flush(emitter);
           }
-      }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
+        }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
 
     PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
 
     /*
-     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>>
-     * Pair<word, Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
+     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>> Pair<word,
+     * Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
      * 
-     * Output: Pair<String, Collection<Pair<String, Double>>>
-     * Pair<word, Collection<Pair<title, tfidf>>>
+     * Output: Pair<String, Collection<Pair<String, Double>>> Pair<word,
+     * Collection<Pair<title, tfidf>>>
      */
-    return joinedResults.parallelDo("calculate tfidf",
-        new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
-          @Override
-          public Pair<String, Collection<Pair<String, Double>>> map(Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
-            Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
-            String word = input.first();
-            double n = input.second().first();
-            double idf = Math.log(N / n);
-            for(Pair<String, Long> tf : input.second().second()) {
-              double tfidf = tf.second() * idf;
-              tfidfs.add(Pair.of(tf.first(), tfidf));
-            }
-            return Pair.of(word, tfidfs);
-          }
-      
-    }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
+    return joinedResults
+        .parallelDo(
+            "calculate tfidf",
+            new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
+              @Override
+              public Pair<String, Collection<Pair<String, Double>>> map(
+                  Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
+                Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
+                String word = input.first();
+                double n = input.second().first();
+                double idf = Math.log(N / n);
+                for (Pair<String, Long> tf : input.second().second()) {
+                  double tfidf = tf.second() * idf;
+                  tfidfs.add(Pair.of(tf.first(), tfidf));
+                }
+                return Pair.of(word, tfidfs);
+              }
+
+            }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
   }
-  
+
   public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
     File input = File.createTempFile("docs", "txt");
     input.deleteOnExit();
     Files.copy(newInputStreamSupplier(getResource("docs.txt")), input);
-    
+
     String outputPath1 = getOutput();
     String outputPath2 = getOutput();
-    
+
     Path tfPath = new Path(getOutput("termfreq"));
-    
+
     PCollection<String> docs = pipeline.readTextFile(input.getAbsolutePath());
-        
-    PTable<String, Collection<Pair<String, Double>>> results =
-        generateTFIDF(docs, tfPath, typeFamily);
+
+    PTable<String, Collection<Pair<String, Double>>> results = generateTFIDF(docs, tfPath, typeFamily);
     pipeline.writeTextFile(results, outputPath1);
     if (!singleRun) {
       pipeline.run();
     }
-    
+
     PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
         new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
           @Override
           public String map(String k1) {
             return k1.toUpperCase();
-          } 
+          }
         }, results.getPTableType());
     pipeline.writeTextFile(uppercased, outputPath2);
     pipeline.done();
-    
+
     // Check the lowercase version...
     File outputFile = new File(outputPath1, "part-r-00000");
     outputFile.deleteOnExit();
@@ -205,7 +208,7 @@ public class TfIdfIT implements Serializable {
       }
     }
     assertTrue(passed);
-    
+
     // ...and the uppercase version
     outputFile = new File(outputPath2, "part-r-00000");
     outputFile.deleteOnExit();
@@ -219,11 +222,11 @@ public class TfIdfIT implements Serializable {
     }
     assertTrue(passed);
   }
-  
+
   public static String getOutput() throws IOException {
     return getOutput("output");
   }
-  
+
   public static String getOutput(String prefix) throws IOException {
     File output = File.createTempFile(prefix, "");
     String path = output.getAbsolutePath();

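The "calculate tfidf" MapFn above applies the standard formula: idf(w) = ln(N / n(w)), with N the total number of documents (hard-coded to 2 in this test) and n(w) the number of documents containing w, then tfidf(w, doc) = tf(w, doc) * idf(w). The arithmetic in isolation, with one worked value; an illustrative sketch, not part of the committed code:

    public class TfIdfSketch {
      static final double N = 2; // total document count, as hard-coded in the test

      // tfidf = raw term frequency times inverse document frequency (natural log).
      static double tfidf(long termFreq, long docsContainingWord) {
        double idf = Math.log(N / (double) docsContainingWord);
        return termFreq * idf;
      }

      public static void main(String[] args) {
        // A word occurring 3 times in a document and present in 1 of the 2 docs:
        System.out.println(tfidf(3, 1)); // 3 * ln(2) = 2.0794...
      }
    }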
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java b/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
index 9e65244..39919ba 100644
--- a/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
+++ b/crunch/src/it/java/org/apache/crunch/TupleNClassCastBugIT.java
@@ -26,12 +26,12 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
+
 import com.google.common.io.Files;
 
 public class TupleNClassCastBugIT {
@@ -80,8 +80,10 @@ public class TupleNClassCastBugIT {
     pipeline.writeTextFile(mapGroupDo(docLines, typeFamily), outputPath);
     pipeline.done();
 
-    // *** We are not directly testing the output, we are looking for a ClassCastException
-    // *** which is thrown in a different thread during the reduce phase. If all is well
+    // *** We are not directly testing the output, we are looking for a
+    // ClassCastException
+    // *** which is thrown in a different thread during the reduce phase. If all
+    // is well
     // *** the file will exist and have six lines. Otherwise the bug is present.
     File outputFile = new File(output, "part-r-00000");
     List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java b/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java
index 1fa922c..9ff171d 100644
--- a/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java
+++ b/crunch/src/it/java/org/apache/crunch/WordCountHBaseIT.java
@@ -30,7 +30,13 @@ import java.util.jar.JarOutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.hbase.HBaseTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,17 +50,10 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.hbase.HBaseSourceTarget;
-import org.apache.crunch.io.hbase.HBaseTarget;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.crunch.util.DistCache;
 import com.google.common.io.ByteStreams;
 
 public class WordCountHBaseIT {
@@ -64,7 +63,7 @@ public class WordCountHBaseIT {
   private static final byte[] WORD_COLFAM = Bytes.toBytes("cf");
 
   private HBaseTestingUtility hbaseTestUtil = new HBaseTestingUtility();
-  
+
   @SuppressWarnings("serial")
   public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) {
     PTable<String, Long> counts = Aggregate.count(words.parallelDo(
@@ -78,17 +77,15 @@ public class WordCountHBaseIT {
           }
         }, words.getTypeFamily().strings()));
 
-    return counts.parallelDo("convert to put",
-        new DoFn<Pair<String, Long>, Put>() {
-          @Override
-          public void process(Pair<String, Long> input, Emitter<Put> emitter) {
-            Put put = new Put(Bytes.toBytes(input.first()));
-            put.add(COUNTS_COLFAM, null,
-                Bytes.toBytes(input.second()));
-            emitter.emit(put);
-          }
+    return counts.parallelDo("convert to put", new DoFn<Pair<String, Long>, Put>() {
+      @Override
+      public void process(Pair<String, Long> input, Emitter<Put> emitter) {
+        Put put = new Put(Bytes.toBytes(input.first()));
+        put.add(COUNTS_COLFAM, null, Bytes.toBytes(input.second()));
+        emitter.emit(put);
+      }
 
-        }, Writables.writables(Put.class));
+    }, Writables.writables(Put.class));
   }
 
   @SuppressWarnings("deprecation")
@@ -107,7 +104,7 @@ public class WordCountHBaseIT {
     hbaseTestUtil.startMiniZKCluster();
     hbaseTestUtil.startMiniCluster();
     hbaseTestUtil.startMiniMapReduceCluster(1);
-    
+
     // For Hadoop-2.0.0, we have to do a bit more work.
     if (TaskAttemptContext.class.isInterface()) {
       conf = hbaseTestUtil.getConfiguration();
@@ -119,7 +116,7 @@ public class WordCountHBaseIT {
         fs.copyFromLocalFile(jarFile.getPath(), target);
         DistributedCache.addFileToClassPath(target, conf, fs);
       }
-    
+
       // Create a programmatic container for this jar.
       JarOutputStream jos = new JarOutputStream(new FileOutputStream("WordCountHBaseTest.jar"));
       File baseDir = new File("target/test-classes");
@@ -134,7 +131,7 @@ public class WordCountHBaseIT {
       DistributedCache.addFileToClassPath(target, conf, fs);
     }
   }
-  
+
   private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException {
     File file = new File(baseDir, classDir);
     JarEntry e = new JarEntry(classDir);
@@ -143,7 +140,7 @@ public class WordCountHBaseIT {
     ByteStreams.copy(new FileInputStream(file), jos);
     jos.closeEntry();
   }
-  
+
   @Test
   public void testWordCount() throws IOException {
     run(new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration()));
@@ -155,21 +152,19 @@ public class WordCountHBaseIT {
     hbaseTestUtil.shutdownMiniCluster();
     hbaseTestUtil.shutdownMiniZKCluster();
   }
-  
+
   public void run(Pipeline pipeline) throws IOException {
-    
+
     Random rand = new Random();
     int postFix = Math.abs(rand.nextInt());
     String inputTableName = "crunch_words_" + postFix;
     String outputTableName = "crunch_counts_" + postFix;
 
     try {
-      
-      HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName),
-          WORD_COLFAM);
-      HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName),
-          COUNTS_COLFAM);
-  
+
+      HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
+      HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM);
+
       int key = 0;
       key = put(inputTable, key, "cat");
       key = put(inputTable, key, "cat");
@@ -180,26 +175,26 @@ public class WordCountHBaseIT {
       PTable<ImmutableBytesWritable, Result> shakespeare = pipeline.read(source);
       pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName));
       pipeline.done();
-      
+
       assertIsLong(outputTable, "cat", 2);
-      assertIsLong(outputTable, "dog", 1);    
+      assertIsLong(outputTable, "dog", 1);
     } finally {
       // not quite sure...
     }
   }
-  
+
   protected int put(HTable table, int key, String value) throws IOException {
     Put put = new Put(Bytes.toBytes(key));
-    put.add(WORD_COLFAM, null, Bytes.toBytes(value));    
+    put.add(WORD_COLFAM, null, Bytes.toBytes(value));
     table.put(put);
     return key + 1;
   }
-  
+
   protected void assertIsLong(HTable table, String key, long i) throws IOException {
     Get get = new Get(Bytes.toBytes(key));
     get.addColumn(COUNTS_COLFAM, null);
     Result result = table.get(get);
-    
+
     byte[] rawCount = result.getValue(COUNTS_COLFAM, null);
     assertTrue(rawCount != null);
     assertEquals(new Long(i), new Long(Bytes.toLong(rawCount)));


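In WordCountHBaseIT above, each (word, count) pair becomes an HBase Put keyed by the word, with the count stored via Bytes.toBytes(long) as an 8-byte big-endian value, and assertIsLong reads it back with a Get plus Bytes.toLong. The write/read pairing in isolation, using the same client calls as the test; the column family name below is illustrative, since the COUNTS_COLFAM value is declared outside this hunk:

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBaseCountSketch {
      static final byte[] COLFAM = Bytes.toBytes("counts"); // illustrative family name

      // Encode a (word, count) pair as a Put keyed by the word.
      static Put toPut(String word, long count) {
        Put put = new Put(Bytes.toBytes(word));
        put.add(COLFAM, null, Bytes.toBytes(count)); // count as an 8-byte long
        return put;
      }

      // The matching read side, as used by assertIsLong.
      static Get toGet(String word) {
        Get get = new Get(Bytes.toBytes(word));
        get.addColumn(COLFAM, null);
        return get;
      }
    }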