incubator-crunch-commits mailing list archives

From jwi...@apache.org
Subject [10/19] CRUNCH-17: Split out Crunch integration tests. Contributed by Matthias Friedrich.
Date Sat, 14 Jul 2012 17:28:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
new file mode 100644
index 0000000..e2fa9a2
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTableKeyValueIT;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
+@RunWith(value = Parameterized.class)
+public class UnionCollectionIT {
+
+	private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
+
+	private PTypeFamily typeFamily;
+	private Pipeline pipeline;
+	private PCollection<String> union;
+
+	private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
+
+	@Before
+	@SuppressWarnings("unchecked")
+	public void setUp() throws IOException {
+		String inputFile1 = FileHelper.createTempCopyOf("set1.txt");
+		String inputFile2 = FileHelper.createTempCopyOf("set2.txt");
+
+		PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1,
+				typeFamily.strings()));
+		PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2,
+				typeFamily.strings()));
+
+		LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : "
+				+ typeFamily.getClass().getSimpleName() + "]  First: "
+				+ Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
+				+ Lists.newArrayList(secondCollection.materialize().iterator()));
+
+		union = secondCollection.union(firstCollection);
+	}
+
+	@After
+	public void tearDown() {
+		pipeline.done();
+	}
+
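+	// Each parameterized run pairs a type family (Writable or Avro) with a pipeline implementation (MRPipeline or MemPipeline).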
+	@Parameters
+	public static Collection<Object[]> data() throws IOException {
+		Object[][] data = new Object[][] {
+				{ WritableTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
+				{ WritableTypeFamily.getInstance(), MemPipeline.getInstance() },
+				{ AvroTypeFamily.getInstance(), new MRPipeline(PTableKeyValueIT.class) },
+				{ AvroTypeFamily.getInstance(), MemPipeline.getInstance() } };
+		return Arrays.asList(data);
+	}
+
+	public UnionCollectionIT(PTypeFamily typeFamily, Pipeline pipeline) {
+		this.typeFamily = typeFamily;
+		this.pipeline = pipeline;
+	}
+
+	@Test
+	public void unionMaterializeShouldNotThrowNPE() {
+		checkMaterialized(union.materialize());
+		checkMaterialized(pipeline.materialize(union));
+	}
+
+	private void checkMaterialized(Iterable<String> materialized) {
+
+		List<String> materializedValues = Lists.newArrayList(materialized.iterator());
+		Collections.sort(materializedValues);
+		LOG.info("Materialized union: " + materializedValues);
+
+		assertEquals(EXPECTED, materializedValues);
+	}
+
+	@Test
+	public void unionWriteShouldNotThrowNPE() throws IOException {
+
+		File outputPath1 = FileHelper.createOutputPath();
+		File outputPath2 = FileHelper.createOutputPath();
+		File outputPath3 = FileHelper.createOutputPath();
+
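+		// With the Avro type family the union is written as Avro files; otherwise it goes through the text-file targets.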
+		if (typeFamily == AvroTypeFamily.getInstance()) {
+			union.write(To.avroFile(outputPath1.getAbsolutePath()));
+			pipeline.write(union, To.avroFile(outputPath2.getAbsolutePath()));
+
+			pipeline.run();
+
+			checkFileContents(outputPath1.getAbsolutePath());
+			checkFileContents(outputPath2.getAbsolutePath());
+
+		} else {
+
+			union.write(To.textFile(outputPath1.getAbsolutePath()));
+			pipeline.write(union, To.textFile(outputPath2.getAbsolutePath()));
+			pipeline.writeTextFile(union, outputPath3.getAbsolutePath());
+
+			pipeline.run();
+
+			checkFileContents(outputPath1.getAbsolutePath());
+			checkFileContents(outputPath2.getAbsolutePath());
+			checkFileContents(outputPath3.getAbsolutePath());
+		}
+
+	}
+
+	private void checkFileContents(String filePath) throws IOException {
+
+		boolean avroOnCluster = typeFamily == AvroTypeFamily.getInstance() && pipeline instanceof MRPipeline;
+		List<String> fileContentValues = avroOnCluster
+				? Lists.newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator())
+				: Lists.newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator());
+
+		Collections.sort(fileContentValues);
+
+		LOG.info("Saved Union: " + fileContentValues);
+		assertEquals(EXPECTED, fileContentValues);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
new file mode 100644
index 0000000..df05aa5
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import org.apache.crunch.io.text.TextFileReaderFactory;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+
+public class CompositePathIterableIT {
+
+  @Test
+  public void testCreate_FilePresent() throws IOException{
+    String inputFilePath = FileHelper.createTempCopyOf("set1.txt");
+    Configuration conf = new Configuration();
+    LocalFileSystem local = FileSystem.getLocal(conf);
+    
+    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory<String>(Writables.strings(), conf));
+    
+    assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
+    
+  }
+  
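+  // An existing directory that contains no files should yield an empty iterable rather than an error.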
+  @Test
+  public void testCreate_DirectoryPresentButNoFiles() throws IOException{
+    String inputFilePath = Files.createTempDir().getAbsolutePath();
+ 
+    Configuration conf = new Configuration();
+    LocalFileSystem local = FileSystem.getLocal(conf);
+    
+    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory<String>(Writables.strings(), conf));
+    
+    assertTrue(Lists.newArrayList(iterable).isEmpty());
+  }
+  
+  @Test(expected=IOException.class)
+  public void testCreate_DirectoryNotPresent() throws IOException{
+    File inputFileDir = Files.createTempDir();
+    inputFileDir.delete();
+    
+    // Sanity check
+    assertFalse(inputFileDir.exists());
+    
+    Configuration conf = new Configuration();
+    LocalFileSystem local = FileSystem.getLocal(conf);
+    
+    CompositePathIterable.create(local, new Path(inputFileDir.getAbsolutePath()), new TextFileReaderFactory<String>(Writables.strings(), conf));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
new file mode 100644
index 0000000..e25aec4
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("serial")
+public class AvroFileSourceTargetIT implements Serializable {
+
+	private transient File avroFile;
+
+	@Before
+	public void setUp() throws IOException {
+		avroFile = File.createTempFile("test", ".avro");
+	}
+
+	@After
+	public void tearDown() {
+		avroFile.delete();
+	}
+
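+	// Writes the given generic records into the temporary Avro file so the pipeline under test can read them back.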
+	private void populateGenericFile(List<GenericRecord> genericRecords,
+			Schema schema) throws IOException {
+		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+		GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
+				schema);
+
+		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
+				genericDatumWriter);
+		dataFileWriter.create(schema, outputStream);
+
+		for (GenericRecord record : genericRecords) {
+			dataFileWriter.append(record);
+		}
+
+		dataFileWriter.close();
+		outputStream.close();
+
+	}
+
+	@Test
+	public void testSpecific() throws IOException {
+		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+		savedRecord.put("name", "John Doe");
+		savedRecord.put("age", 42);
+		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+		PCollection<Person> genericCollection = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), Avros.records(Person.class)));
+
+		List<Person> personList = Lists.newArrayList(genericCollection
+				.materialize());
+
+		Person expectedPerson = new Person();
+		expectedPerson.setName("John Doe");
+		expectedPerson.setAge(42);
+
+		List<CharSequence> siblingNames = Lists.newArrayList();
+		siblingNames.add("Jimmy");
+		siblingNames.add("Jane");
+		expectedPerson.setSiblingnames(siblingNames);
+
+		assertEquals(Lists.newArrayList(expectedPerson),
+				Lists.newArrayList(personList));
+	}
+
+	@Test
+	public void testGeneric() throws IOException {
+		String genericSchemaJson = Person.SCHEMA$.toString().replace("Person",
+				"GenericPerson");
+		Schema genericPersonSchema = new Schema.Parser()
+				.parse(genericSchemaJson);
+		GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
+		savedRecord.put("name", "John Doe");
+		savedRecord.put("age", 42);
+		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+		populateGenericFile(Lists.newArrayList(savedRecord),
+				genericPersonSchema);
+
+		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+		PCollection<Record> genericCollection = pipeline
+				.read(At.avroFile(avroFile.getAbsolutePath(),
+						Avros.generics(genericPersonSchema)));
+
+		List<Record> recordList = Lists.newArrayList(genericCollection
+				.materialize());
+
+		assertEquals(Lists.newArrayList(savedRecord),
+				Lists.newArrayList(recordList));
+	}
+
+	@Test
+	public void testReflect() throws IOException {
+		Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class);
+		GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
+		savedRecord.put("name", "John Doe");
+		populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
+
+		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class);
+		PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), Avros.reflects(PojoPerson.class)));
+
+		List<PojoPerson> recordList = Lists.newArrayList(personCollection
+				.materialize());
+
+		assertEquals(1, recordList.size());
+		PojoPerson person = recordList.get(0);
+		assertEquals("John Doe", person.getName());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
new file mode 100644
index 0000000..f5c9902
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.avro.Avros;
+import com.google.common.collect.Lists;
+
+public class AvroReflectIT implements Serializable {
+
+	static class StringWrapper {
+		private String value;
+
+		public StringWrapper() {
+			this(null);
+		}
+
+		public StringWrapper(String value) {
+			this.value = value;
+		}
+
+		public String getValue() {
+			return value;
+		}
+
+		public void setValue(String value) {
+			this.value = value;
+		}
+
+		@Override
+		public String toString() {
+			return String.format("<StringWrapper(%s)>", value);
+		}
+
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + ((value == null) ? 0 : value.hashCode());
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			StringWrapper other = (StringWrapper) obj;
+			if (value == null) {
+				if (other.value != null)
+					return false;
+			} else if (!value.equals(other.value))
+				return false;
+			return true;
+		}
+
+	}
+
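+	// Reads plain text lines and maps each one into a reflection-based Avro record before materializing the result.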
+	@Test
+	public void testReflection() throws IOException {
+		Pipeline pipeline = new MRPipeline(AvroReflectIT.class);
+		PCollection<StringWrapper> stringWrapperCollection = pipeline
+				.readTextFile(FileHelper.createTempCopyOf("set1.txt"))
+				.parallelDo(new MapFn<String, StringWrapper>() {
+
+					@Override
+					public StringWrapper map(String input) {
+						StringWrapper stringWrapper = new StringWrapper();
+						stringWrapper.setValue(input);
+						return stringWrapper;
+					}
+				}, Avros.reflects(StringWrapper.class));
+
+		List<StringWrapper> stringWrappers = Lists
+				.newArrayList(stringWrapperCollection.materialize());
+
+		pipeline.done();
+
+		assertEquals(Lists.newArrayList(new StringWrapper("b"),
+				new StringWrapper("c"), new StringWrapper("a"),
+				new StringWrapper("e")), stringWrappers);
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
new file mode 100644
index 0000000..33f1f0c
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class AggregateIT {
+
+  @Test public void testWritables() throws Exception {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
+    runMinMax(shakes, WritableTypeFamily.getInstance());
+    pipeline.done();
+  }
+
+  @Test public void testAvro() throws Exception {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
+    runMinMax(shakes, AvroTypeFamily.getInstance());
+    pipeline.done();
+  }
+
+  @Test public void testInMemoryAvro() throws Exception {
+    PCollection<String> someText = MemPipeline.collectionOf(
+        "first line", "second line", "third line");
+    runMinMax(someText, AvroTypeFamily.getInstance());
+  }
+  
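+  // Checks that the maximum line length equals the negation of the minimum over the negated lengths.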
+  public static void runMinMax(PCollection<String> shakes, PTypeFamily family) throws Exception {
+    PCollection<Integer> lengths = shakes.parallelDo(new MapFn<String, Integer>() {
+      @Override
+      public Integer map(String input) {
+        return input.length();
+      }
+    }, family.ints());
+    PCollection<Integer> negLengths = lengths.parallelDo(new MapFn<Integer, Integer>() {
+      @Override
+      public Integer map(Integer input) {
+        return -input;
+      }
+    }, family.ints());
+    Integer maxLengths = Iterables.getFirst(Aggregate.max(lengths).materialize(), null);
+    Integer minLengths = Iterables.getFirst(Aggregate.min(negLengths).materialize(), null);
+    assertTrue(maxLengths != null);
+    assertTrue(minLengths != null);
+    assertEquals(maxLengths.intValue(), -minLengths.intValue());
+  }
+  
+  private static class SplitFn extends MapFn<String, Pair<String, String>> {
+    @Override
+    public Pair<String, String> map(String input) {
+      String[] p = input.split("\\s+");
+      return Pair.of(p[0], p[1]);
+    }  
+  }
+  
+  @Test public void testCollectUrls() throws Exception {
+    Pipeline p = new MRPipeline(AggregateIT.class);
+    String urlsInputPath = FileHelper.createTempCopyOf("urls.txt");
+    PTable<String, Collection<String>> urls = Aggregate.collectValues(
+        p.readTextFile(urlsInputPath)
+        .parallelDo(new SplitFn(), tableOf(strings(), strings())));
+    for (Pair<String, Collection<String>> e : urls.materialize()) {
+      String key = e.first();
+      int expectedSize = 0;
+      if ("www.A.com".equals(key)) {
+        expectedSize = 4;
+      } else if ("www.B.com".equals(key) || "www.F.com".equals(key)) {
+        expectedSize = 2;
+      } else if ("www.C.com".equals(key) || "www.D.com".equals(key) || "www.E.com".equals(key)) {
+        expectedSize = 1;
+      }
+      assertEquals("Checking key = " + key, expectedSize, e.second().size());
+    }
+    p.done();
+  }
+  
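+  // top(counts, 2, true) keeps the two entries with the largest values; passing false keeps the two smallest.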
+  @Test public void testTopN() throws Exception {
+    PTableType<String, Integer> ptype = Avros.tableOf(Avros.strings(), Avros.ints());
+    PTable<String, Integer> counts = MemPipeline.typedTableOf(ptype, "foo", 12, "bar", 17, "baz", 29);
+    
+    PTable<String, Integer> top2 = Aggregate.top(counts, 2, true);
+    assertEquals(ImmutableList.of(Pair.of("baz", 29), Pair.of("bar", 17)), top2.materialize());
+    
+    PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
+    assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
+  }
+
+  @Test
+  public void testCollectValues_Writables() throws IOException {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    Map<Integer, Collection<Text>> collectionMap = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(new MapStringToTextPair(),
+            Writables.tableOf(Writables.ints(), Writables.writables(Text.class))
+        ).collectValues().materializeToMap();
+
+    assertEquals(1, collectionMap.size());
+
+    assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")),
+        collectionMap.get(1));
+  }
+
+  @Test
+  public void testCollectValues_Avro() throws IOException {
+
+    MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
+    Pipeline pipeline = new MRPipeline(AggregateIT.class);
+    Map<Integer, Collection<Employee>> collectionMap = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(mapFn,
+            Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
+        .materializeToMap();
+
+    assertEquals(1, collectionMap.size());
+
+    Employee empC = mapFn.map("c").second();
+    Employee empD = mapFn.map("d").second();
+    Employee empA = mapFn.map("a").second();
+
+    assertEquals(Lists.newArrayList(empC, empD, empA),
+        collectionMap.get(1));
+  }
+
+  private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
+    @Override
+    public Pair<Integer, Text> map(String input) {
+      return Pair.of(1, new Text(input));
+    }
+  }
+
+  private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> {
+    @Override
+    public Pair<Integer, Employee> map(String input) {
+      Employee emp = new Employee();
+      emp.setName(input);
+      emp.setSalary(0);
+      emp.setDepartment("");
+      return Pair.of(1, emp);
+    }
+  }
+
+  public static class PojoText {
+    private String value;
+
+    public PojoText() {
+      this("");
+    }
+
+    public PojoText(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public void setValue(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("PojoText<%s>", this.value);
+    }
+
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      PojoText other = (PojoText) obj;
+      if (value == null) {
+        if (other.value != null)
+          return false;
+      } else if (!value.equals(other.value))
+        return false;
+      return true;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
new file mode 100644
index 0000000..0aece4b
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.ints;
+import static org.apache.crunch.types.avro.Avros.records;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests sorting Avro types by a selected inner field.
+ */
+public class AvroTypeSortIT implements Serializable {
+
+	private static final long serialVersionUID = 1344118240353796561L;
+
+	private transient File avroFile;
+
+	@Before
+	public void setUp() throws IOException {
+		avroFile = File.createTempFile("avrotest", ".avro");
+	}
+
+	@After
+	public void tearDown() {
+		avroFile.delete();
+	}
+
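+	// Keys each Person by the selected field and relies on groupByKey/ungroup/values to return records in key order.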
+	@Test
+	public void testSortAvroTypesBySelectedFields() throws Exception {
+
+		MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class);
+
+		Person ccc10 = createPerson("CCC", 10);
+		Person bbb20 = createPerson("BBB", 20);
+		Person aaa30 = createPerson("AAA", 30);
+
+		writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile);
+
+		PCollection<Person> unsorted = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), records(Person.class)));
+
+		// Sort by Name
+		MapFn<Person, String> nameExtractor = new MapFn<Person, String>() {
+
+			@Override
+			public String map(Person input) {
+				return input.getName().toString();
+			}
+		};
+
+		PCollection<Person> sortedByName = unsorted
+				.by(nameExtractor, strings()).groupByKey().ungroup().values();
+
+		List<Person> sortedByNameList = Lists.newArrayList(sortedByName
+				.materialize());
+
+		assertEquals(3, sortedByNameList.size());
+		assertEquals(aaa30, sortedByNameList.get(0));
+		assertEquals(bbb20, sortedByNameList.get(1));
+		assertEquals(ccc10, sortedByNameList.get(2));
+
+		// Sort by Age
+
+		MapFn<Person, Integer> ageExtractor = new MapFn<Person, Integer>() {
+
+			@Override
+			public Integer map(Person input) {
+				return input.getAge();
+			}
+		};
+
+		PCollection<Person> sortedByAge = unsorted.by(ageExtractor, ints())
+				.groupByKey().ungroup().values();
+
+		List<Person> sortedByAgeList = Lists.newArrayList(sortedByAge
+				.materialize());
+
+		assertEquals(3, sortedByAgeList.size());
+		assertEquals(ccc10, sortedByAgeList.get(0));
+		assertEquals(bbb20, sortedByAgeList.get(1));
+		assertEquals(aaa30, sortedByAgeList.get(2));
+
+		pipeline.done();
+	}
+
+	private void writeAvroFile(List<Person> people, File avroFile)
+			throws IOException {
+
+		FileOutputStream outputStream = new FileOutputStream(avroFile);
+		SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
+				Person.class);
+
+		DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
+				writer);
+		dataFileWriter.create(Person.SCHEMA$, outputStream);
+		for (Person person : people) {
+			dataFileWriter.append(person);
+		}
+		dataFileWriter.close();
+		outputStream.close();
+	}
+
+	private Person createPerson(String name, int age) throws IOException {
+
+		Person person = new Person();
+		person.setAge(age);
+		person.setName(name);
+		List<CharSequence> siblingNames = Lists.newArrayList();
+		person.setSiblingnames(siblingNames);
+
+		return person;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
new file mode 100644
index 0000000..cc67257
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.MapKeysFn;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.base.Splitter;
+import com.google.common.io.Files;
+
+public class CogroupIT {
+
+  private static class WordSplit extends DoFn<String, Pair<String, Long>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Long>> emitter) {
+      for (String word : Splitter.on(' ').split(input)) {
+        emitter.emit(Pair.of(word, 1L));
+      }
+    }
+  }
+
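+  // Splits both inputs into word counts, cogroups them by word, sums both sides, and re-keys by the lowercased first letter.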
+  public static PTable<String, Long> join(PCollection<String> w1,
+      PCollection<String> w2, PTypeFamily ptf) {
+    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
+    PTable<String, Long> ws1 = w1.parallelDo("ws1", new WordSplit(), ntt);
+    PTable<String, Long> ws2 = w2.parallelDo("ws2", new WordSplit(), ntt);
+    PTable<String, Pair<Collection<Long>, Collection<Long>>> cg = Cogroup.cogroup(ws1, ws2);
+    PTable<String, Long> sums = cg.parallelDo(
+        "wc",
+        new MapValuesFn<String, Pair<Collection<Long>, Collection<Long>>, Long>() {
+          @Override
+          public Long map(Pair<Collection<Long>, Collection<Long>> v) {
+            long sum = 0L;
+            for (Long value : v.first()) {
+              sum += value;
+            }
+            for (Long value : v.second()) {
+              sum += value;
+            }
+            return sum;
+          }
+        }, ntt);
+    return sums.parallelDo("firstletters", new MapKeysFn<String, String, Long>() {
+      @Override
+      public String map(String k1) {
+        if (k1.length() > 0) {
+          return k1.substring(0, 1).toLowerCase();
+        } else {
+          return "";
+        }
+      }
+    }, ntt).groupByKey().combineValues(CombineFn.<String>SUM_LONGS());
+  }
+
+  @Test
+  public void testWritableJoin() throws Exception {
+    run(new MRPipeline(CogroupIT.class), WritableTypeFamily.getInstance());
+  }
+  
+  @Test
+  public void testAvroJoin() throws Exception {
+    run(new MRPipeline(CogroupIT.class), AvroTypeFamily.getInstance());
+  }
+  
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt");
+    File output = FileHelper.createOutputPath();
+    
+    PCollection<String> shakespeare = pipeline.read(From.textFile(shakesInputPath));
+    PCollection<String> maugham = pipeline.read(From.textFile(maughamInputPath));
+    pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath());
+    pipeline.done();
+    
+    File outputFile = new File(output, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.equals("j\t705")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+    
+    output.deleteOnExit();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SetIT.java b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
new file mode 100644
index 0000000..ab09bef
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/SetIT.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
+@RunWith(value = Parameterized.class)
+public class SetIT {
+  
+  private PTypeFamily typeFamily;
+  
+  private Pipeline pipeline;
+  private PCollection<String> set1;
+  private PCollection<String> set2;
+  
+  public SetIT(PTypeFamily typeFamily) {
+    this.typeFamily = typeFamily;
+  }
+  
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] {
+        { WritableTypeFamily.getInstance() },
+        { AvroTypeFamily.getInstance() }
+    };
+    return Arrays.asList(data);
+  }
+  
+  @Before
+  public void setUp() throws IOException {
+    String set1InputPath = FileHelper.createTempCopyOf("set1.txt");
+    String set2InputPath = FileHelper.createTempCopyOf("set2.txt");
+    pipeline = new MRPipeline(SetIT.class);
+    set1 = pipeline.read(At.textFile(set1InputPath, typeFamily.strings()));
+    set2 = pipeline.read(At.textFile(set2InputPath, typeFamily.strings()));
+  }
+  
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+  
+  @Test
+  public void testDifference() throws Exception {
+    PCollection<String> difference = Set.difference(set1, set2);
+    assertEquals(Lists.newArrayList("b", "e"),
+        Lists.newArrayList(difference.materialize()));
+  }
+  
+  @Test
+  public void testIntersection() throws Exception {
+    PCollection<String> intersection = Set.intersection(set1, set2);
+    assertEquals(Lists.newArrayList("a", "c"),
+        Lists.newArrayList(intersection.materialize()));
+  }
+  
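+  // comm emits one tuple per value: the first slot is set for values only in set1, the second for values only in set2, the third for values in both.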
+  @Test
+  public void testComm() throws Exception {
+    PCollection<Tuple3<String, String, String>> comm = Set.comm(set1, set2);
+    Iterator<Tuple3<String, String, String>> i = comm.materialize().iterator();
+    checkEquals(null, null, "a", i.next());
+    checkEquals("b", null, null, i.next());
+    checkEquals(null, null, "c", i.next());
+    checkEquals(null, "d", null, i.next());
+    checkEquals("e", null, null, i.next());
+    assertFalse(i.hasNext());
+  }
+
+  private void checkEquals(String s1, String s2, String s3,
+      Tuple3<String, String, String> tuple) {
+    assertEquals("first string", s1, tuple.first());
+    assertEquals("second string", s2, tuple.second());
+    assertEquals("third string", s3, tuple.third());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
new file mode 100644
index 0000000..616ec15
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.lib.Sort.ColumnOrder.by;
+import static org.apache.crunch.lib.Sort.Order.ASCENDING;
+import static org.apache.crunch.lib.Sort.Order.DESCENDING;
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Sort;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortIT implements Serializable {
+
+  @Test
+  public void testWritableSortAsc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), Order.ASCENDING,
+        "A\tand this text as well");
+  }
+
+  @Test
+  public void testWritableSortDesc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), Order.DESCENDING,
+        "B\tthis doc has some text");
+  }
+
+  @Test
+  public void testWritableSortAscDesc() throws Exception {
+    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), "A", "this doc has this text");
+  }
+
+  @Test
+  public void testWritableSortSecondDescFirstAsc() throws Exception {
+    runPair(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(2, DESCENDING),
+        by(1, ASCENDING), "A", "this doc has this text");
+  }
+
+  @Test
+  public void testWritableSortTripleAscDescAsc() throws Exception {
+    runTriple(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testWritableSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testWritableSortTupleNAscDesc() throws Exception {
+    runTupleN(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), new ColumnOrder[] {
+        by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+  }
+
+  @Test
+  public void testWritableSortTable() throws Exception {
+    runTable(new MRPipeline(SortIT.class), WritableTypeFamily.getInstance(), "A");
+  }
+
+  @Test
+  public void testAvroSortAsc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.ASCENDING,
+        "A\tand this text as well");
+  }
+
+  @Test
+  public void testAvroSortDesc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), Order.DESCENDING,
+        "B\tthis doc has some text");
+  }
+
+  @Test
+  public void testAvroSortPairAscDesc() throws Exception {
+    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), "A", "this doc has this text");
+  }
+
+  @Test
+  @Ignore("Avro sorting only works in field order at the moment")
+  public void testAvroSortPairSecondDescFirstAsc() throws Exception {
+    runPair(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(2, DESCENDING),
+        by(1, ASCENDING), "A", "this doc has this text");
+  }
+
+  @Test
+  public void testAvroSortTripleAscDescAsc() throws Exception {
+    runTriple(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testAvroSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testAvroSortTupleNAscDesc() throws Exception {
+    runTupleN(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(),
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A",
+            "this doc has this text" });
+  }
+
+  @Test
+  public void testAvroReflectSortPair() throws IOException {
+    Pipeline pipeline = new MRPipeline(SortIT.class);
+    PCollection<Pair<String, StringWrapper>> sorted = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class))).sort(true);
+
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+  }
+
+  @Test
+  public void testAvroReflectSortTable() throws IOException {
+    Pipeline pipeline = new MRPipeline(SortIT.class);
+    PTable<String, StringWrapper> unsorted = pipeline.readTextFile(
+        FileHelper.createTempCopyOf("set2.txt")).parallelDo(
+        new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class)));
+
+    PTable<String, StringWrapper> sorted = Sort.sort(unsorted);
+
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+  }
+
+  @Test
+  public void testAvroSortTable() throws Exception {
+    runTable(new MRPipeline(SortIT.class), AvroTypeFamily.getInstance(), "A");
+  }
+
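+  // The helpers below read docs.txt, convert it to the requested type family, sort, and assert on the first materialized record.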
+  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine)
+      throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    // The following converts the input from Writables to the required type family.
+    PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() {
+      @Override
+      public void process(String input, Emitter<String> emitter) {
+        emitter.emit(input);
+      }
+    }, typeFamily.strings());
+    PCollection<String> sorted = Sort.sort(input2, order);
+    Iterable<String> lines = sorted.materialize();
+
+    assertEquals(firstLine, lines.iterator().next());
+    pipeline.done(); // TODO: finally
+  }
+
+  private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
+      ColumnOrder second, String firstField, String secondField) throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PCollection<Pair<String, String>> kv = input.parallelDo(
+        new DoFn<String, Pair<String, String>>() {
+          @Override
+          public void process(String input, Emitter<Pair<String, String>> emitter) {
+            String[] split = input.split("[\t]+");
+            emitter.emit(Pair.of(split[0], split[1]));
+          }
+        }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
+    PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
+    Iterable<Pair<String, String>> lines = sorted.materialize();
+    Pair<String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    pipeline.done();
+  }
+
+  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
+      ColumnOrder second, ColumnOrder third, String firstField, String secondField,
+      String thirdField) throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
+        new DoFn<String, Tuple3<String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
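+            // Indexing modulo len lets lines with fewer fields reuse earlier tokens instead of going out of bounds.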
+            emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
+          }
+        }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+    PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
+    Iterable<Tuple3<String, String, String>> lines = sorted.materialize();
+    Tuple3<String, String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    assertEquals(thirdField, l.third());
+    pipeline.done();
+  }
+
+  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
+      ColumnOrder second, ColumnOrder third, ColumnOrder fourth, String firstField,
+      String secondField, String thirdField, String fourthField) throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
+        new DoFn<String, Tuple4<String, String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
+            emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
+          }
+        }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(),
+            typeFamily.strings()));
+    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second,
+        third, fourth);
+    Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
+    Tuple4<String, String, String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    assertEquals(thirdField, l.third());
+    assertEquals(fourthField, l.fourth());
+    pipeline.done();
+  }
+
+  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders,
+      String[] fields) throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PType[] types = new PType[orders.length];
+    Arrays.fill(types, typeFamily.strings());
+    PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() {
+      @Override
+      public void process(String input, Emitter<TupleN> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(new TupleN(split));
+      }
+    }, typeFamily.tuples(types));
+    PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
+    Iterable<TupleN> lines = sorted.materialize();
+    TupleN l = lines.iterator().next();
+    int i = 0;
+    for (String field : fields) {
+      assertEquals(field, l.get(i++));
+    }
+    pipeline.done();
+  }
+
+  private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey)
+      throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(Pair.of(split[0], split[1]));
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
+    PTable<String, String> sorted = Sort.sort(table);
+    Iterable<Pair<String, String>> lines = sorted.materialize();
+    Pair<String, String> l = lines.iterator().next();
+    assertEquals(firstKey, l.first());
+    pipeline.done();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
new file mode 100644
index 0000000..69f0717
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/SpecificAvroGroupByIT.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.Person.Builder;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.avro.SafeAvroSerialization;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests {@link SafeAvroSerialization} with specific Avro types.
+ */
+public class SpecificAvroGroupByIT implements Serializable {
+
+	private static final long serialVersionUID = 1344118240353796561L;
+
+	private transient File avroFile;
+
+	@Before
+	public void setUp() throws IOException {
+		avroFile = File.createTempFile("avrotest", ".avro");
+	}
+
+	@After
+	public void tearDown() {
+		avroFile.delete();
+	}
+
+	@Test
+	public void testGroupByWithSpecificAvroType() throws Exception {
+
+		MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class);
+
+		testSpecificAvro(pipeline);
+	}
+
+	@Test
+	public void testGroupByOnSpecificAvroButReflectionDatumReader()
+			throws Exception {
+		MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByIT.class);
+
+		// https://issues.apache.org/jira/browse/AVRO-1046 resolves
+		// the ClassCastException when reading specific Avro types with
+		// ReflectDatumReader
+
+		pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT,
+				true);
+
+		testSpecificAvro(pipeline);
+	}
+
+	public void testSpecificAvro(MRPipeline pipeline) throws Exception {
+
+		createPersonAvroFile(avroFile);
+
+		PCollection<Person> unsorted = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), Avros.records(Person.class)));
+
+		PTable<String, Person> sorted = unsorted
+				.parallelDo(new MapFn<Person, Pair<String, Person>>() {
+
+					@Override
+					public Pair<String, Person> map(Person input) {
+						String key = input.getName().toString();
+						return Pair.of(key, input);
+
+					}
+				}, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
+				.groupByKey().ungroup();
+
+		List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted
+				.materialize());
+
+		assertEquals(1, outputPersonList.size());
+		assertEquals(String.class, outputPersonList.get(0).first().getClass());
+		assertEquals(Person.class, outputPersonList.get(0).second().getClass());
+		
+		pipeline.done();
+	}
+
+	private void createPersonAvroFile(File avroFile) throws IOException {
+
+		Builder person = Person.newBuilder();
+		person.setAge(40);
+		person.setName("Bob");
+		List<CharSequence> siblingNames = Lists.newArrayList();
+		siblingNames.add("Bob" + "1");
+		siblingNames.add("Bob" + "2");
+		person.setSiblingnames(siblingNames);
+
+		FileOutputStream outputStream = new FileOutputStream(avroFile);
+		SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
+				Person.class);
+
+		DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
+				writer);
+		dataFileWriter.create(Person.SCHEMA$, outputStream);
+		dataFileWriter.append(person.build());
+		dataFileWriter.close();
+		outputStream.close();
+	}
+}
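
The reflect-reader workaround exercised by testGroupByOnSpecificAvroButReflectionDatumReader above comes down to a single configuration flag. A minimal standalone sketch, assuming only the avro-mapred AvroJob constant and the MRPipeline constructor already used in the test (the ReflectReadExample class name is illustrative):

    import org.apache.avro.mapred.AvroJob;
    import org.apache.crunch.impl.mr.MRPipeline;

    public class ReflectReadExample {
      public static void main(String[] args) {
        // AVRO-1046: mark map outputs as reflect-based so specific Avro
        // records can also be read back with a ReflectDatumReader.
        MRPipeline pipeline = new MRPipeline(ReflectReadExample.class);
        pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true);
        // ... read, group, and materialize as in testSpecificAvro(pipeline) ...
        pipeline.done();
      }
    }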

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
new file mode 100644
index 0000000..8016a24
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/FullOuterJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class FullOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new FullOuterJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
new file mode 100644
index 0000000..97220ac
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/InnerJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class InnerJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new InnerJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
new file mode 100644
index 0000000..c5a9f39
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/JoinTester.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.junit.Test;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+
+public abstract class JoinTester implements Serializable {
+  private static class WordSplit extends DoFn<String, String> {
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      for (String word : input.split("\\s+")) {
+        emitter.emit(word);
+      }
+    }
+  }
+
+  protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2,
+        PTypeFamily ptf) {
+    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
+    PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
+    PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
+
+    PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
+
+    PTable<String, Long> sums = join.parallelDo("cnt",
+        new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
+          @Override
+          public void process(Pair<String, Pair<Long, Long>> input,
+                              Emitter<Pair<String, Long>> emitter) {
+            Pair<Long, Long> pair = input.second();
+            long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? pair.second() : 0);
+            emitter.emit(Pair.of(input.first(), sum));
+          }
+        }, ntt);
+
+    return sums;
+  }
+
+  protected void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt");
+
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    PCollection<String> maugham = pipeline.readTextFile(maughamInputPath);
+    PTable<String, Long> joined = join(shakespeare, maugham, typeFamily);
+    Iterable<Pair<String, Long>> lines = joined.materialize();
+
+    assertPassed(lines);
+
+    pipeline.done();
+  }
+
+  @Test
+  public void testWritableJoin() throws Exception {
+    run(new MRPipeline(InnerJoinIT.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvroJoin() throws Exception {
+    run(new MRPipeline(InnerJoinIT.class), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Used to check that the result of the join makes sense.
+   *
+   * @param lines The result of the join.
+   */
+  public abstract void assertPassed(Iterable<Pair<String, Long>> lines);
+
+  /**
+   * @return The JoinFn to use.
+   */
+  protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
new file mode 100644
index 0000000..aafe1c9
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/LeftOuterJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class LeftOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = false;
+    boolean passed3 = true;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first()) && 10 == line.second()) {
+        passed2 = true;
+      }
+      if ("Montparnasse.".equals(line.first())) {
+        passed3 = false;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
new file mode 100644
index 0000000..baf3d4f
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.Lists;
+
+public class MapsideJoinIT {
+
+  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
+
+    @Override
+    public Pair<Integer, String> map(String input) {
+      String[] fields = input.split("\\|");
+      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
+    }
+
+  }
+
+  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
+
+    @Override
+    public boolean accept(Pair<Integer, String> input) {
+      return false;
+    }
+
+  }
+
+  @Test(expected = CrunchRuntimeException.class)
+  public void testNonMapReducePipeline() {
+    runMapsideJoin(MemPipeline.getInstance());
+  }
+
+  @Test
+  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
+    MRPipeline pipeline = new MRPipeline(MapsideJoinIT.class);
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
+        orderTable.getPTableType());
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
+        filteredOrderTable);
+
+    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
+        .materialize());
+
+    assertTrue(materializedJoin.isEmpty());
+
+  }
+
+  @Test
+  public void testMapsideJoin() throws IOException {
+    runMapsideJoin(new MRPipeline(MapsideJoinIT.class));
+  }
+
+  private void runMapsideJoin(Pipeline pipeline) {
+    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
+    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
+
+    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
+
+    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
+    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
+    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
+    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
+
+    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
+        .materialize());
+    Collections.sort(joinedResultList);
+
+    assertEquals(expectedJoinResult, joinedResultList);
+  }
+
+  private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
+    try {
+      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
+          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
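
The pipe-delimited parsing done by LineSplitter above can be checked in isolation against the customers.txt format that appears later in this patch. A minimal sketch, assuming only org.apache.crunch.Pair and JUnit 4 on the classpath (the LineSplitterSketchTest name is illustrative):

    import static org.junit.Assert.assertEquals;

    import org.apache.crunch.Pair;
    import org.junit.Test;

    public class LineSplitterSketchTest {
      // Same parsing logic as MapsideJoinIT.LineSplitter, pulled out for illustration.
      private static Pair<Integer, String> split(String line) {
        String[] fields = line.split("\\|");
        return Pair.of(Integer.parseInt(fields[0]), fields[1]);
      }

      @Test
      public void parsesCustomerRecord() {
        // One record in the "id|name" format used by the map-side join fixtures.
        assertEquals(Pair.of(111, "John Doe"), split("111|John Doe"));
      }
    }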

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
new file mode 100644
index 0000000..5fcef72
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MultiAvroSchemaJoinIT.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.apache.crunch.types.avro.Avros.records;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.Person;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class MultiAvroSchemaJoinIT {
+
+  private File personFile;
+  private File employeeFile;
+  
+  @Before
+  public void setUp() throws Exception {
+    this.personFile = File.createTempFile("person", ".avro");
+    this.employeeFile = File.createTempFile("employee", ".avro");
+
+    DatumWriter<Person> pdw = new SpecificDatumWriter<Person>();
+    DataFileWriter<Person> pfw = new DataFileWriter<Person>(pdw);
+    pfw.create(Person.SCHEMA$, personFile);
+    Person p1 = new Person();
+    p1.setName("Josh");
+    p1.setAge(19);
+    p1.setSiblingnames(ImmutableList.<CharSequence>of("Kate", "Mike"));
+    pfw.append(p1);
+    Person p2 = new Person();
+    p2.setName("Kate");
+    p2.setAge(17);
+    p2.setSiblingnames(ImmutableList.<CharSequence>of("Josh", "Mike"));
+    pfw.append(p2);
+    Person p3 = new Person();
+    p3.setName("Mike");
+    p3.setAge(12);
+    p3.setSiblingnames(ImmutableList.<CharSequence>of("Josh", "Kate"));
+    pfw.append(p3);
+    pfw.close();
+    
+    DatumWriter<Employee> edw = new SpecificDatumWriter<Employee>();
+    DataFileWriter<Employee> efw = new DataFileWriter<Employee>(edw);
+    efw.create(Employee.SCHEMA$, employeeFile);
+    Employee e1 = new Employee();
+    e1.setName("Kate");
+    e1.setSalary(100000);
+    e1.setDepartment("Marketing");
+    efw.append(e1);
+    efw.close();
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    personFile.delete();
+    employeeFile.delete();
+  }
+  
+  public static class NameFn<K extends SpecificRecord> extends MapFn<K, String> {
+    @Override
+    public String map(K input) {
+      Schema s = input.getSchema();
+      Schema.Field f = s.getField("name");
+      return input.get(f.pos()).toString();
+    }
+  }
+  
+  @Test
+  public void testJoin() throws Exception {
+    Pipeline p = new MRPipeline(MultiAvroSchemaJoinIT.class);
+    PCollection<Person> people = p.read(From.avroFile(personFile.getAbsolutePath(), records(Person.class)));
+    PCollection<Employee> employees = p.read(From.avroFile(employeeFile.getAbsolutePath(), records(Employee.class)));
+
+    Iterable<Pair<Person, Employee>> result = people.by(new NameFn<Person>(), strings())
+        .join(employees.by(new NameFn<Employee>(), strings())).values().materialize();
+    List<Pair<Person, Employee>> v = Lists.newArrayList(result);
+    assertEquals(1, v.size());
+    assertEquals("Kate", v.get(0).first().getName().toString());
+    assertEquals("Kate", v.get(0).second().getName().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java b/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
new file mode 100644
index 0000000..a3bb122
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/RightOuterJoinIT.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib.join;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+public class RightOuterJoinIT extends JoinTester {
+  @Override
+  public void assertPassed(Iterable<Pair<String, Long>> lines) {
+    boolean passed1 = false;
+    boolean passed2 = true;
+    boolean passed3 = false;
+    for (Pair<String, Long> line : lines) {
+      if ("wretched".equals(line.first()) && 24 == line.second()) {
+        passed1 = true;
+      }
+      if ("againe".equals(line.first())) {
+        passed2 = false;
+      }
+      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
+        passed3 = true;
+      }
+    }
+    assertTrue(passed1);
+    assertTrue(passed2);
+    assertTrue(passed3);
+  }
+
+  @Override
+  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
+    return new RightOuterJoinFn<String, Long, Long>(typeFamily.longs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/customers.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/customers.txt b/crunch/src/it/resources/customers.txt
new file mode 100644
index 0000000..98f3f3d
--- /dev/null
+++ b/crunch/src/it/resources/customers.txt
@@ -0,0 +1,4 @@
+111|John Doe
+222|Jane Doe
+333|Someone Else
+444|Has No Orders
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/docs.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/docs.txt b/crunch/src/it/resources/docs.txt
new file mode 100644
index 0000000..90a3f65
--- /dev/null
+++ b/crunch/src/it/resources/docs.txt
@@ -0,0 +1,6 @@
+A	this doc has this text
+A	and this text as well
+A	but also this
+B	this doc has some text
+B	but not as much as the last
+B	doc

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/employee.avro
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/employee.avro b/crunch/src/it/resources/employee.avro
new file mode 100644
index 0000000..3563df9
--- /dev/null
+++ b/crunch/src/it/resources/employee.avro
@@ -0,0 +1,9 @@
+{
+"namespace": "org.apache.crunch.test",
+"name": "Employee",
+"type": "record",
+"fields": [
+  {"name": "name", "type": ["string", "null"] },
+  {"name": "salary", "type": "int"},
+  {"name": "department", "type": ["string", "null"] } ]
+} 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/emptyTextFile.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/emptyTextFile.txt b/crunch/src/it/resources/emptyTextFile.txt
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/letters.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/letters.txt b/crunch/src/it/resources/letters.txt
new file mode 100644
index 0000000..916bfc9
--- /dev/null
+++ b/crunch/src/it/resources/letters.txt
@@ -0,0 +1,2 @@
+a
+bb
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/42c9e4e5/crunch/src/it/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/log4j.properties b/crunch/src/it/resources/log4j.properties
new file mode 100644
index 0000000..c8173d7
--- /dev/null
+++ b/crunch/src/it/resources/log4j.properties
@@ -0,0 +1,11 @@
+# ***** Set the org.apache.crunch logger level to INFO and its only appender to A.
+log4j.logger.org.apache.crunch=info, A
+
+# Log only warnings from Hadoop when testing with the local runner
+log4j.logger.org.apache.hadoop=warn, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

