incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [2/10] Format all sources according to formatting profile
Date Sat, 14 Jul 2012 18:14:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/WordCountIT.java b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
index fce5d65..ba457ee 100644
--- a/crunch/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
@@ -25,8 +25,6 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.To;
@@ -35,6 +33,8 @@ import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
@@ -119,11 +119,10 @@ public class WordCountIT {
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));
     PTable<String, Long> wordCount = wordCount(shakespeare, tf);
-    List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true)
-        .materialize());
+    List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true).materialize());
     assertEquals(
-        ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L),
-            Pair.of("of", 396L), Pair.of("to", 367L)), top5);
+        ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L), Pair.of("of", 396L),
+            Pair.of("to", 367L)), top5);
   }
 
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
@@ -143,8 +142,7 @@ public class WordCountIT {
       File substrCount = File.createTempFile("substr", "");
       String substrPath = substrCount.getAbsolutePath();
       substrCount.delete();
-      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(
-          CombineFn.<String> SUM_LONGS());
+      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(CombineFn.<String> SUM_LONGS());
       pipeline.writeTextFile(we, substrPath);
     }
     PipelineResult res = pipeline.done();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
index b725558..f4a0a90 100644
--- a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
+++ b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
@@ -25,12 +25,12 @@ import java.util.List;
 
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
+import org.junit.Test;
+
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
 
-import org.junit.Test;
-
 public class MemPipelineFileWritingIT {
   @Test
   public void testMemPipelineFileWriter() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
index e2fa9a2..1bb3305 100644
--- a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -29,13 +29,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTableKeyValueIT;
 import org.apache.crunch.Pipeline;
@@ -48,114 +41,116 @@ import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
 import com.google.common.collect.Lists;
 
 @RunWith(value = Parameterized.class)
 public class UnionCollectionIT {
 
-	private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
+  private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
 
-	private PTypeFamily typeFamily;
-	private Pipeline pipeline;
-	private PCollection<String> union;
+  private PTypeFamily typeFamily;
+  private Pipeline pipeline;
+  private PCollection<String> union;
 
-	private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
+  private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
 
-	@Before
-	@SuppressWarnings("unchecked")
-	public void setUp() throws IOException {
-		String inputFile1 = FileHelper.createTempCopyOf("set1.txt");
-		String inputFile2 = FileHelper.createTempCopyOf("set2.txt");
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws IOException {
+    String inputFile1 = FileHelper.createTempCopyOf("set1.txt");
+    String inputFile2 = FileHelper.createTempCopyOf("set2.txt");
 
-		PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1,
-				typeFamily.strings()));
-		PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2,
-				typeFamily.strings()));
+    PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1, typeFamily.strings()));
+    PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2, typeFamily.strings()));
 
-		LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : "
-				+ typeFamily.getClass().getSimpleName() + "]  First: "
-				+ Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
-				+ Lists.newArrayList(secondCollection.materialize().iterator()));
+    LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : " + typeFamily.getClass().getSimpleName()
+        + "]  First: " + Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
+        + Lists.newArrayList(secondCollection.materialize().iterator()));
 
-		union = secondCollection.union(firstCollection);
-	}
+    union = secondCollection.union(firstCollection);
+  }
 
-	@After
-	public void tearDown() {
-		pipeline.done();
-	}
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
 
-	@Parameters
-	public static Collection<Object[]> data() throws IOException {
-		Object[][] data = new Object[][] {
-				{ WritableTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
-				{ WritableTypeFamily.getInstance(), MemPipeline.getInstance() },
-				{ AvroTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
-				{ AvroTypeFamily.getInstance(), MemPipeline.getInstance() } };
-		return Arrays.asList(data);
-	}
+  @Parameters
+  public static Collection<Object[]> data() throws IOException {
+    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
+        { WritableTypeFamily.getInstance(), MemPipeline.getInstance() },
+        { AvroTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
+        { AvroTypeFamily.getInstance(), MemPipeline.getInstance() } };
+    return Arrays.asList(data);
+  }
 
-	public UnionCollectionIT(PTypeFamily typeFamily, Pipeline pipeline) {
-		this.typeFamily = typeFamily;
-		this.pipeline = pipeline;
-	}
+  public UnionCollectionIT(PTypeFamily typeFamily, Pipeline pipeline) {
+    this.typeFamily = typeFamily;
+    this.pipeline = pipeline;
+  }
 
-	@Test
-	public void unionMaterializeShouldNotThrowNPE() {
-		checkMaterialized(union.materialize());
-		checkMaterialized(pipeline.materialize(union));
-	}
+  @Test
+  public void unionMaterializeShouldNotThrowNPE() {
+    checkMaterialized(union.materialize());
+    checkMaterialized(pipeline.materialize(union));
+  }
 
-	private void checkMaterialized(Iterable<String> materialized) {
+  private void checkMaterialized(Iterable<String> materialized) {
 
-		List<String> materializedValues = Lists.newArrayList(materialized.iterator());
-		Collections.sort(materializedValues);
-		LOG.info("Materialized union: " + materializedValues);
+    List<String> materializedValues = Lists.newArrayList(materialized.iterator());
+    Collections.sort(materializedValues);
+    LOG.info("Materialized union: " + materializedValues);
 
-		assertEquals(EXPECTED, materializedValues);
-	}
+    assertEquals(EXPECTED, materializedValues);
+  }
 
-	@Test
-	public void unionWriteShouldNotThrowNPE() throws IOException {
+  @Test
+  public void unionWriteShouldNotThrowNPE() throws IOException {
 
-		File outputPath1 = FileHelper.createOutputPath();
-		File outputPath2 = FileHelper.createOutputPath();
-		File outputPath3 = FileHelper.createOutputPath();
+    File outputPath1 = FileHelper.createOutputPath();
+    File outputPath2 = FileHelper.createOutputPath();
+    File outputPath3 = FileHelper.createOutputPath();
 
-		if (typeFamily == AvroTypeFamily.getInstance()) {
-			union.write(To.avroFile(outputPath1.getAbsolutePath()));
-			pipeline.write(union, To.avroFile(outputPath2.getAbsolutePath()));
+    if (typeFamily == AvroTypeFamily.getInstance()) {
+      union.write(To.avroFile(outputPath1.getAbsolutePath()));
+      pipeline.write(union, To.avroFile(outputPath2.getAbsolutePath()));
 
-			pipeline.run();
+      pipeline.run();
 
-			checkFileContents(outputPath1.getAbsolutePath());
-			checkFileContents(outputPath2.getAbsolutePath());
+      checkFileContents(outputPath1.getAbsolutePath());
+      checkFileContents(outputPath2.getAbsolutePath());
 
-		} else {
+    } else {
 
-			union.write(To.textFile(outputPath1.getAbsolutePath()));
-			pipeline.write(union, To.textFile(outputPath2.getAbsolutePath()));
-			pipeline.writeTextFile(union, outputPath3.getAbsolutePath());
+      union.write(To.textFile(outputPath1.getAbsolutePath()));
+      pipeline.write(union, To.textFile(outputPath2.getAbsolutePath()));
+      pipeline.writeTextFile(union, outputPath3.getAbsolutePath());
 
-			pipeline.run();
+      pipeline.run();
 
-			checkFileContents(outputPath1.getAbsolutePath());
-			checkFileContents(outputPath2.getAbsolutePath());
-			checkFileContents(outputPath3.getAbsolutePath());
-		}
+      checkFileContents(outputPath1.getAbsolutePath());
+      checkFileContents(outputPath2.getAbsolutePath());
+      checkFileContents(outputPath3.getAbsolutePath());
+    }
 
-	}
+  }
 
-	private void checkFileContents(String filePath) throws IOException {
+  private void checkFileContents(String filePath) throws IOException {
 
-		List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists
-				.newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize()
-						.iterator()) : Lists.newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings()))
-				.materialize().iterator());
+    List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists
+        .newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator()) : Lists
+        .newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator());
 
