incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [13/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
new file mode 100644
index 0000000..7e75d44
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Checks that sampling an in-memory collection yields an exactly assertable subset.
+ */
+public class SampleTest {
+
+  @Test
+  public void testSampler() {
+    // NOTE(review): the second sample() argument looks like a fixed seed, which is what
+    // makes the exact expected values below reproducible -- confirm against PCollection.sample.
+    Iterable<Integer> sampled = MemPipeline
+        .collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+        .sample(0.2, 123998)
+        .materialize();
+    List<Integer> actual = ImmutableList.copyOf(sampled);
+    List<Integer> expected = ImmutableList.of(6, 7);
+    assertEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/SetTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SetTest.java b/crunch/src/test/java/org/apache/crunch/lib/SetTest.java
new file mode 100644
index 0000000..52022ad
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/SetTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
+/**
+ * Exercises {@link Set#difference}, {@link Set#intersection} and {@link Set#comm} over the
+ * {@code set1.txt} / {@code set2.txt} fixtures, once for each type family returned by
+ * {@link #data()}.
+ */
+@RunWith(value = Parameterized.class)
+public class SetTest {
+
+  private final PTypeFamily typeFamily;
+
+  private Pipeline pipeline;
+  private PCollection<String> set1;
+  private PCollection<String> set2;
+
+  public SetTest(PTypeFamily typeFamily) {
+    this.typeFamily = typeFamily;
+  }
+
+  /** One constructor argument per run: the Writable type family, then the Avro one. */
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        { WritableTypeFamily.getInstance() },
+        { AvroTypeFamily.getInstance() } });
+  }
+
+  /**
+   * Obtains paths for both fixtures via {@code FileHelper.createTempCopyOf} and reads them
+   * as string collections into a fresh {@link MRPipeline}.
+   */
+  @Before
+  public void setUp() throws IOException {
+    String firstPath = FileHelper.createTempCopyOf("set1.txt");
+    String secondPath = FileHelper.createTempCopyOf("set2.txt");
+    pipeline = new MRPipeline(SetTest.class);
+    set1 = pipeline.read(At.textFile(firstPath, typeFamily.strings()));
+    set2 = pipeline.read(At.textFile(secondPath, typeFamily.strings()));
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  @Test
+  public void testDifference() throws Exception {
+    Iterable<String> actual = Set.difference(set1, set2).materialize();
+    assertEquals(Lists.newArrayList("b", "e"), Lists.newArrayList(actual));
+  }
+
+  @Test
+  public void testIntersection() throws Exception {
+    Iterable<String> actual = Set.intersection(set1, set2).materialize();
+    assertEquals(Lists.newArrayList("a", "c"), Lists.newArrayList(actual));
+  }
+
+  /**
+   * Each comm row has exactly one populated slot: (only in set1, only in set2, in both),
+   * consistent with the difference/intersection expectations above.
+   */
+  @Test
+  public void testComm() throws Exception {
+    Iterator<Tuple3<String, String, String>> rows =
+        Set.comm(set1, set2).materialize().iterator();
+    checkEquals(null, null, "a", rows.next());
+    checkEquals("b", null, null, rows.next());
+    checkEquals(null, null, "c", rows.next());
+    checkEquals(null, "d", null, rows.next());
+    checkEquals("e", null, null, rows.next());
+    assertFalse(rows.hasNext());
+  }
+
+  /** Asserts each of the three slots of {@code actual} against the expected values. */
+  private void checkEquals(String expectedFirst, String expectedSecond, String expectedThird,
+      Tuple3<String, String, String> actual) {
+    assertEquals("first string", expectedFirst, actual.first());
+    assertEquals("second string", expectedSecond, actual.second());
+    assertEquals("third string", expectedThird, actual.third());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/SortTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SortTest.java b/crunch/src/test/java/org/apache/crunch/lib/SortTest.java
new file mode 100644
index 0000000..8d2838f
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/SortTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.lib.Sort.ColumnOrder.by;
+import static org.apache.crunch.lib.Sort.Order.ASCENDING;
+import static org.apache.crunch.lib.Sort.Order.DESCENDING;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+
+/**
+ * Tests for {@link Sort} over the Writable and Avro type families, each run on a fresh
+ * {@link MRPipeline}. Every test reads the {@code docs.txt} fixture, sorts it and checks only
+ * the first record of the sorted output.
+ *
+ * <p>The class implements {@link Serializable}, presumably because the anonymous {@link DoFn}s
+ * created in the instance helper methods below capture the enclosing test instance.
+ */
+public class SortTest implements Serializable {
+
+  @Test
+  public void testWritableSortAsc() throws Exception {
+    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
+        Order.ASCENDING, "A\tand this text as well");
+  }
+
+  @Test
+  public void testWritableSortDesc() throws Exception {
+    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
+        Order.DESCENDING, "B\tthis doc has some text");
+  }
+
+  @Test
+  public void testWritableSortAscDesc() throws Exception {
+    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
+        by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text");
+  }
+
+  // NOTE: despite its name, this sorts by column 2 DESCENDING, then column 1 ASCENDING.
+  @Test
+  public void testWritableSortSecondDescFirstDesc() throws Exception {
+    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
+        by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text");
+  }
+
+  @Test
+  public void testWritableSortTripleAscDescAsc() throws Exception {
+    runTriple(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
+        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testWritableSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
+        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING),
+        "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testWritableSortTupleNAscDesc() throws Exception {
+    runTupleN(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) },
+        new String[] { "A", "this doc has this text" });
+  }
+
+  @Test
+  public void testWritableSortTable() throws Exception {
+    runTable(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), "A");
+  }
+
+  @Test
+  public void testAvroSortAsc() throws Exception {
+    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
+        Order.ASCENDING, "A\tand this text as well");
+  }
+
+  @Test
+  public void testAvroSortDesc() throws Exception {
+    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
+        Order.DESCENDING, "B\tthis doc has some text");
+  }
+
+  // NOTE: despite its name, this sorts by column 1 ASCENDING, then column 2 DESCENDING.
+  @Test
+  public void testAvroSortPairAscAsc() throws Exception {
+    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
+        by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text");
+  }
+
+  @Test
+  @Ignore("Avro sorting only works in field order at the moment")
+  public void testAvroSortPairSecondAscFirstDesc() throws Exception {
+    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
+        by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text");
+  }
+
+  @Test
+  public void testAvroSortTripleAscDescAsc() throws Exception {
+    runTriple(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
+        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testAvroSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
+        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING),
+        "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testAvroSortTupleNAscDesc() throws Exception {
+    runTupleN(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) },
+        new String[] { "A", "this doc has this text" });
+  }
+
+  @Test
+  public void testAvroSortTable() throws Exception {
+    runTable(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), "A");
+  }
+
+  /**
+   * Reads the {@code docs.txt} fixture, using the path returned by
+   * {@code FileHelper.createTempCopyOf}, as a collection of text lines.
+   */
+  private static PCollection<String> readDocs(Pipeline pipeline) throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("docs.txt");
+    return pipeline.readTextFile(inputPath);
+  }
+
+  /**
+   * Sorts whole lines in {@code order} and checks the first one. The pipeline is always
+   * shut down via {@code done()}, even if reading, sorting or the assertion fails.
+   */
+  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily,
+      Order order, String firstLine) throws IOException {
+    try {
+      PCollection<String> docs = readDocs(pipeline);
+      // Identity pass that turns the input from Writables into the requested type family.
+      PCollection<String> converted = docs.parallelDo(new DoFn<String, String>() {
+        @Override
+        public void process(String input, Emitter<String> emitter) {
+          emitter.emit(input);
+        }
+      }, typeFamily.strings());
+      PCollection<String> sorted = Sort.sort(converted, order);
+      Iterable<String> lines = sorted.materialize();
+      assertEquals(firstLine, lines.iterator().next());
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Splits each line on tabs into a (first, second) pair, sorts by the two column orders and
+   * checks the first pair. Always calls {@code pipeline.done()}.
+   */
+  private void runPair(Pipeline pipeline, PTypeFamily typeFamily,
+      ColumnOrder first, ColumnOrder second, String firstField, String secondField)
+      throws IOException {
+    try {
+      PCollection<String> docs = readDocs(pipeline);
+      PCollection<Pair<String, String>> kv = docs.parallelDo(
+          new DoFn<String, Pair<String, String>>() {
+            @Override
+            public void process(String input, Emitter<Pair<String, String>> emitter) {
+              String[] split = input.split("[\t]+");
+              emitter.emit(Pair.of(split[0], split[1]));
+            }
+          }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
+      PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
+      Pair<String, String> l = sorted.materialize().iterator().next();
+      assertEquals(firstField, l.first());
+      assertEquals(secondField, l.second());
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Splits each line on tabs/spaces into a triple, sorts by the three column orders and
+   * checks the first triple. Always calls {@code pipeline.done()}.
+   */
+  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily,
+      ColumnOrder first, ColumnOrder second, ColumnOrder third,
+      String firstField, String secondField, String thirdField) throws IOException {
+    try {
+      PCollection<String> docs = readDocs(pipeline);
+      PCollection<Tuple3<String, String, String>> kv = docs.parallelDo(
+          new DoFn<String, Tuple3<String, String, String>>() {
+            @Override
+            public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
+              String[] split = input.split("[\t ]+");
+              int len = split.length;
+              // Modulo keeps indices in range when a line has fewer than three tokens.
+              emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
+            }
+          }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+      PCollection<Tuple3<String, String, String>> sorted =
+          Sort.sortTriples(kv, first, second, third);
+      Tuple3<String, String, String> l = sorted.materialize().iterator().next();
+      assertEquals(firstField, l.first());
+      assertEquals(secondField, l.second());
+      assertEquals(thirdField, l.third());
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Splits each line on tabs/spaces into a quad, sorts by the four column orders and checks
+   * the first quad. Always calls {@code pipeline.done()}.
+   */
+  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily,
+      ColumnOrder first, ColumnOrder second, ColumnOrder third, ColumnOrder fourth,
+      String firstField, String secondField, String thirdField, String fourthField)
+      throws IOException {
+    try {
+      PCollection<String> docs = readDocs(pipeline);
+      PCollection<Tuple4<String, String, String, String>> kv = docs.parallelDo(
+          new DoFn<String, Tuple4<String, String, String, String>>() {
+            @Override
+            public void process(String input,
+                Emitter<Tuple4<String, String, String, String>> emitter) {
+              String[] split = input.split("[\t ]+");
+              int len = split.length;
+              // Modulo keeps indices in range when a line has fewer than four tokens.
+              emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
+            }
+          }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(),
+              typeFamily.strings(), typeFamily.strings()));
+      PCollection<Tuple4<String, String, String, String>> sorted =
+          Sort.sortQuads(kv, first, second, third, fourth);
+      Tuple4<String, String, String, String> l = sorted.materialize().iterator().next();
+      assertEquals(firstField, l.first());
+      assertEquals(secondField, l.second());
+      assertEquals(thirdField, l.third());
+      assertEquals(fourthField, l.fourth());
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Splits each line on tabs into a {@link TupleN} with one string column per entry in
+   * {@code orders}, sorts by those orders and checks the first tuple against {@code fields}.
+   * Always calls {@code pipeline.done()}.
+   */
+  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily,
+      ColumnOrder[] orders, String[] fields) throws IOException {
+    try {
+      PCollection<String> docs = readDocs(pipeline);
+      PType[] types = new PType[orders.length];
+      Arrays.fill(types, typeFamily.strings());
+      PCollection<TupleN> kv = docs.parallelDo(
+          new DoFn<String, TupleN>() {
+            @Override
+            public void process(String input, Emitter<TupleN> emitter) {
+              String[] split = input.split("[\t]+");
+              emitter.emit(new TupleN(split));
+            }
+          }, typeFamily.tuples(types));
+      PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
+      TupleN l = sorted.materialize().iterator().next();
+      int i = 0;
+      for (String field : fields) {
+        assertEquals(field, l.get(i++));
+      }
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Splits each line on tabs into a key/value table, sorts the table and checks the first
+   * key. Always calls {@code pipeline.done()}.
+   */
+  private void runTable(Pipeline pipeline, PTypeFamily typeFamily,
+      String firstKey) throws IOException {
+    try {
+      PCollection<String> docs = readDocs(pipeline);
+      PTable<String, String> table = docs.parallelDo(
+          new DoFn<String, Pair<String, String>>() {
+            @Override
+            public void process(String input, Emitter<Pair<String, String>> emitter) {
+              String[] split = input.split("[\t]+");
+              emitter.emit(Pair.of(split[0], split[1]));
+            }
+          }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
+      PTable<String, String> sorted = Sort.sort(table);
+      Pair<String, String> l = sorted.materialize().iterator().next();
+      assertEquals(firstKey, l.first());
+    } finally {
+      pipeline.done();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/SpecificAvroGroupByTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SpecificAvroGroupByTest.java b/crunch/src/test/java/org/apache/crunch/lib/SpecificAvroGroupByTest.java
new file mode 100644
index 0000000..1b83426
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/SpecificAvroGroupByTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.Person.Builder;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.avro.SafeAvroSerialization;
+import com.google.common.collect.Lists;
+
+/**
+ * Test {@link SafeAvroSerialization} with Specific Avro types: a single {@link Person}
+ * record is written to a temporary Avro file, keyed by name, and pushed through
+ * {@code groupByKey().ungroup()} on an {@link MRPipeline}.
+ */
+public class SpecificAvroGroupByTest implements Serializable {
+
+  private static final long serialVersionUID = 1344118240353796561L;
+
+  // transient: excluded if this instance is serialized (the anonymous MapFn below is an
+  // inner class and therefore captures the enclosing test instance).
+  private transient File avroFile;
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("avrotest", ".avro");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  @Test
+  public void testGrouByWithSpecificAvroType() throws Exception {
+    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByTest.class);
+
+    testSpecificAvro(pipeline);
+  }
+
+  @Test
+  public void testGrouByOnSpecificAvroButReflectionDatumReader()
+      throws Exception {
+    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByTest.class);
+
+    // https://issues.apache.org/jira/browse/AVRO-1046  resolves
+    // the ClassCastException when reading specific Avro types with
+    // ReflectDatumReader
+    pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT,
+        true);
+
+    testSpecificAvro(pipeline);
+  }
+
+  /**
+   * Writes the test record, groups it by name on {@code pipeline} and checks that exactly one
+   * (String, Person) pair comes back with the expected runtime classes.
+   *
+   * @param pipeline the pipeline to run on; {@code done()} is called on it before this method
+   *     returns, including when an assertion or the job fails
+   * @throws Exception if writing the input file or running the pipeline fails
+   */
+  public void testSpecificAvro(MRPipeline pipeline) throws Exception {
+    try {
+      createPersonAvroFile(avroFile);
+
+      PCollection<Person> unsorted = pipeline.read(At.avroFile(
+          avroFile.getAbsolutePath(), Avros.records(Person.class)));
+
+      PTable<String, Person> sorted = unsorted
+          .parallelDo(new MapFn<Person, Pair<String, Person>>() {
+
+            @Override
+            public Pair<String, Person> map(Person input) {
+              String key = input.getName().toString();
+              return Pair.of(key, input);
+            }
+          }, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
+          .groupByKey().ungroup();
+
+      List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted
+          .materialize());
+
+      assertEquals(1, outputPersonList.size());
+      assertEquals(String.class, outputPersonList.get(0).first().getClass());
+      assertEquals(Person.class, outputPersonList.get(0).second().getClass());
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  /**
+   * Writes a single {@link Person} ("Bob", age 40, siblings "Bob1" and "Bob2") to
+   * {@code avroFile} as an Avro data file. The output stream is closed even if writing fails.
+   */
+  private void createPersonAvroFile(File avroFile) throws IOException {
+
+    Builder person = Person.newBuilder();
+    person.setAge(40);
+    person.setName("Bob");
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Bob" + "1");
+    siblingNames.add("Bob" + "2");
+    person.setSiblingnames(siblingNames);
+
+    FileOutputStream outputStream = new FileOutputStream(avroFile);
+    try {
+      SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
+          Person.class);
+
+      DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
+          writer);
+      dataFileWriter.create(Person.SCHEMA$, outputStream);
+      dataFileWriter.append(person.build());
+      dataFileWriter.close();
+    } finally {
+      // Closing a stream that is already closed has no effect, so this is safe on the
+      // success path too.
+      outputStream.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
new file mode 100644
index 0000000..51aa691
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
+import org.apache.crunch.types.writable.TupleWritable;
+
+/**
+ * Checks the partitions chosen by {@link TupleWritablePartitioner} for single-element
+ * {@link TupleWritable} keys, including negative and {@code Integer.MIN_VALUE} hash codes.
+ */
+public class TupleWritablePartitionerTest {
+
+  private TupleWritablePartitioner partitioner;
+
+  @Before
+  public void setUp() {
+    partitioner = new TupleWritablePartitioner();
+  }
+
+  /** Wraps {@code value} as the sole element of a {@link TupleWritable} key. */
+  private static TupleWritable keyOf(IntWritable value) {
+    return new TupleWritable(new Writable[] { value });
+  }
+
+  @Test
+  public void testGetPartition() {
+    TupleWritable key = keyOf(new IntWritable(3));
+    assertEquals(3, partitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(1, partitioner.getPartition(key, NullWritable.get(), 2));
+  }
+
+  @Test
+  public void testGetPartition_NegativeHashValue() {
+    IntWritable negative = new IntWritable(-3);
+    // Sanity check, if this doesn't work then the premise of this test is wrong
+    assertEquals(-3, negative.hashCode());
+
+    TupleWritable key = keyOf(negative);
+    assertEquals(3, partitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(1, partitioner.getPartition(key, NullWritable.get(), 2));
+  }
+
+  // Presumably targets the corner case where Math.abs(Integer.MIN_VALUE) stays negative.
+  @Test
+  public void testGetPartition_IntegerMinValue() {
+    IntWritable minimum = new IntWritable(Integer.MIN_VALUE);
+    // Sanity check, if this doesn't work then the premise of this test is wrong
+    assertEquals(Integer.MIN_VALUE, minimum.hashCode());
+
+    TupleWritable key = keyOf(minimum);
+    assertEquals(0, partitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinTest.java
new file mode 100644
index 0000000..a188f7f
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/FullOuterJoinTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class FullOuterJoinTest extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new FullOuterJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinTest.java
new file mode 100644
index 0000000..3b2b334
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/InnerJoinTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class InnerJoinTest extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new InnerJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/JoinTester.java b/crunch/src/test/java/org/apache/crunch/lib/join/JoinTester.java
new file mode 100644
index 0000000..c3e0034
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/JoinTester.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.junit.Test;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+
+/**
+ * Base class for the join tests. It counts the words of two text files and joins the two
+ * count tables with the {@link JoinFn} supplied by the subclass. It then sums both sides of
+ * every joined pair and hands the result to {@link #assertPassed(Iterable)}. Each join is run
+ * once with the Writable type family and once with the Avro type family.
+ *
+ * <p>NOTE(review): presumably {@link Serializable} because the anonymous {@link DoFn} created
+ * in {@link #join} is an inner class that can capture this instance.
+ */
+public abstract class JoinTester implements Serializable {
+  /** Splits each input line on runs of whitespace and emits every token. */
+  private static class WordSplit extends DoFn<String, String> {
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      for (String word : input.split("\\s+")) {
+        emitter.emit(word);
+      }
+    }
+  }
+
+  /**
+   * Counts the words of both collections and joins the counts with {@link #getJoinFn}. It then
+   * emits, per word, the sum of the two joined counts, where a missing (null) side counts as zero.
+   *
+   * @param w1 the left input lines
+   * @param w2 the right input lines
+   * @param ptf the type family used for the intermediate and output types
+   * @return a table from word to its combined count
+   */
+  protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2,
+      PTypeFamily ptf) {
+    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
+    PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
+    PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
+
+    PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
+
+    PTable<String, Long> sums = join.parallelDo("cnt",
+        new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
+          @Override
+          public void process(Pair<String, Pair<Long, Long>> input,
+                              Emitter<Pair<String, Long>> emitter) {
+            Pair<Long, Long> pair = input.second();
+            // Either side may be null (presumably the unmatched side of an outer join);
+            // treat it as a zero count.
+            long sum = (pair.first() != null ? pair.first() : 0)
+                + (pair.second() != null ? pair.second() : 0);
+            emitter.emit(Pair.of(input.first(), sum));
+          }
+        }, ntt);
+
+    return sums;
+  }
+
+  /**
+   * Joins temporary copies of "shakes.txt" (left) and "maugham.txt" (right) and checks the
+   * result with {@link #assertPassed(Iterable)}. The pipeline is always shut down with
+   * {@link Pipeline#done()}, even when reading or an assertion fails.
+   *
+   * @param pipeline the pipeline to build the join on
+   * @param typeFamily the type family to use for all intermediate types
+   * @throws IOException if the input files cannot be copied
+   */
+  protected void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    try {
+      String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+      String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt");
+
+      PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+      PCollection<String> maugham = pipeline.readTextFile(maughamInputPath);
+      PTable<String, Long> joined = join(shakespeare, maugham, typeFamily);
+      Iterable<Pair<String, Long>> lines = joined.materialize();
+
+      assertPassed(lines);
+    } finally {
+      pipeline.done();
+    }
+  }
+
+  // getClass() is the concrete subclass being run; previously every subclass passed
+  // InnerJoinTest.class, so e.g. LeftOuterJoinTest built its pipeline from the wrong class.
+  @Test
+  public void testWritableJoin() throws Exception {
+    run(new MRPipeline(getClass()), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvroJoin() throws Exception {
+    run(new MRPipeline(getClass()), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Used to check that the result of the join makes sense.
+   *
+   * @param lines The result of the join: word mapped to the summed count from both sides.
+   */
+  public abstract void assertPassed(Iterable<Pair<String, Long>> lines);
+
+  /**
+   * @param typeFamily the type family of the current run
+   * @return The JoinFn to use.
+   */
+  protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
new file mode 100644
index 0000000..e8e3190
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/LeftOuterJoinTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class LeftOuterJoinTest extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/join/MapsideJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/MapsideJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/MapsideJoinTest.java
new file mode 100644
index 0000000..ed1d745
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/MapsideJoinTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.Lists;
+
+public class MapsideJoinTest {
+
+  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+
+    @Override
+    public Pair<Integer, String> map(String input) {
+      String[] fields = input.split("\\|");
+      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+    }
+
+  }
+
+  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
+
+    @Override
+    public boolean accept(Pair<Integer, String> input) {
+      return false;
+    }
+
+  }
+
+  @Test(expected = CrunchRuntimeException.class)
+  public void testNonMapReducePipeline() {
+    runMapsideJoin(MemPipeline.getInstance());
+  }
+
+  @Test
+  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
+        orderTable.getPTableType());
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
+        filteredOrderTable);
+
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
+        .materialize());
+
+    assertTrue(materializedJoin.isEmpty());
+
+  }
+
+  @Test
+  public void testMapsideJoin() throws IOException {
+    runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
+  }
+
+  private void runMapsideJoin(Pipeline pipeline) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
+
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
+        .materialize());
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+    try {
+      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
+          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinTest.java
new file mode 100644
index 0000000..7143168
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.types.avro.Avros.records;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.Person;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Joins two Avro files whose records have different schemas ({@link Person} and
+ * {@link Employee}) on their common "name" field.
+ */
+public class MultiAvroSchemaJoinTest {
+
+  private File personFile;
+  private File employeeFile;
+
+  /**
+   * Writes two temporary Avro data files. The first holds three {@link Person} records (Josh,
+   * Kate, Mike); the second holds a single {@link Employee} record (Kate).
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.personFile = File.createTempFile("person", ".avro");
+    this.employeeFile = File.createTempFile("employee", ".avro");
+
+    DatumWriter<Person> pdw = new SpecificDatumWriter<Person>();
+    DataFileWriter<Person> pfw = new DataFileWriter<Person>(pdw);
+    pfw.create(Person.SCHEMA$, personFile);
+    try {
+      Person p1 = new Person();
+      p1.setName("Josh");
+      p1.setAge(19);
+      p1.setSiblingnames(ImmutableList.<CharSequence>of("Kate", "Mike"));
+      pfw.append(p1);
+      Person p2 = new Person();
+      p2.setName("Kate");
+      p2.setAge(17);
+      p2.setSiblingnames(ImmutableList.<CharSequence>of("Josh", "Mike"));
+      pfw.append(p2);
+      Person p3 = new Person();
+      p3.setName("Mike");
+      p3.setAge(12);
+      p3.setSiblingnames(ImmutableList.<CharSequence>of("Josh", "Kate"));
+      pfw.append(p3);
+    } finally {
+      // Close the writer even if an append fails so the file handle is not leaked.
+      pfw.close();
+    }
+
+    DatumWriter<Employee> edw = new SpecificDatumWriter<Employee>();
+    DataFileWriter<Employee> efw = new DataFileWriter<Employee>(edw);
+    efw.create(Employee.SCHEMA$, employeeFile);
+    try {
+      Employee e1 = new Employee();
+      e1.setName("Kate");
+      e1.setSalary(100000);
+      e1.setDepartment("Marketing");
+      efw.append(e1);
+    } finally {
+      efw.close();
+    }
+  }
+
+  /** Deletes the temporary files; tolerates a setUp() that failed before creating them. */
+  @After
+  public void tearDown() throws Exception {
+    if (personFile != null) {
+      personFile.delete();
+    }
+    if (employeeFile != null) {
+      employeeFile.delete();
+    }
+  }
+
+  /**
+   * Extracts the "name" field of a specific record as a String, so records with different
+   * schemas can be keyed on the same value. Assumes the record's schema has a "name" field.
+   */
+  public static class NameFn<K extends SpecificRecord> extends MapFn<K, String> {
+    @Override
+    public String map(K input) {
+      Schema s = input.getSchema();
+      Schema.Field f = s.getField("name");
+      return input.get(f.pos()).toString();
+    }
+  }
+
+  /** Only "Kate" appears in both files, so the join must produce exactly one pair. */
+  @Test
+  public void testJoin() throws Exception {
+    Pipeline p = new MRPipeline(MultiAvroSchemaJoinTest.class);
+    try {
+      PCollection<Person> people = p.read(From.avroFile(personFile.getAbsolutePath(), records(Person.class)));
+      PCollection<Employee> employees = p.read(From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class)));
+
+      Iterable<Pair<Person, Employee>> result = people.by(new NameFn<Person>(), strings())
+          .join(employees.by(new NameFn<Employee>(), strings())).values().materialize();
+      List<Pair<Person, Employee>> v = Lists.newArrayList(result);
+      assertEquals(1, v.size());
+      assertEquals("Kate", v.get(0).first().getName().toString());
+      assertEquals("Kate", v.get(0).second().getName().toString());
+    } finally {
+      p.done();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinTest.java
new file mode 100644
index 0000000..59a21b1
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/join/RightOuterJoinTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class RightOuterJoinTest extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new RightOuterJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/test/CountersTest.java b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
new file mode 100644
index 0000000..32a15f2
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/test/CountersTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+
+/**
+ * Verifies that {@link DoFn} counters incremented while calling a function directly from a
+ * unit test (outside of any pipeline) can be read back through {@link TestCounters}.
+ */
+public class CountersTest {
+
+  public enum CT { ONE, TWO, THREE }
+
+  /** Clears the counters accumulated in {@link TestCounters} after every test. */
+  @After
+  public void after() {
+    TestCounters.clearCounters();
+  }
+
+  /** Increments ONE by 1, TWO by 4 and THREE by 7 for every input; emits nothing. */
+  public static class CTFn extends DoFn<String, String> {
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      getCounter(CT.ONE).increment(1);
+      getCounter(CT.TWO).increment(4);
+      getCounter(CT.THREE).increment(7);
+    }
+  }
+
+  @Test public void test() {
+    processTwiceAndCheckCounters();
+  }
+
+  /**
+   * Same check as {@link #test()}. Whichever of the two runs second sees exactly the expected
+   * totals only if {@link #after()} reset the counters left behind by the first.
+   */
+  @Test public void secondTest() {
+    processTwiceAndCheckCounters();
+  }
+
+  /** Feeds two inputs to a fresh {@link CTFn} and checks the counters reflect two calls. */
+  private void processTwiceAndCheckCounters() {
+    CTFn fn = new CTFn();
+    // CTFn never touches its emitter, so passing null is safe.
+    fn.process("foo", null);
+    fn.process("bar", null);
+    assertEquals(2L, TestCounters.getCounter(CT.ONE).getValue());
+    assertEquals(8L, TestCounters.getCounter(CT.TWO).getValue());
+    assertEquals(14L, TestCounters.getCounter(CT.THREE).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/test/Employee.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/test/Employee.java b/crunch/src/test/java/org/apache/crunch/test/Employee.java
new file mode 100644
index 0000000..1a40002
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/test/Employee.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;  
+/**
+ * Avro specific record for the schema held in {@link #SCHEMA$}. The schema has three fields:
+ * {@code name} (a nullable string), {@code salary} (an int) and {@code department} (a nullable
+ * string). It is used as test data by MultiAvroSchemaJoinTest.
+ *
+ * <p>NOTE(review): this appears to be output of the Avro specific compiler; if so, regenerate
+ * it from the schema rather than editing it by hand.
+ */
+@SuppressWarnings("all")
+public class Employee extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  // Parsed record schema; its field order defines the indices used by get()/put() and the Builder.
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Employee\",\"namespace\":\"org.apache.crunch.test\",\"fields\":[{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"salary\",\"type\":\"int\"},{\"name\":\"department\",\"type\":[\"string\",\"null\"]}]}");
+  // Field 0; may be null (schema union ["string","null"]).
+  @Deprecated public java.lang.CharSequence name;
+  // Field 1; primitive, so it can never be null.
+  @Deprecated public int salary;
+  // Field 2; may be null (schema union ["string","null"]).
+  @Deprecated public java.lang.CharSequence department;
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  // Returns the value at schema position field$ (salary is boxed); other indices throw.
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return salary;
+    case 2: return department;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  // Stores value$ at schema position field$. Index 1 is unboxed into the primitive salary,
+  // so a null value there throws NullPointerException.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: salary = (java.lang.Integer)value$; break;
+    case 2: department = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'salary' field.
+   */
+  public java.lang.Integer getSalary() {
+    return salary;
+  }
+
+  /**
+   * Sets the value of the 'salary' field.
+   * @param value the value to set; unboxed, so it must not be null.
+   */
+  public void setSalary(java.lang.Integer value) {
+    this.salary = value;
+  }
+
+  /**
+   * Gets the value of the 'department' field.
+   */
+  public java.lang.CharSequence getDepartment() {
+    return department;
+  }
+
+  /**
+   * Sets the value of the 'department' field.
+   * @param value the value to set.
+   */
+  public void setDepartment(java.lang.CharSequence value) {
+    this.department = value;
+  }
+
+  /** Creates a new Employee RecordBuilder */
+  public static org.apache.crunch.test.Employee.Builder newBuilder() {
+    return new org.apache.crunch.test.Employee.Builder();
+  }
+  
+  /** Creates a new Employee RecordBuilder by copying an existing Builder */
+  public static org.apache.crunch.test.Employee.Builder newBuilder(org.apache.crunch.test.Employee.Builder other) {
+    return new org.apache.crunch.test.Employee.Builder(other);
+  }
+  
+  /** Creates a new Employee RecordBuilder by copying an existing Employee instance */
+  public static org.apache.crunch.test.Employee.Builder newBuilder(org.apache.crunch.test.Employee other) {
+    return new org.apache.crunch.test.Employee.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for Employee instances. Each setter validates its value against the field
+   * schema and marks the field as set. {@link #build()} fills unset fields from the schema
+   * defaults.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Employee>
+    implements org.apache.avro.data.RecordBuilder<Employee> {
+
+    private java.lang.CharSequence name;
+    private int salary;
+    private java.lang.CharSequence department;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.crunch.test.Employee.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.crunch.test.Employee.Builder other) {
+      super(other);
+    }
+    
+    /**
+     * Creates a Builder by copying an existing Employee instance. Each field of {@code other}
+     * that passes isValidValue is deep-copied and marked as set.
+     */
+    private Builder(org.apache.crunch.test.Employee other) {
+            super(org.apache.crunch.test.Employee.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.salary)) {
+        this.salary = (java.lang.Integer) data().deepCopy(fields()[1].schema(), other.salary);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.department)) {
+        this.department = (java.lang.CharSequence) data().deepCopy(fields()[2].schema(), other.department);
+        fieldSetFlags()[2] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field */
+    public org.apache.crunch.test.Employee.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field */
+    public org.apache.crunch.test.Employee.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'salary' field */
+    public java.lang.Integer getSalary() {
+      return salary;
+    }
+    
+    /** Sets the value of the 'salary' field */
+    public org.apache.crunch.test.Employee.Builder setSalary(int value) {
+      validate(fields()[1], value);
+      this.salary = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'salary' field has been set */
+    public boolean hasSalary() {
+      return fieldSetFlags()[1];
+    }
+    
+    /**
+     * Clears the 'salary' field. Only the "set" flag is reset: the primitive keeps its last
+     * value (and getSalary() still returns it), but build() ignores it while the flag is false.
+     */
+    public org.apache.crunch.test.Employee.Builder clearSalary() {
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'department' field */
+    public java.lang.CharSequence getDepartment() {
+      return department;
+    }
+    
+    /** Sets the value of the 'department' field */
+    public org.apache.crunch.test.Employee.Builder setDepartment(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.department = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'department' field has been set */
+    public boolean hasDepartment() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'department' field */
+    public org.apache.crunch.test.Employee.Builder clearDepartment() {
+      department = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /**
+     * Builds an Employee. Fields that were not set take the schema default. Any failure is
+     * rethrown wrapped in an AvroRuntimeException.
+     */
+    @Override
+    public Employee build() {
+      try {
+        Employee record = new Employee();
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.salary = fieldSetFlags()[1] ? this.salary : (java.lang.Integer) defaultValue(fields()[1]);
+        record.department = fieldSetFlags()[2] ? this.department : (java.lang.CharSequence) defaultValue(fields()[2]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/test/Person.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/test/Person.java b/crunch/src/test/java/org/apache/crunch/test/Person.java
new file mode 100644
index 0000000..aa662e3
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/test/Person.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;  
+/**
+ * Avro specific record for the {@code org.apache.crunch.test.Person} schema held in
+ * {@link #SCHEMA$}: a nullable string {@code name}, an int {@code age}, and an array of
+ * strings {@code siblingnames}.
+ *
+ * <p>NOTE(review): the structure (public {@code @Deprecated} fields, index-based get/put,
+ * nested Builder) looks like Avro-compiler output; if so, regenerate it from the schema
+ * rather than editing it by hand.
+ */
+@SuppressWarnings("all")
+public class Person extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  /**
+   * Parsed record schema. The field indexes used by {@link #get}, {@link #put} and the
+   * Builder follow the field order of this schema: 0 = name, 1 = age, 2 = siblingnames.
+   * No field in the schema declares a default value.
+   */
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Person\",\"namespace\":\"org.apache.crunch.test\",\"fields\":[{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"siblingnames\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");
+  /** 'name' field (schema index 0); the schema type is a union of string and null. */
+  @Deprecated public java.lang.CharSequence name;
+  /** 'age' field (schema index 1); stored as a primitive int, so it never holds null. */
+  @Deprecated public int age;
+  /** 'siblingnames' field (schema index 2); the schema type is an array of strings. */
+  @Deprecated public java.util.List<java.lang.CharSequence> siblingnames;
+  /** Returns {@link #SCHEMA$}. */
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call.
+  // Returns the field at the given schema index (see SCHEMA$), with 'age' boxed;
+  // any other index throws AvroRuntimeException("Bad index").
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return age;
+    case 2: return siblingnames;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call.
+  // Casts value$ to the Java type of the field at the given schema index and stores it;
+  // any other index throws AvroRuntimeException("Bad index"). A value of the wrong type
+  // throws ClassCastException, and a null value for index 1 (age) throws
+  // NullPointerException when it is unboxed into the int field.
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: age = (java.lang.Integer)value$; break;
+    case 2: siblingnames = (java.util.List<java.lang.CharSequence>)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   * @return the name; may be null, since the schema type is a string/null union.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'age' field.
+   * @return the age, boxed from the underlying int field (never null).
+   */
+  public java.lang.Integer getAge() {
+    return age;
+  }
+
+  /**
+   * Sets the value of the 'age' field.
+   * @param value the value to set; must not be null, because it is unboxed into an
+   *        int field and a null argument throws NullPointerException.
+   */
+  public void setAge(java.lang.Integer value) {
+    this.age = value;
+  }
+
+  /**
+   * Gets the value of the 'siblingnames' field.
+   * @return the list instance stored in this record (not a copy).
+   */
+  public java.util.List<java.lang.CharSequence> getSiblingnames() {
+    return siblingnames;
+  }
+
+  /**
+   * Sets the value of the 'siblingnames' field.
+   * @param value the value to set; the list is stored as-is, not copied.
+   */
+  public void setSiblingnames(java.util.List<java.lang.CharSequence> value) {
+    this.siblingnames = value;
+  }
+
+  /** Creates a new Person RecordBuilder */
+  public static org.apache.crunch.test.Person.Builder newBuilder() {
+    return new org.apache.crunch.test.Person.Builder();
+  }
+  
+  /** Creates a new Person RecordBuilder by copying an existing Builder */
+  public static org.apache.crunch.test.Person.Builder newBuilder(org.apache.crunch.test.Person.Builder other) {
+    return new org.apache.crunch.test.Person.Builder(other);
+  }
+  
+  /** Creates a new Person RecordBuilder by copying an existing Person instance */
+  public static org.apache.crunch.test.Person.Builder newBuilder(org.apache.crunch.test.Person other) {
+    return new org.apache.crunch.test.Person.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for Person instances.
+   *
+   * <p>Tracks which fields were explicitly set through fieldSetFlags() (indexed like
+   * SCHEMA$); build() resolves every field that is not flagged as set through
+   * defaultValue(...).
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Person>
+    implements org.apache.avro.data.RecordBuilder<Person> {
+
+    // Builder-local copies of the record fields; only meaningful when the matching
+    // fieldSetFlags() entry is true.
+    private java.lang.CharSequence name;
+    private int age;
+    private java.util.List<java.lang.CharSequence> siblingnames;
+
+    /** Creates a new Builder bound to Person.SCHEMA$ with no fields set */
+    private Builder() {
+      super(org.apache.crunch.test.Person.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder (delegates to the base-class copy constructor) */
+    private Builder(org.apache.crunch.test.Person.Builder other) {
+      super(other);
+    }
+    
+    /**
+     * Creates a Builder by copying an existing Person instance. Each field of
+     * {@code other} that isValidValue accepts is deep-copied with data().deepCopy,
+     * using that field's schema, and marked as set; other fields are left unset.
+     */
+    private Builder(org.apache.crunch.test.Person other) {
+            super(org.apache.crunch.test.Person.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.age)) {
+        this.age = (java.lang.Integer) data().deepCopy(fields()[1].schema(), other.age);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.siblingnames)) {
+        this.siblingnames = (java.util.List<java.lang.CharSequence>) data().deepCopy(fields()[2].schema(), other.siblingnames);
+        fieldSetFlags()[2] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field after validating it against the schema field */
+    public org.apache.crunch.test.Person.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field and marks it as unset */
+    public org.apache.crunch.test.Person.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'age' field (boxed from the int builder field) */
+    public java.lang.Integer getAge() {
+      return age;
+    }
+    
+    /** Sets the value of the 'age' field after validating it against the schema field */
+    public org.apache.crunch.test.Person.Builder setAge(int value) {
+      validate(fields()[1], value);
+      this.age = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'age' field has been set */
+    public boolean hasAge() {
+      return fieldSetFlags()[1];
+    }
+    
+    /**
+     * Marks the 'age' field as unset. The int value held by the builder is not reset;
+     * build() ignores it while the field is unset.
+     */
+    public org.apache.crunch.test.Person.Builder clearAge() {
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'siblingnames' field */
+    public java.util.List<java.lang.CharSequence> getSiblingnames() {
+      return siblingnames;
+    }
+    
+    /** Sets the value of the 'siblingnames' field after validating it against the schema field */
+    public org.apache.crunch.test.Person.Builder setSiblingnames(java.util.List<java.lang.CharSequence> value) {
+      validate(fields()[2], value);
+      this.siblingnames = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'siblingnames' field has been set */
+    public boolean hasSiblingnames() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'siblingnames' field and marks it as unset */
+    public org.apache.crunch.test.Person.Builder clearSiblingnames() {
+      siblingnames = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    /**
+     * Builds a new Person. Fields flagged as set take the builder's values; the others
+     * are resolved through defaultValue(fields()[i]). Any exception is rethrown wrapped
+     * in AvroRuntimeException.
+     *
+     * <p>NOTE(review): SCHEMA$ declares no defaults, so building with an unset field
+     * presumably fails inside defaultValue; confirm against the Avro version in use.
+     */
+    @Override
+    public Person build() {
+      try {
+        Person record = new Person();
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.age = fieldSetFlags()[1] ? this.age : (java.lang.Integer) defaultValue(fields()[1]);
+        record.siblingnames = fieldSetFlags()[2] ? this.siblingnames : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[2]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
new file mode 100644
index 0000000..4020691
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+
+/**
+ * Tests converting {@link PType}s between the Avro and Writable type families via
+ * {@code TypeFamily.as(...)}, plus lookups of registered and built-in record types.
+ */
+public class PTypeUtilsTest {
+  /** Primitive types convert to the matching primitive type of the other family. */
+  @Test
+  public void testPrimitives() {
+    assertEquals(Avros.strings(), AvroTypeFamily.getInstance().as(Writables.strings()));
+    assertEquals(Writables.doubles(), WritableTypeFamily.getInstance().as(Avros.doubles()));
+  }
+
+  /** Every component of a Writable triple converts to its Avro counterpart, in order. */
+  @Test
+  public void testTuple3() {
+    PType<Tuple3<String, Float, Integer>> t = Writables.triples(Writables.strings(),
+        Writables.floats(), Writables.ints());
+    PType<Tuple3<String, Float, Integer>> at = AvroTypeFamily.getInstance().as(t);
+    assertEquals(Avros.strings(), at.getSubTypes().get(0));
+    assertEquals(Avros.floats(), at.getSubTypes().get(1));
+    assertEquals(Avros.ints(), at.getSubTypes().get(2));
+  }
+
+  /** Every component of an Avro TupleN converts to its Writable counterpart, in order. */
+  @Test
+  public void testTupleN() {
+    PType<TupleN> t = Avros.tuples(Avros.strings(),
+        Avros.floats(), Avros.ints());
+    PType<TupleN> wt = WritableTypeFamily.getInstance().as(t);
+    assertEquals(Writables.strings(), wt.getSubTypes().get(0));
+    assertEquals(Writables.floats(), wt.getSubTypes().get(1));
+    assertEquals(Writables.ints(), wt.getSubTypes().get(2));
+  }
+
+  /** The element type of an Avro collection converts to the Writable element type. */
+  @Test
+  public void testWritableCollections() {
+    PType<Collection<String>> t = Avros.collections(Avros.strings());
+    PType<Collection<String>> wt = WritableTypeFamily.getInstance().as(t);
+    assertEquals(Writables.strings(), wt.getSubTypes().get(0));
+  }
+
+  /** The element type of a Writable collection converts to the Avro element type. */
+  @Test
+  public void testAvroCollections() {
+    PType<Collection<Double>> t = Writables.collections(Writables.doubles());
+    PType<Collection<Double>> at = AvroTypeFamily.getInstance().as(t);
+    assertEquals(Avros.doubles(), at.getSubTypes().get(0));
+  }
+
+  /**
+   * A type registered through Avros.register is the one Avros.records returns for that
+   * class.
+   *
+   * <p>NOTE(review): the registration on Avros looks like process-wide state that could
+   * leak into other tests; confirm and unregister afterwards if that is the case.
+   */
+  @Test
+  public void testAvroRegistered() {
+    AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING));
+    Avros.register(Utf8.class, at);
+    assertEquals(at, Avros.records(Utf8.class));
+  }
+
+  /** A built-in Writable class such as Text resolves to a non-null record type. */
+  @Test
+  public void testWritableBuiltin() {
+    assertNotNull(Writables.records(Text.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
new file mode 100644
index 0000000..d442e9c
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+
+import org.apache.avro.generic.GenericData.Record;
+import org.junit.Test;
+
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier;
+import com.google.common.collect.Lists;
+
+/**
+ * Checks that the specific, generic and reflect Avro deep copiers each return an
+ * object equal to, but not the same instance as, their input.
+ */
+public class AvroDeepCopierTest {
+
+  /** Creates the Person fixture shared by the specific and reflect tests. */
+  private static Person createPerson() {
+    Person fixture = new Person();
+    fixture.setName("John Doe");
+    fixture.setAge(42);
+    fixture.setSiblingnames(Lists.<CharSequence> newArrayList());
+    return fixture;
+  }
+
+  @Test
+  public void testDeepCopySpecific() {
+    Person original = createPerson();
+    AvroSpecificDeepCopier<Person> copier =
+        new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$);
+
+    Person copy = copier.deepCopy(original);
+
+    assertEquals(original, copy);
+    assertNotSame(original, copy);
+  }
+
+  @Test
+  public void testDeepCopyGeneric() {
+    Record original = new Record(Person.SCHEMA$);
+    original.put("name", "John Doe");
+    original.put("age", 42);
+    original.put("siblingnames", Lists.newArrayList());
+
+    Record copy = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(original);
+
+    assertEquals(original, copy);
+    assertNotSame(original, copy);
+  }
+
+  @Test
+  public void testDeepCopyReflect() {
+    Person original = createPerson();
+    AvroDeepCopier.AvroReflectDeepCopier<Person> copier =
+        new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class, Person.SCHEMA$);
+
+    Person copy = copier.deepCopy(original);
+
+    assertEquals(original, copy);
+    assertNotSame(original, copy);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
new file mode 100644
index 0000000..48b1d38
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.PGroupedTableType;
+import com.google.common.collect.Lists;
+
+/**
+ * Checks that the grouped table type of an Avro table detaches the values of a
+ * (key, values) pair: the key comes back unchanged, and the values come back equal
+ * but as different instances.
+ */
+public class AvroGroupedTableTypeTest {
+
+  /** Creates the Person fixture used as the grouped value. */
+  private static Person createPerson() {
+    Person fixture = new Person();
+    fixture.setName("John Doe");
+    fixture.setAge(42);
+    fixture.setSiblingnames(Lists.<CharSequence> newArrayList());
+    return fixture;
+  }
+
+  @Test
+  public void testGetDetachedValue() {
+    Integer key = 42;
+    Person person = createPerson();
+    Iterable<Person> values = Lists.newArrayList(person);
+    Pair<Integer, Iterable<Person>> input = Pair.of(key, values);
+
+    PGroupedTableType<Integer, Person> groupedType =
+        Avros.tableOf(Avros.ints(), Avros.reflects(Person.class)).getGroupedTableType();
+    Pair<Integer, Iterable<Person>> detached = groupedType.getDetachedValue(input);
+
+    assertSame(key, detached.first());
+    List<Person> detachedValues = Lists.newArrayList(detached.second());
+    assertEquals(values, detachedValues);
+    assertNotSame(person, detachedValues.get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
new file mode 100644
index 0000000..fa9da1a
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import org.junit.Test;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.test.Person;
+import com.google.common.collect.Lists;
+
+/**
+ * Checks that an Avro table type detaches a (key, value) pair: the key comes back
+ * unchanged, and the value comes back equal but as a different instance.
+ */
+public class AvroTableTypeTest {
+
+  /** Creates the Person fixture used as the table value. */
+  private static Person createPerson() {
+    Person fixture = new Person();
+    fixture.setName("John Doe");
+    fixture.setAge(42);
+    fixture.setSiblingnames(Lists.<CharSequence> newArrayList());
+    return fixture;
+  }
+
+  @Test
+  public void testGetDetachedValue() {
+    Integer key = 42;
+    Person person = createPerson();
+    Pair<Integer, Person> input = Pair.of(key, person);
+
+    AvroTableType<Integer, Person> tableType =
+        Avros.tableOf(Avros.ints(), Avros.reflects(Person.class));
+    Pair<Integer, Person> detached = tableType.getDetachedValue(input);
+
+    assertSame(key, detached.first());
+    assertEquals(person, detached.second());
+    assertNotSame(person, detached.second());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
new file mode 100644
index 0000000..2bebca1
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.types.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.junit.Test;
+
+import org.apache.crunch.test.Person;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link AvroType}: the isSpecific/isGeneric flags for record, generic,
+ * primitive and table types, and getDetachedValue for each kind of Avro type.
+ */
+public class AvroTypeTest {
+
+  /** Creates the Person fixture used by the detached-value tests. */
+  private static Person createPerson() {
+    Person person = new Person();
+    person.setName("name value");
+    person.setAge(42);
+    person.setSiblingnames(Lists.<CharSequence> newArrayList());
+    return person;
+  }
+
+  @Test
+  public void testIsSpecific_SpecificData() {
+    assertTrue(Avros.records(Person.class).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_SpecificData() {
+    assertFalse(Avros.records(Person.class).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_GenericData() {
+    assertFalse(Avros.generics(Person.SCHEMA$).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_GenericData() {
+    assertTrue(Avros.generics(Person.SCHEMA$).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_NonAvroClass() {
+    assertFalse(Avros.ints().isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_NonAvroClass() {
+    assertFalse(Avros.ints().isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_SpecificAvroTable() {
+    assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_SpecificAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_GenericAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_GenericAvroTable() {
+    assertTrue(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isGeneric());
+  }
+
+  /** For a type the test expects to need no copying (ints), the same instance comes back. */
+  @Test
+  public void testGetDetachedValue_AlreadyMappedAvroType() {
+    Integer value = 42;
+    Integer detachedValue = Avros.ints().getDetachedValue(value);
+    assertSame(value, detachedValue);
+  }
+
+  @Test
+  public void testGetDetachedValue_GenericAvroType() {
+    AvroType<Record> genericType = Avros.generics(Person.SCHEMA$);
+    Record record = new Record(Person.SCHEMA$);
+    record.put("name", "name value");
+    record.put("age", 42);
+    record.put("siblingnames", Lists.newArrayList());
+
+    Record detachedRecord = genericType.getDetachedValue(record);
+    assertEquals(record, detachedRecord);
+    assertNotSame(record, detachedRecord);
+  }
+
+  @Test
+  public void testGetDetachedValue_SpecificAvroType() {
+    AvroType<Person> specificType = Avros.records(Person.class);
+    Person person = createPerson();
+
+    Person detachedPerson = specificType.getDetachedValue(person);
+    assertEquals(person, detachedPerson);
+    assertNotSame(person, detachedPerson);
+  }
+
+  @Test
+  public void testGetDetachedValue_ReflectAvroType() {
+    AvroType<Person> reflectType = Avros.reflects(Person.class);
+    Person person = createPerson();
+
+    Person detachedPerson = reflectType.getDetachedValue(person);
+    assertEquals(person, detachedPerson);
+    assertNotSame(person, detachedPerson);
+  }
+}


Mime
View raw message