-		Collections.sort(fileContentValues);
+    Collections.sort(fileContentValues);
 
-		LOG.info("Saved Union: " + fileContentValues);
-		assertEquals(EXPECTED, fileContentValues);
-	}
+    LOG.info("Saved Union: " + fileContentValues);
+    assertEquals(EXPECTED, fileContentValues);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
index df05aa5..0f1a45e 100644
--- a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
@@ -24,58 +24,59 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.crunch.io.text.TextFileReaderFactory;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 
-import org.apache.crunch.io.text.TextFileReaderFactory;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.writable.Writables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
-
 public class CompositePathIterableIT {
 
-  
   @Test
-  public void testCreate_FilePresent() throws IOException{
+  public void testCreate_FilePresent() throws IOException {
     String inputFilePath = FileHelper.createTempCopyOf("set1.txt");
     Configuration conf = new Configuration();
     LocalFileSystem local = FileSystem.getLocal(conf);
-    
-    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory<String>(Writables.strings(), conf));
-    
+
+    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath),
+        new TextFileReaderFactory<String>(Writables.strings(), conf));
+
     assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
-    
+
   }
-  
+
   @Test
-  public void testCreate_DirectoryPresentButNoFiles() throws IOException{
+  public void testCreate_DirectoryPresentButNoFiles() throws IOException {
     String inputFilePath = Files.createTempDir().getAbsolutePath();
- 
+
     Configuration conf = new Configuration();
     LocalFileSystem local = FileSystem.getLocal(conf);
-    
-    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory<String>(Writables.strings(), conf));
-    
+
+    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath),
+        new TextFileReaderFactory<String>(Writables.strings(), conf));
+
     assertTrue(Lists.newArrayList(iterable).isEmpty());
   }
-  
-  @Test(expected=IOException.class)
-  public void testCreate_DirectoryNotPresent() throws IOException{
+
+  @Test(expected = IOException.class)
+  public void testCreate_DirectoryNotPresent() throws IOException {
     File inputFileDir = Files.createTempDir();
     inputFileDir.delete();
-    
+
     // Sanity check
     assertFalse(inputFileDir.exists());
-    
+
     Configuration conf = new Configuration();
     LocalFileSystem local = FileSystem.getLocal(conf);
-    
-    CompositePathIterable.create(local, new Path(inputFileDir.getAbsolutePath()), new TextFileReaderFactory<String>(Writables.strings(), conf));
+
+    CompositePathIterable.create(local, new Path(inputFileDir.getAbsolutePath()), new TextFileReaderFactory<String>(
+        Writables.strings(), conf));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
index e25aec4..603086c 100644
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -32,10 +32,6 @@ import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
@@ -43,111 +39,103 @@ import org.apache.crunch.io.At;
 import org.apache.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.avro.Avros;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 @SuppressWarnings("serial")
 public class AvroFileSourceTargetIT implements Serializable {
 
-	private transient File avroFile;
-
-	@Before
-	public void setUp() throws IOException {
-		avroFile = File.createTempFile("test", ".avro");
-	}
-
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
-
-	private void populateGenericFile(List<GenericRecord> genericRecords,
-			Schema schema) throws IOException {
-		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
-		GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
-				schema);
-
-		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
-				genericDatumWriter);
-		dataFileWriter.create(schema, outputStream);
-
-		for (GenericRecord record : genericRecords) {
-			dataFileWriter.append(record);
-		}
-
-		dataFileWriter.close();
-		outputStream.close();
-
-	}
-
-	@Test
-	public void testSpecific() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
-		PCollection<Person> genericCollection = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), Avros.records(Person.class)));
-
-		List<Person> personList = Lists.newArrayList(genericCollection
-				.materialize());
-
-		Person expectedPerson = new Person();
-		expectedPerson.setName("John Doe");
-		expectedPerson.setAge(42);
-
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		siblingNames.add("Jimmy");
-		siblingNames.add("Jane");
-		expectedPerson.setSiblingnames(siblingNames);
-
-		assertEquals(Lists.newArrayList(expectedPerson),
-				Lists.newArrayList(personList));
-	}
-
-	@Test
-	public void testGeneric() throws IOException {
-		String genericSchemaJson = Person.SCHEMA$.toString().replace("Person",
-				"GenericPerson");
-		Schema genericPersonSchema = new Schema.Parser()
-				.parse(genericSchemaJson);
-		GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord),
-				genericPersonSchema);
-
-		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
-		PCollection<Record> genericCollection = pipeline
-				.read(At.avroFile(avroFile.getAbsolutePath(),
-						Avros.generics(genericPersonSchema)));
-
-		List<Record> recordList = Lists.newArrayList(genericCollection
-				.materialize());
-
-		assertEquals(Lists.newArrayList(savedRecord),
-				Lists.newArrayList(recordList));
-	}
-
-	@Test
-	public void testReflect() throws IOException {
-		Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class);
-		GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
-		savedRecord.put("name", "John Doe");
-		populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
-
-		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
-		PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), Avros.reflects(PojoPerson.class)));
-
-		List<PojoPerson> recordList = Lists.newArrayList(personCollection
-				.materialize());
-
-		assertEquals(1, recordList.size());
-		PojoPerson person = recordList.get(0);
-		assertEquals("John Doe", person.getName());
-	}
+  private transient File avroFile;
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("test", ".avro");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+
+  }
+
+  @Test
+  public void testSpecific() throws IOException {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+
+    List<Person> personList = Lists.newArrayList(genericCollection.materialize());
+
+    Person expectedPerson = new Person();
+    expectedPerson.setName("John Doe");
+    expectedPerson.setAge(42);
+
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.setSiblingnames(siblingNames);
+
+    assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+  }
+
+  @Test
+  public void testGeneric() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+    PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.generics(genericPersonSchema)));
+
+    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
+  }
+
+  @Test
+  public void testReflect() throws IOException {
+    Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class);
+    GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
+    savedRecord.put("name", "John Doe");
+    populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+    PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.reflects(PojoPerson.class)));
+
+    List<PojoPerson> recordList = Lists.newArrayList(personCollection.materialize());
+
+    assertEquals(1, recordList.size());
+    PojoPerson person = recordList.get(0);
+    assertEquals("John Doe", person.getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
index f5c9902..2c6cbd7 100644
--- a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
@@ -23,93 +23,90 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 public class AvroReflectIT implements Serializable {
 
-	static class StringWrapper {
-		private String value;
-
-		public StringWrapper() {
-			this(null);
-		}
-
-		public StringWrapper(String value) {
-			this.value = value;
-		}
-
-		public String getValue() {
-			return value;
-		}
-
-		public void setValue(String value) {
-			this.value = value;
-		}
-
-		@Override
-		public String toString() {
-			return String.format("<StringWrapper(%s)>", value);
-		}
-
-		@Override
-		public int hashCode() {
-			final int prime = 31;
-			int result = 1;
-			result = prime * result + ((value == null) ? 0 : value.hashCode());
-			return result;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (this == obj)
-				return true;
-			if (obj == null)
-				return false;
-			if (getClass() != obj.getClass())
-				return false;
-			StringWrapper other = (StringWrapper) obj;
-			if (value == null) {
-				if (other.value != null)
-					return false;
-			} else if (!value.equals(other.value))
-				return false;
-			return true;
-		}
-
-	}
-
-	@Test
-	public void testReflection() throws IOException {
-		Pipeline pipeline = new MRPipeline(AvroReflectIT.class);
-		PCollection<StringWrapper> stringWrapperCollection = pipeline
-				.readTextFile(FileHelper.createTempCopyOf("set1.txt"))
-				.parallelDo(new MapFn<String, StringWrapper>() {
-
-					@Override
-					public StringWrapper map(String input) {
-						StringWrapper stringWrapper = new StringWrapper();
-						stringWrapper.setValue(input);
-						return stringWrapper;
-					}
-				}, Avros.reflects(StringWrapper.class));
-
-		List<StringWrapper> stringWrappers = Lists
-				.newArrayList(stringWrapperCollection.materialize());
-
-		pipeline.done();
-
-		assertEquals(Lists.newArrayList(new StringWrapper("b"),
-				new StringWrapper("c"), new StringWrapper("a"),
-				new StringWrapper("e")), stringWrappers);
-
-	}
+  static class StringWrapper {
+    private String value;
+
+    public StringWrapper() {
+      this(null);
+    }
+
+    public StringWrapper(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public void setValue(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("<StringWrapper(%s)>", value);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((value == null) ? 0 : value.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      StringWrapper other = (StringWrapper) obj;
+      if (value == null) {
+        if (other.value != null)
+          return false;
+      } else if (!value.equals(other.value))
+        return false;
+      return true;
+    }
+
+  }
+
+  @Test
+  public void testReflection() throws IOException {
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class);
+    PCollection<StringWrapper> stringWrapperCollection = pipeline.readTextFile(FileHelper.createTempCopyOf("set1.txt"))
+        .parallelDo(new MapFn<String, StringWrapper>() {
+
+          @Override
+          public StringWrapper map(String input) {
+            StringWrapper stringWrapper = new StringWrapper();
+            stringWrapper.setValue(input);
+            return stringWrapper;
+          }
+        }, Avros.reflects(StringWrapper.class));
+
+    List<StringWrapper> stringWrappers = Lists.newArrayList(stringWrapperCollection.materialize());
+
+    pipeline.done();
+
+    assertEquals(Lists.newArrayList(new StringWrapper("b"), new StringWrapper("c"), new StringWrapper("a"),
+        new StringWrapper("e")), stringWrappers);
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
index 33f1f0c..dfb296e 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -26,9 +26,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
@@ -36,7 +33,6 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.Employee;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTableType;
@@ -45,13 +41,17 @@ import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 public class AggregateIT {
 
-  @Test public void testWritables() throws Exception {
+  @Test
+  public void testWritables() throws Exception {
     Pipeline pipeline = new MRPipeline(AggregateIT.class);
     String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
     PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
@@ -59,7 +59,8 @@ public class AggregateIT {
     pipeline.done();
   }
 
-  @Test public void testAvro() throws Exception {
+  @Test
+  public void testAvro() throws Exception {
     Pipeline pipeline = new MRPipeline(AggregateIT.class);
     String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
     PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
@@ -67,12 +68,12 @@ public class AggregateIT {
     pipeline.done();
   }
 
-  @Test public void testInMemoryAvro() throws Exception {
-    PCollection<String> someText = MemPipeline.collectionOf(
-        "first line", "second line", "third line");
+  @Test
+  public void testInMemoryAvro() throws Exception {
+    PCollection<String> someText = MemPipeline.collectionOf("first line", "second line", "third line");
     runMinMax(someText, AvroTypeFamily.getInstance());
   }
-  
+
   public static void runMinMax(PCollection<String> shakes, PTypeFamily family) throws Exception {
     PCollection<Integer> lengths = shakes.parallelDo(new MapFn<String, Integer>() {
       @Override
@@ -92,21 +93,21 @@ public class AggregateIT {
     assertTrue(minLengths != null);
     assertEquals(maxLengths.intValue(), -minLengths.intValue());
   }
-  
+
   private static class SplitFn extends MapFn<String, Pair<String, String>> {
     @Override
     public Pair<String, String> map(String input) {
       String[] p = input.split("\\s+");
       return Pair.of(p[0], p[1]);
-    }  
+    }
   }
-  
-  @Test public void testCollectUrls() throws Exception {
+
+  @Test
+  public void testCollectUrls() throws Exception {
     Pipeline p = new MRPipeline(AggregateIT.class);
     String urlsInputPath = FileHelper.createTempCopyOf("urls.txt");
-    PTable<String, Collection<String>> urls = Aggregate.collectValues(
-        p.readTextFile(urlsInputPath)
-        .parallelDo(new SplitFn(), tableOf(strings(), strings())));
+    PTable<String, Collection<String>> urls = Aggregate.collectValues(p.readTextFile(urlsInputPath).parallelDo(
+        new SplitFn(), tableOf(strings(), strings())));
     for (Pair<String, Collection<String>> e : urls.materialize()) {
       String key = e.first();
       int expectedSize = 0;
@@ -121,14 +122,15 @@ public class AggregateIT {
       p.done();
     }
   }
-  
-  @Test public void testTopN() throws Exception {
+
+  @Test
+  public void testTopN() throws Exception {
     PTableType<String, Integer> ptype = Avros.tableOf(Avros.strings(), Avros.ints());
     PTable<String, Integer> counts = MemPipeline.typedTableOf(ptype, "foo", 12, "bar", 17, "baz", 29);
-    
+
     PTable<String, Integer> top2 = Aggregate.top(counts, 2, true);
     assertEquals(ImmutableList.of(Pair.of("baz", 29), Pair.of("bar", 17)), top2.materialize());
-    
+
     PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
     assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
   }
@@ -136,16 +138,13 @@ public class AggregateIT {
   @Test
   public void testCollectValues_Writables() throws IOException {
     Pipeline pipeline = new MRPipeline(AggregateIT.class);
-    Map<Integer, Collection<Text>> collectionMap = pipeline
-        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
-        .parallelDo(new MapStringToTextPair(),
-            Writables.tableOf(Writables.ints(), Writables.writables(Text.class))
-        ).collectValues().materializeToMap();
+    Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(new MapStringToTextPair(), Writables.tableOf(Writables.ints(), Writables.writables(Text.class)))
+        .collectValues().materializeToMap();
 
     assertEquals(1, collectionMap.size());
 
-    assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")),
-        collectionMap.get(1));
+    assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")), collectionMap.get(1));
   }
 
   @Test
@@ -153,10 +152,8 @@ public class AggregateIT {
 
     MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
     Pipeline pipeline = new MRPipeline(AggregateIT.class);
-    Map<Integer, Collection<Employee>> collectionMap = pipeline
-        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
-        .parallelDo(mapFn,
-            Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
+    Map<Integer, Collection<Employee>> collectionMap = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(mapFn, Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
         .materializeToMap();
 
     assertEquals(1, collectionMap.size());
@@ -165,8 +162,7 @@ public class AggregateIT {
     Employee empD = mapFn.map("d").second();
     Employee empA = mapFn.map("a").second();
 
-    assertEquals(Lists.newArrayList(empC, empD, empA),
-        collectionMap.get(1));
+    assertEquals(Lists.newArrayList(empC, empD, empA), collectionMap.get(1));
   }
 
   private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
@@ -211,7 +207,6 @@ public class AggregateIT {
       return String.format("PojoText<%s>", this.value);
     }
 
-
     @Override
     public boolean equals(Object obj) {
       if (this == obj)

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
index 0aece4b..9f1e35c 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
@@ -17,10 +17,10 @@
  */
 package org.apache.crunch.lib;
 
+import static junit.framework.Assert.assertEquals;
 import static org.apache.crunch.types.avro.Avros.ints;
 import static org.apache.crunch.types.avro.Avros.records;
 import static org.apache.crunch.types.avro.Avros.strings;
-import static junit.framework.Assert.assertEquals;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -30,15 +30,15 @@ import java.util.List;
 
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.test.Person;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -46,103 +46,95 @@ import com.google.common.collect.Lists;
  */
 public class AvroTypeSortIT implements Serializable {
 
-	private static final long serialVersionUID = 1344118240353796561L;
+  private static final long serialVersionUID = 1344118240353796561L;
 
-	private transient File avroFile;
+  private transient File avroFile;
 
-	@Before
-	public void setUp() throws IOException {
-		avroFile = File.createTempFile("avrotest", ".avro");
-	}
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("avrotest", ".avro");
+  }
 
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
 
-	@Test
-	public void testSortAvroTypesBySelectedFields() throws Exception {
+  @Test
+  public void testSortAvroTypesBySelectedFields() throws Exception {
 
-		MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class);
+    MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class);
 
-		Person ccc10 = createPerson("CCC", 10);
-		Person bbb20 = createPerson("BBB", 20);
-		Person aaa30 = createPerson("AAA", 30);
+    Person ccc10 = createPerson("CCC", 10);
+    Person bbb20 = createPerson("BBB", 20);
+    Person aaa30 = createPerson("AAA", 30);
 
-		writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile);
+    writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile);
 
-		PCollection<Person> unsorted = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), records(Person.class)));
+    PCollection<Person> unsorted = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), records(Person.class)));
 
-		// Sort by Name
-		MapFn<Person, String> nameExtractor = new MapFn<Person, String>() {
+    // Sort by Name
+    MapFn<Person, String> nameExtractor = new MapFn<Person, String>() {
 
-			@Override
-			public String map(Person input) {
-				return input.getName().toString();
-			}
-		};
+      @Override
+      public String map(Person input) {
+        return input.getName().toString();
+      }
+    };
 
-		PCollection<Person> sortedByName = unsorted
-				.by(nameExtractor, strings()).groupByKey().ungroup().values();
+    PCollection<Person> sortedByName = unsorted.by(nameExtractor, strings()).groupByKey().ungroup().values();
 
-		List<Person> sortedByNameList = Lists.newArrayList(sortedByName
-				.materialize());
+    List<Person> sortedByNameList = Lists.newArrayList(sortedByName.materialize());
 
-		assertEquals(3, sortedByNameList.size());
-		assertEquals(aaa30, sortedByNameList.get(0));
-		assertEquals(bbb20, sortedByNameList.get(1));
-		assertEquals(ccc10, sortedByNameList.get(2));
+    assertEquals(3, sortedByNameList.size());
+    assertEquals(aaa30, sortedByNameList.get(0));
+    assertEquals(bbb20, sortedByNameList.get(1));
+    assertEquals(ccc10, sortedByNameList.get(2));
 
-		// Sort by Age
+    // Sort by Age
 
-		MapFn<Person, Integer> ageExtractor = new MapFn<Person, Integer>() {
+    MapFn<Person, Integer> ageExtractor = new MapFn<Person, Integer>() {
 
-			@Override
-			public Integer map(Person input) {
-				return input.getAge();
-			}
-		};
+      @Override
+      public Integer map(Person input) {
+        return input.getAge();
+      }
+    };
 
-		PCollection<Person> sortedByAge = unsorted.by(ageExtractor, ints())
-				.groupByKey().ungroup().values();
+    PCollection<Person> sortedByAge = unsorted.by(ageExtractor, ints()).groupByKey().ungroup().values();
 
-		List<Person> sortedByAgeList = Lists.newArrayList(sortedByAge
-				.materialize());
+    List<Person> sortedByAgeList = Lists.newArrayList(sortedByAge.materialize());
 
-		assertEquals(3, sortedByAgeList.size());
-		assertEquals(ccc10, sortedByAgeList.get(0));
-		assertEquals(bbb20, sortedByAgeList.get(1));
-		assertEquals(aaa30, sortedByAgeList.get(2));
+    assertEquals(3, sortedByAgeList.size());
+    assertEquals(ccc10, sortedByAgeList.get(0));
+    assertEquals(bbb20, sortedByAgeList.get(1));
+    assertEquals(aaa30, sortedByAgeList.get(2));
 
-		pipeline.done();
-	}
+    pipeline.done();
+  }
 
-	private void writeAvroFile(List<Person> people, File avroFile)
-			throws IOException {
+  private void writeAvroFile(List<Person> people, File avroFile) throws IOException {
 
-		FileOutputStream outputStream = new FileOutputStream(avroFile);
-		SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
-				Person.class);
+    FileOutputStream outputStream = new FileOutputStream(avroFile);
+    SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(Person.class);
 
-		DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
-				writer);
-		dataFileWriter.create(Person.SCHEMA$, outputStream);
-		for (Person person : people) {
-			dataFileWriter.append(person);
-		}
-		dataFileWriter.close();
-		outputStream.close();
-	}
+    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(writer);
+    dataFileWriter.create(Person.SCHEMA$, outputStream);
+    for (Person person : people) {
+      dataFileWriter.append(person);
+    }
+    dataFileWriter.close();
+    outputStream.close();
+  }
 
-	private Person createPerson(String name, int age) throws IOException {
+  private Person createPerson(String name, int age) throws IOException {
 
-		Person person = new Person();
-		person.setAge(age);
-		person.setName(name);
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		person.setSiblingnames(siblingNames);
+    Person person = new Person();
+    person.setAge(age);
+    person.setName(name);
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    person.setSiblingnames(siblingNames);
 
-		return person;
-	}
+    return person;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
index cc67257..c64b8ec 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -25,8 +25,6 @@ import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
@@ -43,6 +41,8 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
+
 import com.google.common.base.Splitter;
 import com.google.common.io.Files;
 
@@ -57,14 +57,12 @@ public class CogroupIT {
     }
   }
 
-  public static PTable<String, Long> join(PCollection<String> w1,
-      PCollection<String> w2, PTypeFamily ptf) {
+  public static PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) {
     PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
     PTable<String, Long> ws1 = w1.parallelDo("ws1", new WordSplit(), ntt);
     PTable<String, Long> ws2 = w2.parallelDo("ws2", new WordSplit(), ntt);
     PTable<String, Pair<Collection<Long>, Collection<Long>>> cg = Cogroup.cogroup(ws1, ws2);
-    PTable<String, Long> sums = cg.parallelDo(
-        "wc",
+    PTable<String, Long> sums = cg.parallelDo("wc",
         new MapValuesFn<String, Pair<Collection<Long>, Collection<Long>>, Long>() {
           @Override
           public Long map(Pair<Collection<Long>, Collection<Long>> v) {
@@ -87,29 +85,29 @@ public class CogroupIT {
           return "";
         }
       }
-    }, ntt).groupByKey().combineValues(CombineFn.<String>SUM_LONGS());
+    }, ntt).groupByKey().combineValues(CombineFn.<String> SUM_LONGS());
   }
 
   @Test
   public void testWritableJoin() throws Exception {
     run(new MRPipeline(CogroupIT.class), WritableTypeFamily.getInstance());
   }
-  
+
   @Test
   public void testAvroJoin() throws Exception {
     run(new MRPipeline(CogroupIT.class), AvroTypeFamily.getInstance());
   }
-  
+
   public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
     String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
     String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt");
     File output = FileHelper.createOutputPath();
-    
+
     PCollection<String> shakespeare = pipeline.read(From.textFile(shakesInputPath));
     PCollection<String> maugham = pipeline.read(From.textFile(maughamInputPath));
     pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath());
     pipeline.done();
-    
+
     File outputFile = new File(output, "part-r-00000");
     List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
     boolean passed = false;
@@ -120,7 +118,7 @@ public class CogroupIT {
       }
     }
     assertTrue(passed);
-    
+
     output.deleteOnExit();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
index ab09bef..4b07fa2 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
@@ -25,13 +25,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Tuple3;
@@ -41,30 +34,34 @@ import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
 import com.google.common.collect.Lists;
 
 @RunWith(value = Parameterized.class)
 public class SetIT {
-  
+
   private PTypeFamily typeFamily;
-  
+
   private Pipeline pipeline;
   private PCollection<String> set1;
   private PCollection<String> set2;
-  
+
   public SetIT(PTypeFamily typeFamily) {
     this.typeFamily = typeFamily;
   }
-  
+
   @Parameters
   public static Collection<Object[]> data() {
-    Object[][] data = new Object[][] {
-        { WritableTypeFamily.getInstance() },
-        { AvroTypeFamily.getInstance() }
-    };
+    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance() }, { AvroTypeFamily.getInstance() } };
     return Arrays.asList(data);
   }
-  
+
   @Before
   public void setUp() throws IOException {
     String set1InputPath = FileHelper.createTempCopyOf("set1.txt");
@@ -73,26 +70,24 @@ public class SetIT {
     set1 = pipeline.read(At.textFile(set1InputPath, typeFamily.strings()));
     set2 = pipeline.read(At.textFile(set2InputPath, typeFamily.strings()));
   }
-  
+
   @After
   public void tearDown() {
     pipeline.done();
   }
-  
+
   @Test
   public void testDifference() throws Exception {
     PCollection<String> difference = Set.difference(set1, set2);
-    assertEquals(Lists.newArrayList("b", "e"),
-        Lists.newArrayList(difference.materialize()));
+    assertEquals(Lists.newArrayList("b", "e"), Lists.newArrayList(difference.materialize()));
   }
-  
+
   @Test
   public void testIntersection() throws Exception {
     PCollection<String> intersection = Set.intersection(set1, set2);
-    assertEquals(Lists.newArrayList("a", "c"),
-        Lists.newArrayList(intersection.materialize()));
+    assertEquals(Lists.newArrayList("a", "c"), Lists.newArrayList(intersection.materialize()));
   }
-  
+
   @Test
   public void testComm() throws Exception {
     PCollection<Tuple3<String, String, String>> comm = Set.comm(set1, set2);
@@ -105,8 +100,7 @@ public class SetIT {
     assertFalse(i.hasNext());
   }
 
-  private void checkEquals(String s1, String s2, String s3,
-      Tuple3<String, String, String> tuple) {
+  private void checkEquals(String s1, String s2, String s3, Tuple3<String, String, String> tuple) {
     assertEquals("first string", s1, tuple.first());
     assertEquals("second string", s2, tuple.second());
     assertEquals("third string", s3, tuple.third());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
index 616ec15..a0357c1 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -39,7 +39,6 @@ import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
 import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Sort;
 import org.apache.crunch.lib.Sort.ColumnOrder;
 import org.apache.crunch.lib.Sort.Order;
 import org.apache.crunch.test.FileHelper;
@@ -70,32 +69,32 @@ public class SortIT implements Serializable {
 
   @Test
   public void testWritableSortAscDesc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
-        by(2, DESCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+        "this doc has this text");
   }
 
   @Test
   public void testWritableSortSecondDescFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(2, DESCENDING),
-        by(1, ASCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+        "this doc has this text");
   }
 
   @Test
   public void testWritableSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
-        by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+    runTriple(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), "A", "this", "doc");
   }
 
   @Test
   public void testWritableSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
-        by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+    runQuad(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
   }
 
   @Test
   public void testWritableSortTupleNAscDesc() throws Exception {
-    runTupleN(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), new ColumnOrder[] {
-        by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+    runTupleN(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(),
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
   }
 
   @Test
@@ -105,53 +104,49 @@ public class SortIT implements Serializable {
 
   @Test
   public void testAvroSortAsc() throws Exception {
-    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.ASCENDING,
-        "A\tand this text as well");
+    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.ASCENDING, "A\tand this text as well");
   }
 
   @Test
   public void testAvroSortDesc() throws Exception {
-    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.DESCENDING,
-        "B\tthis doc has some text");
+    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.DESCENDING, "B\tthis doc has some text");
   }
 
   @Test
   public void testAvroSortPairAscAsc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
-        by(2, DESCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+        "this doc has this text");
   }
 
   @Test
   @Ignore("Avro sorting only works in field order at the moment")
   public void testAvroSortPairSecondAscFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(2, DESCENDING),
-        by(1, ASCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+        "this doc has this text");
   }
 
   @Test
   public void testAvroSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
-        by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+    runTriple(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), "A", "this", "doc");
   }
 
   @Test
   public void testAvroSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
-        by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+    runQuad(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
   }
 
   @Test
   public void testAvroSortTupleNAscDesc() throws Exception {
     runTupleN(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(),
-        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A",
-            "this doc has this text" });
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
   }
 
   @Test
   public void testAvroReflectSortPair() throws IOException {
     Pipeline pipeline = new MRPipeline(SortIT.class);
-    PCollection<Pair<String, StringWrapper>> sorted = pipeline
-        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+    PCollection<Pair<String, StringWrapper>> sorted = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt"))
         .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
 
           @Override
@@ -171,8 +166,7 @@ public class SortIT implements Serializable {
   @Test
   public void testAvroReflectSortTable() throws IOException {
     Pipeline pipeline = new MRPipeline(SortIT.class);
-    PTable<String, StringWrapper> unsorted = pipeline.readTextFile(
-        FileHelper.createTempCopyOf("set2.txt")).parallelDo(
+    PTable<String, StringWrapper> unsorted = pipeline.readTextFile(FileHelper.createTempCopyOf("set2.txt")).parallelDo(
         new MapFn<String, Pair<String, StringWrapper>>() {
 
           @Override
@@ -196,8 +190,7 @@ public class SortIT implements Serializable {
     runTable(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), "A");
   }
 
-  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine)
-      throws IOException {
+  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
 
     PCollection<String> input = pipeline.readTextFile(inputPath);
@@ -215,19 +208,18 @@ public class SortIT implements Serializable {
     pipeline.done(); // TODO: finally
   }
 
-  private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
-      ColumnOrder second, String firstField, String secondField) throws IOException {
+  private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
+      String firstField, String secondField) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
 
     PCollection<String> input = pipeline.readTextFile(inputPath);
-    PCollection<Pair<String, String>> kv = input.parallelDo(
-        new DoFn<String, Pair<String, String>>() {
-          @Override
-          public void process(String input, Emitter<Pair<String, String>> emitter) {
-            String[] split = input.split("[\t]+");
-            emitter.emit(Pair.of(split[0], split[1]));
-          }
-        }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
+    PCollection<Pair<String, String>> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(Pair.of(split[0], split[1]));
+      }
+    }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
     PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
     Iterable<Pair<String, String>> lines = sorted.materialize();
     Pair<String, String> l = lines.iterator().next();
@@ -236,9 +228,8 @@ public class SortIT implements Serializable {
     pipeline.done();
   }
 
-  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
-      ColumnOrder second, ColumnOrder third, String firstField, String secondField,
-      String thirdField) throws IOException {
+  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
+      ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
 
     PCollection<String> input = pipeline.readTextFile(inputPath);
@@ -260,9 +251,9 @@ public class SortIT implements Serializable {
     pipeline.done();
   }
 
-  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
-      ColumnOrder second, ColumnOrder third, ColumnOrder fourth, String firstField,
-      String secondField, String thirdField, String fourthField) throws IOException {
+  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
+      ColumnOrder third, ColumnOrder fourth, String firstField, String secondField, String thirdField,
+      String fourthField) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
 
     PCollection<String> input = pipeline.readTextFile(inputPath);
@@ -274,10 +265,8 @@ public class SortIT implements Serializable {
             int len = split.length;
             emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
           }
-        }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(),
-            typeFamily.strings()));
-    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second,
-        third, fourth);
+        }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth);
     Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
     Tuple4<String, String, String, String> l = lines.iterator().next();
     assertEquals(firstField, l.first());
@@ -287,8 +276,8 @@ public class SortIT implements Serializable {
     pipeline.done();
   }
 
-  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders,
-      String[] fields) throws IOException {
+  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders, String[] fields)
+      throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
 
     PCollection<String> input = pipeline.readTextFile(inputPath);
@@ -311,8 +300,7 @@ public class SortIT implements Serializable {
     pipeline.done();
   }
 
-  private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey)
-      throws IOException {
+  private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
 
     PCollection<String> input = pipeline.readTextFile(inputPath);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
index 69f0717..7165ba2 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
@@ -28,10 +28,6 @@ import java.util.List;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
@@ -42,6 +38,10 @@ import org.apache.crunch.test.Person;
 import org.apache.crunch.test.Person.Builder;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.avro.SafeAvroSerialization;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -49,91 +49,83 @@ import com.google.common.collect.Lists;
  */
 public class SpecificAvroGroupByIT implements Serializable {
 
-	private static final long serialVersionUID = 1344118240353796561L;
+  private static final long serialVersionUID = 1344118240353796561L;
 
-	private transient File avroFile;
+  private transient File avroFile;
 
-	@Before
-	public void setUp() throws IOException {
-		avroFile = File.createTempFile("avrotest", ".avro");
-	}
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("avrotest", ".avro");
+  }
 
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
 
-	@Test
-	public void testGrouByWithSpecificAvroType() throws Exception {
+  @Test
+  public void testGrouByWithSpecificAvroType() throws Exception {
 
-		MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class);
+    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class);
 
-		testSpecificAvro(pipeline);
-	}
+    testSpecificAvro(pipeline);
+  }
 
   @Test
-  public void testGrouByOnSpecificAvroButReflectionDatumReader()
-      throws Exception {
+  public void testGrouByOnSpecificAvroButReflectionDatumReader() throws Exception {
     MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class);
 
-    // https://issues.apache.org/jira/browse/AVRO-1046  resolves 
-    // the ClassCastException when reading specific Avro types with 
+    // https://issues.apache.org/jira/browse/AVRO-1046 resolves
+    // the ClassCastException when reading specific Avro types with
     // ReflectDatumReader
-    
-    pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT,
-        true);
+
+    pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
 
     testSpecificAvro(pipeline);
   }
 
-	public void testSpecificAvro(MRPipeline pipeline) throws Exception {
+  public void testSpecificAvro(MRPipeline pipeline) throws Exception {
+
+    createPersonAvroFile(avroFile);
 
-		createPersonAvroFile(avroFile);
+    PCollection<Person> unsorted = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), Avros.records(Person.class)));
 
-		PCollection<Person> unsorted = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), Avros.records(Person.class)));
+    PTable<String, Person> sorted = unsorted.parallelDo(new MapFn<Person, Pair<String, Person>>() {
 
-		PTable<String, Person> sorted = unsorted
-				.parallelDo(new MapFn<Person, Pair<String, Person>>() {
+      @Override
+      public Pair<String, Person> map(Person input) {
+        String key = input.getName().toString();
+        return Pair.of(key, input);
 
-					@Override
-					public Pair<String, Person> map(Person input) {
-						String key = input.getName().toString();
-						return Pair.of(key, input);
+      }
+    }, Avros.tableOf(Avros.strings(), Avros.records(Person.class))).groupByKey().ungroup();
 
-					}
-				}, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
-				.groupByKey().ungroup();
+    List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted.materialize());
 
-		List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted
-				.materialize());
+    assertEquals(1, outputPersonList.size());
+    assertEquals(String.class, outputPersonList.get(0).first().getClass());
+    assertEquals(Person.class, outputPersonList.get(0).second().getClass());
 
-		assertEquals(1, outputPersonList.size());
-		assertEquals(String.class, outputPersonList.get(0).first().getClass());
-		assertEquals(Person.class, outputPersonList.get(0).second().getClass());
-		
-		pipeline.done();
-	}
+    pipeline.done();
+  }
 
-	private void createPersonAvroFile(File avroFile) throws IOException {
+  private void createPersonAvroFile(File avroFile) throws IOException {
 
-		Builder person = Person.newBuilder();
-		person.setAge(40);
-		person.setName("Bob");
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		siblingNames.add("Bob" + "1");
-		siblingNames.add("Bob" + "2");
-		person.setSiblingnames(siblingNames);
+    Builder person = Person.newBuilder();
+    person.setAge(40);
+    person.setName("Bob");
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Bob" + "1");
+    siblingNames.add("Bob" + "2");
+    person.setSiblingnames(siblingNames);
 
-		FileOutputStream outputStream = new FileOutputStream(avroFile);
-		SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
-				Person.class);
+    FileOutputStream outputStream = new FileOutputStream(avroFile);
+    SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(Person.class);
 
-		DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
-				writer);
-		dataFileWriter.create(Person.SCHEMA$, outputStream);
-		dataFileWriter.append(person.build());
-		dataFileWriter.close();
-		outputStream.close();
-	}
+    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(writer);
+    dataFileWriter.create(Person.SCHEMA$, outputStream);
+    dataFileWriter.append(person.build());
+    dataFileWriter.close();
+    outputStream.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
index c5a9f39..54c4945 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
@@ -20,8 +20,6 @@ package org.apache.crunch.lib.join;
 import java.io.IOException;
 import java.io.Serializable;
 
-import org.junit.Test;
-
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
@@ -36,6 +34,7 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Test;
 
 public abstract class JoinTester implements Serializable {
   private static class WordSplit extends DoFn<String, String> {
@@ -47,24 +46,21 @@ public abstract class JoinTester implements Serializable {
     }
   }
 
-  protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2,
-        PTypeFamily ptf) {
+  protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) {
     PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
     PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
     PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
 
     PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
 
-    PTable<String, Long> sums = join.parallelDo("cnt",
-        new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
-          @Override
-          public void process(Pair<String, Pair<Long, Long>> input,
-                              Emitter<Pair<String, Long>> emitter) {
-            Pair<Long, Long> pair = input.second();
-            long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? pair.second() : 0);
-            emitter.emit(Pair.of(input.first(), sum));
-          }
-        }, ntt);
+    PTable<String, Long> sums = join.parallelDo("cnt", new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
+      @Override
+      public void process(Pair<String, Pair<Long, Long>> input, Emitter<Pair<String, Long>> emitter) {
+        Pair<Long, Long> pair = input.second();
+        long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? pair.second() : 0);
+        emitter.emit(Pair.of(input.first(), sum));
+      }
+    }, ntt);
 
     return sums;
   }
@@ -84,10 +80,10 @@ public abstract class JoinTester implements Serializable {
   }
 
   @Test
-   public void testWritableJoin() throws Exception {
+  public void testWritableJoin() throws Exception {
     run(new MRPipeline(InnerJoinIT.class), WritableTypeFamily.getInstance());
   }
-  
+
   @Test
   public void testAvroJoin() throws Exception {
     run(new MRPipeline(InnerJoinIT.class), AvroTypeFamily.getInstance());
@@ -95,8 +91,9 @@ public abstract class JoinTester implements Serializable {
 
   /**
    * Used to check that the result of the join makes sense.
-   *
-   * @param lines The result of the join.
+   * 
+   * @param lines
+   *          The result of the join.
    */
   public abstract void assertPassed(Iterable<Pair<String, Long>> lines);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
index baf3d4f..f7f8f09 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
@@ -24,8 +24,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Test;
-
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PTable;
@@ -36,6 +34,8 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.test.FileHelper;
 import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 public class MapsideJoinIT {
@@ -70,14 +70,12 @@ public class MapsideJoinIT {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
 
-    PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
-        orderTable.getPTableType());
+    PTable<Integer, String> filteredOrderTable = orderTable
+        .parallelDo(new NegativeFilter(), orderTable.getPTableType());
 
-    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
-        filteredOrderTable);
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, filteredOrderTable);
 
-    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
-        .materialize());
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
 
     assertTrue(materializedJoin.isEmpty());
 
@@ -100,8 +98,7 @@ public class MapsideJoinIT {
     expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
     expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
 
-    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
-        .materialize());
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined.materialize());
     Collections.sort(joinedResultList);
 
     assertEquals(expectedJoinResult, joinedResultList);
@@ -109,8 +106,8 @@ public class MapsideJoinIT {
 
   private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
     try {
-      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
-          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
+      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable", new LineSplitter(),
+          Writables.tableOf(Writables.ints(), Writables.strings()));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/07683711/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
index 5fcef72..2531d24 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
@@ -29,10 +29,6 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
@@ -41,6 +37,10 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.Employee;
 import org.apache.crunch.test.Person;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -48,7 +48,7 @@ public class MultiAvroSchemaJoinIT {
 
   private File personFile;
   private File employeeFile;
-  
+
   @Before
   public void setUp() throws Exception {
     this.personFile = File.createTempFile("person", ".avro");
@@ -60,20 +60,20 @@ public class MultiAvroSchemaJoinIT {
     Person p1 = new Person();
     p1.setName("Josh");
     p1.setAge(19);
-    p1.setSiblingnames(ImmutableList.<CharSequence>of("Kate", "Mike"));
+    p1.setSiblingnames(ImmutableList.<CharSequence> of("Kate", "Mike"));
     pfw.append(p1);
     Person p2 = new Person();
     p2.setName("Kate");
     p2.setAge(17);
-    p2.setSiblingnames(ImmutableList.<CharSequence>of("Josh", "Mike"));
+    p2.setSiblingnames(ImmutableList.<CharSequence> of("Josh", "Mike"));
     pfw.append(p2);
     Person p3 = new Person();
     p3.setName("Mike");
     p3.setAge(12);
-    p3.setSiblingnames(ImmutableList.<CharSequence>of("Josh", "Kate"));
+    p3.setSiblingnames(ImmutableList.<CharSequence> of("Josh", "Kate"));
     pfw.append(p3);
     pfw.close();
-    
+
     DatumWriter<Employee> edw = new SpecificDatumWriter<Employee>();
     DataFileWriter<Employee> efw = new DataFileWriter<Employee>(edw);
     efw.create(Employee.SCHEMA$, employeeFile);
@@ -84,13 +84,13 @@ public class MultiAvroSchemaJoinIT {
     efw.append(e1);
     efw.close();
   }
-  
+
   @After
   public void tearDown() throws Exception {
     personFile.delete();
     employeeFile.delete();
   }
-  
+
   public static class NameFn<K extends SpecificRecord> extends MapFn<K, String> {
     @Override
     public String map(K input) {
@@ -99,18 +99,18 @@ public class MultiAvroSchemaJoinIT {
       return input.get(f.pos()).toString();
     }
   }
-  
+
   @Test
   public void testJoin() throws Exception {
-     Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class);
-     PCollection<Person> people = p.read(From.avroFile(personFile.getAbsolutePath(), records(Person.class)));
-     PCollection<Employee> employees = p.read(From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class)));
-     
-     Iterable<Pair<Person, Employee>> result = people.by(new NameFn<Person>(), strings())
-         .join(employees.by(new NameFn<Employee>(), strings())).values().materialize();
-     List<Pair<Person, Employee>> v = Lists.newArrayList(result);
-     assertEquals(1, v.size());
-     assertEquals("Kate", v.get(0).first().getName().toString());
-     assertEquals("Kate", v.get(0).second().getName().toString());
+    Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class);
+    PCollection<Person> people = p.read(From.avroFile(personFile.getAbsolutePath(), records(Person.class)));
+    PCollection<Employee> employees = p.read(From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class)));
+
+    Iterable<Pair<Person, Employee>> result = people.by(new NameFn<Person>(), strings())
+        .join(employees.by(new NameFn<Employee>(), strings())).values().materialize();
+    List<Pair<Person, Employee>> v = Lists.newArrayList(result);
+    assertEquals(1, v.size());
+    assertEquals("Kate", v.get(0).first().getName().toString());
+    assertEquals("Kate", v.get(0).second().getName().toString());
   }
 }


Mime
View raw message