incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [18/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/impl/mr/collect/UnionCollectionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mr/collect/UnionCollectionTest.java b/src/test/java/com/cloudera/crunch/impl/mr/collect/UnionCollectionTest.java
deleted file mode 100644
index 655bd57..0000000
--- a/src/test/java/com/cloudera/crunch/impl/mr/collect/UnionCollectionTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package com.cloudera.crunch.impl.mr.collect;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTableKeyValueTest;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.io.To;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.avro.Avros;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class UnionCollectionTest {
-
-	private static final Log LOG = LogFactory.getLog(UnionCollectionTest.class);
-
-	private PTypeFamily typeFamily;
-	private Pipeline pipeline;
-	private PCollection<String> union;
-
-	private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
-
-	@Before
-	@SuppressWarnings("unchecked")
-	public void setUp() throws IOException {
-		String inputFile1 = FileHelper.createTempCopyOf("set1.txt");
-		String inputFile2 = FileHelper.createTempCopyOf("set2.txt");
-
-		PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1,
-				typeFamily.strings()));
-		PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2,
-				typeFamily.strings()));
-
-		LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : "
-				+ typeFamily.getClass().getSimpleName() + "]  First: "
-				+ Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
-				+ Lists.newArrayList(secondCollection.materialize().iterator()));
-
-		union = secondCollection.union(firstCollection);
-	}
-
-	@After
-	public void tearDown() {
-		pipeline.done();
-	}
-
-	@Parameters
-	public static Collection<Object[]> data() throws IOException {
-		Object[][] data = new Object[][] {
-				{ WritableTypeFamily.getInstance(), new MRPipeline(PTableKeyValueTest.class) },
-				{ WritableTypeFamily.getInstance(), MemPipeline.getInstance() },
-				{ AvroTypeFamily.getInstance(), new MRPipeline(PTableKeyValueTest.class) },
-				{ AvroTypeFamily.getInstance(), MemPipeline.getInstance() } };
-		return Arrays.asList(data);
-	}
-
-	public UnionCollectionTest(PTypeFamily typeFamily, Pipeline pipeline) {
-		this.typeFamily = typeFamily;
-		this.pipeline = pipeline;
-	}
-
-	@Test
-	public void unionMaterializeShouldNotThrowNPE() {
-		checkMaterialized(union.materialize());
-		checkMaterialized(pipeline.materialize(union));
-	}
-
-	private void checkMaterialized(Iterable<String> materialized) {
-
-		List<String> materializedValues = Lists.newArrayList(materialized.iterator());
-		Collections.sort(materializedValues);
-		LOG.info("Materialized union: " + materializedValues);
-
-		assertEquals(EXPECTED, materializedValues);
-	}
-
-	@Test
-	public void unionWriteShouldNotThrowNPE() throws IOException {
-
-		File outputPath1 = FileHelper.createOutputPath();
-		File outputPath2 = FileHelper.createOutputPath();
-		File outputPath3 = FileHelper.createOutputPath();
-
-		if (typeFamily == AvroTypeFamily.getInstance()) {
-			union.write(To.avroFile(outputPath1.getAbsolutePath()));
-			pipeline.write(union, To.avroFile(outputPath2.getAbsolutePath()));
-
-			pipeline.run();
-
-			checkFileContents(outputPath1.getAbsolutePath());
-			checkFileContents(outputPath2.getAbsolutePath());
-
-		} else {
-
-			union.write(To.textFile(outputPath1.getAbsolutePath()));
-			pipeline.write(union, To.textFile(outputPath2.getAbsolutePath()));
-			pipeline.writeTextFile(union, outputPath3.getAbsolutePath());
-
-			pipeline.run();
-
-			checkFileContents(outputPath1.getAbsolutePath());
-			checkFileContents(outputPath2.getAbsolutePath());
-			checkFileContents(outputPath3.getAbsolutePath());
-		}
-
-	}
-
-	private void checkFileContents(String filePath) throws IOException {
-
-		List<String> fileContentValues = (typeFamily != AvroTypeFamily.getInstance() || !(pipeline instanceof MRPipeline)) ? Lists
-				.newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize()
-						.iterator()) : Lists.newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings()))
-				.materialize().iterator());
-
-		Collections.sort(fileContentValues);
-
-		LOG.info("Saved Union: " + fileContentValues);
-		assertEquals(EXPECTED, fileContentValues);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilderTest.java b/src/test/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilderTest.java
deleted file mode 100644
index 6521c4a..0000000
--- a/src/test/java/com/cloudera/crunch/impl/mr/plan/JobNameBuilderTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mr.plan;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.types.writable.Writables;
-import com.google.common.collect.Lists;
-
-public class JobNameBuilderTest {
-
-  @Test
-  public void testBuild() {
-    final String pipelineName = "PipelineName";
-    final String nodeName = "outputNode";
-    DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings());
-    JobNameBuilder jobNameBuilder = new JobNameBuilder(pipelineName);
-    jobNameBuilder.visit(Lists.newArrayList(doNode));
-    String jobName = jobNameBuilder.build();
-    
-    assertEquals(String.format("%s: %s", pipelineName, nodeName), jobName);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java b/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java
deleted file mode 100644
index ad63983..0000000
--- a/src/test/java/com/cloudera/crunch/io/CompositePathIterableTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package com.cloudera.crunch.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import com.cloudera.crunch.io.text.TextFileReaderFactory;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.writable.Writables;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-
-public class CompositePathIterableTest {
-
-  
-  @Test
-  public void testCreate_FilePresent() throws IOException{
-    String inputFilePath = FileHelper.createTempCopyOf("set1.txt");
-    Configuration conf = new Configuration();
-    LocalFileSystem local = FileSystem.getLocal(conf);
-    
-    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory<String>(Writables.strings(), conf));
-    
-    assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
-    
-  }
-  
-  @Test
-  public void testCreate_DirectoryPresentButNoFiles() throws IOException{
-    String inputFilePath = Files.createTempDir().getAbsolutePath();
- 
-    Configuration conf = new Configuration();
-    LocalFileSystem local = FileSystem.getLocal(conf);
-    
-    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath), new TextFileReaderFactory<String>(Writables.strings(), conf));
-    
-    assertTrue(Lists.newArrayList(iterable).isEmpty());
-  }
-  
-  @Test(expected=IOException.class)
-  public void testCreate_DirectoryNotPresent() throws IOException{
-    File inputFileDir = Files.createTempDir();
-    inputFileDir.delete();
-    
-    // Sanity check
-    assertFalse(inputFileDir.exists());
-    
-    Configuration conf = new Configuration();
-    LocalFileSystem local = FileSystem.getLocal(conf);
-    
-    CompositePathIterable.create(local, new Path(inputFileDir.getAbsolutePath()), new TextFileReaderFactory<String>(Writables.strings(), conf));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/io/SourceTargetHelperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/SourceTargetHelperTest.java b/src/test/java/com/cloudera/crunch/io/SourceTargetHelperTest.java
deleted file mode 100644
index b1c73f0..0000000
--- a/src/test/java/com/cloudera/crunch/io/SourceTargetHelperTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package com.cloudera.crunch.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-public class SourceTargetHelperTest {
-
-	@Test
-	public void testGetNonexistentPathSize() throws Exception {
-		File tmp = File.createTempFile("pathsize", "");
-		Path tmpPath = new Path(tmp.getAbsolutePath());
-		tmp.delete();
-		FileSystem fs = FileSystem.getLocal(new Configuration());
-		assertEquals(-1L, SourceTargetHelper.getPathSize(fs, tmpPath));
-	}
-
-	@Test
-	public void testGetNonExistentPathSize_NonExistantPath() throws IOException {
-		FileSystem mockFs = new MockFileSystem();
-		assertEquals(-1L, SourceTargetHelper.getPathSize(mockFs, new Path("does/not/exist")));
-	}
-
-	/**
-	 * Mock FileSystem that returns null for {@link FileSystem#listStatus(Path)}.
-	 */
-	static class MockFileSystem extends LocalFileSystem {
-
-		@Override
-		public FileStatus[] listStatus(Path f) throws IOException {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
deleted file mode 100644
index 390fbf1..0000000
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroFileReaderFactoryTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.avro;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.test.Person;
-import com.cloudera.crunch.types.avro.Avros;
-import com.google.common.collect.Lists;
-
-public class AvroFileReaderFactoryTest {
-
-	private File avroFile;
-
-	@Before
-	public void setUp() throws IOException {
-		// InputSupplier<InputStream> inputStreamSupplier =
-		// newInputStreamSupplier(getResource("person.avro"));
-		avroFile = File.createTempFile("test", ".av");
-	}
-
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
-
-	private void populateGenericFile(List<GenericRecord> genericRecords,
-			Schema outputSchema) throws IOException {
-		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
-		GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
-				outputSchema);
-
-		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
-				genericDatumWriter);
-		dataFileWriter.create(outputSchema, outputStream);
-
-		for (GenericRecord record : genericRecords) {
-			dataFileWriter.append(record);
-		}
-
-		dataFileWriter.close();
-		outputStream.close();
-
-	}
-
-	@Test
-	public void testRead_GenericReader() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-		AvroFileReaderFactory<GenericData.Record> genericReader = new AvroFileReaderFactory<GenericData.Record>(
-				Avros.generics(Person.SCHEMA$), new Configuration());
-		Iterator<GenericData.Record> recordIterator = genericReader.read(
-				FileSystem.getLocal(new Configuration()), new Path(
-						this.avroFile.getAbsolutePath()));
-
-		GenericRecord genericRecord = recordIterator.next();
-		assertEquals(savedRecord, genericRecord);
-		assertFalse(recordIterator.hasNext());
-	}
-
-	@Test
-	public void testRead_SpecificReader() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-		AvroFileReaderFactory<Person> genericReader = new AvroFileReaderFactory<Person>(
-				Avros.records(Person.class), new Configuration());
-		Iterator<Person> recordIterator = genericReader.read(FileSystem
-				.getLocal(new Configuration()),
-				new Path(this.avroFile.getAbsolutePath()));
-
-		Person expectedPerson = new Person();
-		expectedPerson.setAge(42);
-		expectedPerson.setName("John Doe");
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		siblingNames.add("Jimmy");
-		siblingNames.add("Jane");
-		expectedPerson.setSiblingnames(siblingNames);
-
-		Person person = recordIterator.next();
-
-		assertEquals(expectedPerson, person);
-		assertFalse(recordIterator.hasNext());
-	}
-
-	@Test
-	public void testRead_ReflectReader() throws IOException {
-		Schema reflectSchema = ReflectData.get().getSchema(PojoPerson.class);
-		GenericRecord savedRecord = new GenericData.Record(reflectSchema);
-		savedRecord.put("name", "John Doe");
-		populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema);
-
-		AvroFileReaderFactory<PojoPerson> genericReader = new AvroFileReaderFactory<PojoPerson>(
-				Avros.reflects(PojoPerson.class), new Configuration());
-		Iterator<PojoPerson> recordIterator = genericReader.read(FileSystem
-				.getLocal(new Configuration()),
-				new Path(this.avroFile.getAbsolutePath()));
-
-		PojoPerson person = recordIterator.next();
-
-		assertEquals("John Doe", person.getName());
-		assertFalse(recordIterator.hasNext());
-	}
-
-	public static class PojoPerson {
-		private String name;
-
-		public String getName() {
-			return name;
-		}
-
-		public void setName(String name) {
-			this.name = name;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
deleted file mode 100644
index ce43a98..0000000
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTargetTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.avro;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson;
-import com.cloudera.crunch.test.Person;
-import com.cloudera.crunch.types.avro.Avros;
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("serial")
-public class AvroFileSourceTargetTest implements Serializable {
-
-	private transient File avroFile;
-
-	@Before
-	public void setUp() throws IOException {
-		avroFile = File.createTempFile("test", ".avro");
-	}
-
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
-
-	private void populateGenericFile(List<GenericRecord> genericRecords,
-			Schema schema) throws IOException {
-		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
-		GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(
-				schema);
-
-		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
-				genericDatumWriter);
-		dataFileWriter.create(schema, outputStream);
-
-		for (GenericRecord record : genericRecords) {
-			dataFileWriter.append(record);
-		}
-
-		dataFileWriter.close();
-		outputStream.close();
-
-	}
-
-	@Test
-	public void testSpecific() throws IOException {
-		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
-
-		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
-		PCollection<Person> genericCollection = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), Avros.records(Person.class)));
-
-		List<Person> personList = Lists.newArrayList(genericCollection
-				.materialize());
-
-		Person expectedPerson = new Person();
-		expectedPerson.setName("John Doe");
-		expectedPerson.setAge(42);
-
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		siblingNames.add("Jimmy");
-		siblingNames.add("Jane");
-		expectedPerson.setSiblingnames(siblingNames);
-
-		assertEquals(Lists.newArrayList(expectedPerson),
-				Lists.newArrayList(personList));
-	}
-
-	@Test
-	public void testGeneric() throws IOException {
-		String genericSchemaJson = Person.SCHEMA$.toString().replace("Person",
-				"GenericPerson");
-		Schema genericPersonSchema = new Schema.Parser()
-				.parse(genericSchemaJson);
-		GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
-		savedRecord.put("name", "John Doe");
-		savedRecord.put("age", 42);
-		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
-		populateGenericFile(Lists.newArrayList(savedRecord),
-				genericPersonSchema);
-
-		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
-		PCollection<Record> genericCollection = pipeline
-				.read(At.avroFile(avroFile.getAbsolutePath(),
-						Avros.generics(genericPersonSchema)));
-
-		List<Record> recordList = Lists.newArrayList(genericCollection
-				.materialize());
-
-		assertEquals(Lists.newArrayList(savedRecord),
-				Lists.newArrayList(recordList));
-	}
-
-	@Test
-	public void testReflect() throws IOException {
-		Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class);
-		GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
-		savedRecord.put("name", "John Doe");
-		populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
-
-		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
-		PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), Avros.reflects(PojoPerson.class)));
-
-		List<PojoPerson> recordList = Lists.newArrayList(personCollection
-				.materialize());
-
-		assertEquals(1, recordList.size());
-		PojoPerson person = recordList.get(0);
-		assertEquals("John Doe", person.getName());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTest.java
deleted file mode 100644
index 10fce39..0000000
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroFileSourceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.io.avro;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.test.Person;
-import com.cloudera.crunch.types.avro.AvroType;
-import com.cloudera.crunch.types.avro.Avros;
-
-public class AvroFileSourceTest {
-
-	private Job job;
-	File tempFile;
-
-	@Before
-	public void setUp() throws IOException {
-		job = new Job();
-		tempFile = File.createTempFile("test", ".avr");
-	}
-
-	@After
-	public void tearDown() {
-		tempFile.delete();
-	}
-
-	@Test
-	public void testConfigureJob_SpecificData() throws IOException {
-		AvroType<Person> avroSpecificType = Avros.records(Person.class);
-		AvroFileSource<Person> personFileSource = new AvroFileSource<Person>(
-				new Path(tempFile.getAbsolutePath()), avroSpecificType);
-
-		personFileSource.configureSource(job, -1);
-
-		assertFalse(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT,
-				true));
-		assertEquals(Person.SCHEMA$.toString(),
-				job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
-	}
-
-	@Test
-	public void testConfigureJob_GenericData() throws IOException {
-		AvroType<Record> avroGenericType = Avros.generics(Person.SCHEMA$);
-		AvroFileSource<Record> personFileSource = new AvroFileSource<Record>(
-				new Path(tempFile.getAbsolutePath()), avroGenericType);
-
-		personFileSource.configureSource(job, -1);
-
-		assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT,
-				false));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java b/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
deleted file mode 100644
index b0e2e87..0000000
--- a/src/test/java/com/cloudera/crunch/io/avro/AvroReflectTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package com.cloudera.crunch.io.avro;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.avro.Avros;
-import com.google.common.collect.Lists;
-
-public class AvroReflectTest implements Serializable {
-
-	static class StringWrapper {
-		private String value;
-
-		public StringWrapper() {
-			this(null);
-		}
-
-		public StringWrapper(String value) {
-			this.value = value;
-		}
-
-		public String getValue() {
-			return value;
-		}
-
-		public void setValue(String value) {
-			this.value = value;
-		}
-
-		@Override
-		public String toString() {
-			return String.format("<StringWrapper(%s)>", value);
-		}
-
-		@Override
-		public int hashCode() {
-			final int prime = 31;
-			int result = 1;
-			result = prime * result + ((value == null) ? 0 : value.hashCode());
-			return result;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (this == obj)
-				return true;
-			if (obj == null)
-				return false;
-			if (getClass() != obj.getClass())
-				return false;
-			StringWrapper other = (StringWrapper) obj;
-			if (value == null) {
-				if (other.value != null)
-					return false;
-			} else if (!value.equals(other.value))
-				return false;
-			return true;
-		}
-
-	}
-
-	@Test
-	public void testReflection() throws IOException {
-		Pipeline pipeline = new MRPipeline(AvroReflectTest.class);
-		PCollection<StringWrapper> stringWrapperCollection = pipeline
-				.readTextFile(FileHelper.createTempCopyOf("set1.txt"))
-				.parallelDo(new MapFn<String, StringWrapper>() {
-
-					@Override
-					public StringWrapper map(String input) {
-						StringWrapper stringWrapper = new StringWrapper();
-						stringWrapper.setValue(input);
-						return stringWrapper;
-					}
-				}, Avros.reflects(StringWrapper.class));
-
-		List<StringWrapper> stringWrappers = Lists
-				.newArrayList(stringWrapperCollection.materialize());
-
-		pipeline.done();
-
-		assertEquals(Lists.newArrayList(new StringWrapper("b"),
-				new StringWrapper("c"), new StringWrapper("a"),
-				new StringWrapper("e")), stringWrappers);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java b/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
deleted file mode 100644
index dc8bf9a..0000000
--- a/src/test/java/com/cloudera/crunch/lib/AggregateTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib;
-
-import static com.cloudera.crunch.types.writable.Writables.strings;
-import static com.cloudera.crunch.types.writable.Writables.tableOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.test.Employee;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.avro.Avros;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.cloudera.crunch.types.writable.Writables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-public class AggregateTest {
-
-  @Test public void testWritables() throws Exception {
-    Pipeline pipeline = new MRPipeline(AggregateTest.class);
-    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
-    runMinMax(shakes, WritableTypeFamily.getInstance());
-    pipeline.done();
-  }
-
-  @Test public void testAvro() throws Exception {
-    Pipeline pipeline = new MRPipeline(AggregateTest.class);
-    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
-    runMinMax(shakes, AvroTypeFamily.getInstance());
-    pipeline.done();
-  }
-
-  @Test public void testInMemoryAvro() throws Exception {
-    PCollection<String> someText = MemPipeline.collectionOf(
-        "first line", "second line", "third line");
-    runMinMax(someText, AvroTypeFamily.getInstance());
-  }
-  
-  public static void runMinMax(PCollection<String> shakes, PTypeFamily family) throws Exception {
-    PCollection<Integer> lengths = shakes.parallelDo(new MapFn<String, Integer>() {
-      @Override
-      public Integer map(String input) {
-        return input.length();
-      }
-    }, family.ints());
-    PCollection<Integer> negLengths = lengths.parallelDo(new MapFn<Integer, Integer>() {
-      @Override
-      public Integer map(Integer input) {
-        return -input;
-      }
-    }, family.ints());
-    Integer maxLengths = Iterables.getFirst(Aggregate.max(lengths).materialize(), null);
-    Integer minLengths = Iterables.getFirst(Aggregate.min(negLengths).materialize(), null);
-    assertTrue(maxLengths != null);
-    assertTrue(minLengths != null);
-    assertEquals(maxLengths.intValue(), -minLengths.intValue());
-  }
-  
-  private static class SplitFn extends MapFn<String, Pair<String, String>> {
-    @Override
-    public Pair<String, String> map(String input) {
-      String[] p = input.split("\\s+");
-      return Pair.of(p[0], p[1]);
-    }  
-  }
-  
-  @Test public void testCollectUrls() throws Exception {
-    Pipeline p = new MRPipeline(AggregateTest.class);
-    String urlsInputPath = FileHelper.createTempCopyOf("urls.txt");
-    PTable<String, Collection<String>> urls = Aggregate.collectValues(
-        p.readTextFile(urlsInputPath)
-        .parallelDo(new SplitFn(), tableOf(strings(), strings())));
-    for (Pair<String, Collection<String>> e : urls.materialize()) {
-      String key = e.first();
-      int expectedSize = 0;
-      if ("www.A.com".equals(key)) {
-        expectedSize = 4;
-      } else if ("www.B.com".equals(key) || "www.F.com".equals(key)) {
-        expectedSize = 2;
-      } else if ("www.C.com".equals(key) || "www.D.com".equals(key) || "www.E.com".equals(key)) {
-        expectedSize = 1;
-      }
-      assertEquals("Checking key = " + key, expectedSize, e.second().size());
-      p.done();
-    }
-  }
-  
-  @Test public void testTopN() throws Exception {
-    PTableType<String, Integer> ptype = Avros.tableOf(Avros.strings(), Avros.ints());
-    PTable<String, Integer> counts = MemPipeline.typedTableOf(ptype, "foo", 12, "bar", 17, "baz", 29);
-    
-    PTable<String, Integer> top2 = Aggregate.top(counts, 2, true);
-    assertEquals(ImmutableList.of(Pair.of("baz", 29), Pair.of("bar", 17)), top2.materialize());
-    
-    PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
-    assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
-  }
-
-  @Test
-  public void testCollectValues_Writables() throws IOException {
-    Pipeline pipeline = new MRPipeline(AggregateTest.class);
-    Map<Integer, Collection<Text>> collectionMap = pipeline
-        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
-        .parallelDo(new MapStringToTextPair(),
-            Writables.tableOf(Writables.ints(), Writables.writables(Text.class))
-        ).collectValues().materializeToMap();
-
-    assertEquals(1, collectionMap.size());
-
-    assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")),
-        collectionMap.get(1));
-  }
-
-  @Test
-  public void testCollectValues_Avro() throws IOException {
-
-    MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
-    Pipeline pipeline = new MRPipeline(AggregateTest.class);
-    Map<Integer, Collection<Employee>> collectionMap = pipeline
-        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
-        .parallelDo(mapFn,
-            Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
-        .materializeToMap();
-
-    assertEquals(1, collectionMap.size());
-
-    Employee empC = mapFn.map("c").second();
-    Employee empD = mapFn.map("d").second();
-    Employee empA = mapFn.map("a").second();
-
-    assertEquals(Lists.newArrayList(empC, empD, empA),
-        collectionMap.get(1));
-  }
-
-  private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
-    @Override
-    public Pair<Integer, Text> map(String input) {
-      return Pair.of(1, new Text(input));
-    }
-  }
-
-  private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> {
-    @Override
-    public Pair<Integer, Employee> map(String input) {
-      Employee emp = new Employee();
-      emp.setName(input);
-      emp.setSalary(0);
-      emp.setDepartment("");
-      return Pair.of(1, emp);
-    }
-  }
-
-  public static class PojoText {
-    private String value;
-
-    public PojoText() {
-      this("");
-    }
-
-    public PojoText(String value) {
-      this.value = value;
-    }
-
-    public String getValue() {
-      return value;
-    }
-
-    public void setValue(String value) {
-      this.value = value;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("PojoText<%s>", this.value);
-    }
-
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      PojoText other = (PojoText) obj;
-      if (value == null) {
-        if (other.value != null)
-          return false;
-      } else if (!value.equals(other.value))
-        return false;
-      return true;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/AvroIndexedRecordPartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/AvroIndexedRecordPartitionerTest.java b/src/test/java/com/cloudera/crunch/lib/AvroIndexedRecordPartitionerTest.java
deleted file mode 100644
index 9f1c848..0000000
--- a/src/test/java/com/cloudera/crunch/lib/AvroIndexedRecordPartitionerTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-package com.cloudera.crunch.lib;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.lib.join.JoinUtils.AvroIndexedRecordPartitioner;
-
-public class AvroIndexedRecordPartitionerTest {
-
-	private AvroIndexedRecordPartitioner avroPartitioner;
-	
-	@Before
-	public void setUp(){
-		avroPartitioner = new AvroIndexedRecordPartitioner();
-	}
-	
-	@Test
-	public void testGetPartition() {
-		IndexedRecord indexedRecord = new MockIndexedRecord(3);
-		AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
-		
-		assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
-		assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_NegativeHashValue(){
-		IndexedRecord indexedRecord = new MockIndexedRecord(-3);
-		AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
-		
-		assertEquals(3, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5));
-		assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_IntegerMinValue(){
-		IndexedRecord indexedRecord = new MockIndexedRecord(Integer.MIN_VALUE);
-		AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
-		
-		assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), Integer.MAX_VALUE));
-	}
-	
-	/**
-	 * Mock implementation of IndexedRecord to give us control over the hashCode.
-	 */
-	static class MockIndexedRecord implements IndexedRecord {
-		
-		private Integer value;
-		
-		public MockIndexedRecord(Integer value){
-			this.value = value;
-		}
-		
-		@Override
-		public int hashCode() {
-			return value.hashCode();
-		}
-
-		@Override
-		public Schema getSchema() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public Object get(int arg0) {
-			return this.value;
-		}
-
-		@Override
-		public void put(int arg0, Object arg1) {
-			throw new UnsupportedOperationException();
-		}
-		
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/AvroTypeSortTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/AvroTypeSortTest.java b/src/test/java/com/cloudera/crunch/lib/AvroTypeSortTest.java
deleted file mode 100644
index 0f51ef6..0000000
--- a/src/test/java/com/cloudera/crunch/lib/AvroTypeSortTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib;
-
-import static com.cloudera.crunch.types.avro.Avros.ints;
-import static com.cloudera.crunch.types.avro.Avros.records;
-import static com.cloudera.crunch.types.avro.Avros.strings;
-import static junit.framework.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.test.Person;
-import com.google.common.collect.Lists;
-
-/**
- * Test sorting Avro types by selected inner field
- */
-public class AvroTypeSortTest implements Serializable {
-
-	private static final long serialVersionUID = 1344118240353796561L;
-
-	private transient File avroFile;
-
-	@Before
-	public void setUp() throws IOException {
-		avroFile = File.createTempFile("avrotest", ".avro");
-	}
-
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
-
-	@Test
-	public void testSortAvroTypesBySelectedFields() throws Exception {
-
-		MRPipeline pipeline = new MRPipeline(AvroTypeSortTest.class);
-
-		Person ccc10 = createPerson("CCC", 10);
-		Person bbb20 = createPerson("BBB", 20);
-		Person aaa30 = createPerson("AAA", 30);
-
-		writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile);
-
-		PCollection<Person> unsorted = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), records(Person.class)));
-
-		// Sort by Name
-		MapFn<Person, String> nameExtractor = new MapFn<Person, String>() {
-
-			@Override
-			public String map(Person input) {
-				return input.getName().toString();
-			}
-		};
-
-		PCollection<Person> sortedByName = unsorted
-				.by(nameExtractor, strings()).groupByKey().ungroup().values();
-
-		List<Person> sortedByNameList = Lists.newArrayList(sortedByName
-				.materialize());
-
-		assertEquals(3, sortedByNameList.size());
-		assertEquals(aaa30, sortedByNameList.get(0));
-		assertEquals(bbb20, sortedByNameList.get(1));
-		assertEquals(ccc10, sortedByNameList.get(2));
-
-		// Sort by Age
-
-		MapFn<Person, Integer> ageExtractor = new MapFn<Person, Integer>() {
-
-			@Override
-			public Integer map(Person input) {
-				return input.getAge();
-			}
-		};
-
-		PCollection<Person> sortedByAge = unsorted.by(ageExtractor, ints())
-				.groupByKey().ungroup().values();
-
-		List<Person> sortedByAgeList = Lists.newArrayList(sortedByAge
-				.materialize());
-
-		assertEquals(3, sortedByAgeList.size());
-		assertEquals(ccc10, sortedByAgeList.get(0));
-		assertEquals(bbb20, sortedByAgeList.get(1));
-		assertEquals(aaa30, sortedByAgeList.get(2));
-
-		pipeline.done();
-	}
-
-	private void writeAvroFile(List<Person> people, File avroFile)
-			throws IOException {
-
-		FileOutputStream outputStream = new FileOutputStream(avroFile);
-		SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
-				Person.class);
-
-		DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
-				writer);
-		dataFileWriter.create(Person.SCHEMA$, outputStream);
-		for (Person person : people) {
-			dataFileWriter.append(person);
-		}
-		dataFileWriter.close();
-		outputStream.close();
-	}
-
-	private Person createPerson(String name, int age) throws IOException {
-
-		Person person = new Person();
-		person.setAge(age);
-		person.setName(name);
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		person.setSiblingnames(siblingNames);
-
-		return person;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/CartesianTest.java b/src/test/java/com/cloudera/crunch/lib/CartesianTest.java
deleted file mode 100644
index b2d9b94..0000000
--- a/src/test/java/com/cloudera/crunch/lib/CartesianTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.cloudera.crunch.lib;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashSet;
-import java.util.Iterator;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.types.writable.Writables;
-import com.google.common.collect.ImmutableList;
-
-public class CartesianTest {
-
-  @Test
-  public void testCartesianCollection() {
-    ImmutableList<ImmutableList<Integer>> testCases = ImmutableList.of(
-        ImmutableList.of(1, 2, 3, 4, 5), ImmutableList.<Integer>of(1, 2, 3), ImmutableList.<Integer>of());
-
-    for (int t1 = 0; t1 < testCases.size(); t1++) {
-      ImmutableList<Integer> testCase1 = testCases.get(t1);
-      for (int t2 = t1; t2 < testCases.size(); t2++) {
-        ImmutableList<Integer> testCase2 = testCases.get(t2);
-
-        PCollection<Integer> X = MemPipeline.typedCollectionOf(Writables.ints(), testCase1);
-        PCollection<Integer> Y = MemPipeline.typedCollectionOf(Writables.ints(), testCase2);
-
-        PCollection<Pair<Integer,Integer>> cross = Cartesian.cross(X, Y);
-        HashSet<Pair<Integer, Integer>> crossSet = new HashSet<Pair<Integer, Integer>>();
-        for (Iterator<Pair<Integer, Integer>> i = cross.materialize().iterator(); i.hasNext(); ) {
-          crossSet.add(i.next());
-        }
-        assertEquals(crossSet.size(), testCase1.size() * testCase2.size());
-
-        for (int i = 0; i < testCase1.size(); i++) {
-          for (int j = 0; j < testCase2.size(); j++) {
-            assertTrue(crossSet.contains(Pair.of(testCase1.get(i), testCase2.get(j))));
-          }
-        }
-      }
-    }
-  }
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/CogroupTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/CogroupTest.java b/src/test/java/com/cloudera/crunch/lib/CogroupTest.java
deleted file mode 100644
index 648d11f..0000000
--- a/src/test/java/com/cloudera/crunch/lib/CogroupTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.CombineFn;
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.fn.MapKeysFn;
-import com.cloudera.crunch.fn.MapValuesFn;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.From;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.base.Splitter;
-import com.google.common.io.Files;
-
-public class CogroupTest {
-
-  private static class WordSplit extends DoFn<String, Pair<String, Long>> {
-    @Override
-    public void process(String input, Emitter<Pair<String, Long>> emitter) {
-      for (String word : Splitter.on(' ').split(input)) {
-        emitter.emit(Pair.of(word, 1L));
-      }
-    }
-  }
-
-  public static PTable<String, Long> join(PCollection<String> w1,
-      PCollection<String> w2, PTypeFamily ptf) {
-    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
-    PTable<String, Long> ws1 = w1.parallelDo("ws1", new WordSplit(), ntt);
-    PTable<String, Long> ws2 = w2.parallelDo("ws2", new WordSplit(), ntt);
-    PTable<String, Pair<Collection<Long>, Collection<Long>>> cg = Cogroup.cogroup(ws1, ws2);
-    PTable<String, Long> sums = cg.parallelDo(
-        "wc",
-        new MapValuesFn<String, Pair<Collection<Long>, Collection<Long>>, Long>() {
-          @Override
-          public Long map(Pair<Collection<Long>, Collection<Long>> v) {
-            long sum = 0L;
-            for (Long value : v.first()) {
-              sum += value;
-            }
-            for (Long value : v.second()) {
-              sum += value;
-            }
-            return sum;
-          }
-        }, ntt);
-    return sums.parallelDo("firstletters", new MapKeysFn<String, String, Long>() {
-      @Override
-      public String map(String k1) {
-        if (k1.length() > 0) {
-          return k1.substring(0, 1).toLowerCase();
-        } else {
-          return "";
-        }
-      }
-    }, ntt).groupByKey().combineValues(CombineFn.<String>SUM_LONGS());
-  }
-
-  @Test
-  public void testWritableJoin() throws Exception {
-    run(new MRPipeline(CogroupTest.class), WritableTypeFamily.getInstance());
-  }
-  
-  @Test
-  public void testAvroJoin() throws Exception {
-    run(new MRPipeline(CogroupTest.class), AvroTypeFamily.getInstance());
-  }
-  
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt");
-    File output = FileHelper.createOutputPath();
-    
-    PCollection<String> shakespeare = pipeline.read(From.textFile(shakesInputPath));
-    PCollection<String> maugham = pipeline.read(From.textFile(maughamInputPath));
-    pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath());
-    pipeline.done();
-    
-    File outputFile = new File(output, "part-r-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    boolean passed = false;
-    for (String line : lines) {
-      if (line.equals("j\t705")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-    
-    output.deleteOnExit();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/SampleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/SampleTest.java b/src/test/java/com/cloudera/crunch/lib/SampleTest.java
deleted file mode 100644
index c146d4f..0000000
--- a/src/test/java/com/cloudera/crunch/lib/SampleTest.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.cloudera.crunch.lib;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.google.common.collect.ImmutableList;
-
-public class SampleTest {
-  @Test
-  public void testSampler() {
-	Iterable<Integer> sample = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-	    .sample(0.2, 123998).materialize();
-	List<Integer> sampleValues = ImmutableList.copyOf(sample);
-	assertEquals(ImmutableList.of(6, 7), sampleValues);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/SetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/SetTest.java b/src/test/java/com/cloudera/crunch/lib/SetTest.java
deleted file mode 100644
index 3cadc0e..0000000
--- a/src/test/java/com/cloudera/crunch/lib/SetTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.Tuple3;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class SetTest {
-  
-  private PTypeFamily typeFamily;
-  
-  private Pipeline pipeline;
-  private PCollection<String> set1;
-  private PCollection<String> set2;
-  
-  public SetTest(PTypeFamily typeFamily) {
-    this.typeFamily = typeFamily;
-  }
-  
-  @Parameters
-  public static Collection<Object[]> data() {
-    Object[][] data = new Object[][] {
-        { WritableTypeFamily.getInstance() },
-        { AvroTypeFamily.getInstance() }
-    };
-    return Arrays.asList(data);
-  }
-  
-  @Before
-  public void setUp() throws IOException {
-    String set1InputPath = FileHelper.createTempCopyOf("set1.txt");
-    String set2InputPath = FileHelper.createTempCopyOf("set2.txt");
-    pipeline = new MRPipeline(SetTest.class);
-    set1 = pipeline.read(At.textFile(set1InputPath, typeFamily.strings()));
-    set2 = pipeline.read(At.textFile(set2InputPath, typeFamily.strings()));
-  }
-  
-  @After
-  public void tearDown() {
-    pipeline.done();
-  }
-  
-  @Test
-  public void testDifference() throws Exception {
-    PCollection<String> difference = Set.difference(set1, set2);
-    assertEquals(Lists.newArrayList("b", "e"),
-        Lists.newArrayList(difference.materialize()));
-  }
-  
-  @Test
-  public void testIntersection() throws Exception {
-    PCollection<String> intersection = Set.intersection(set1, set2);
-    assertEquals(Lists.newArrayList("a", "c"),
-        Lists.newArrayList(intersection.materialize()));
-  }
-  
-  @Test
-  public void testComm() throws Exception {
-    PCollection<Tuple3<String, String, String>> comm = Set.comm(set1, set2);
-    Iterator<Tuple3<String, String, String>> i = comm.materialize().iterator();
-    checkEquals(null, null, "a", i.next());
-    checkEquals("b", null, null, i.next());
-    checkEquals(null, null, "c", i.next());
-    checkEquals(null, "d", null, i.next());
-    checkEquals("e", null, null, i.next());
-    assertFalse(i.hasNext());
-  }
-
-  private void checkEquals(String s1, String s2, String s3,
-      Tuple3<String, String, String> tuple) {
-    assertEquals("first string", s1, tuple.first());
-    assertEquals("second string", s2, tuple.second());
-    assertEquals("third string", s3, tuple.third());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/SortTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/SortTest.java b/src/test/java/com/cloudera/crunch/lib/SortTest.java
deleted file mode 100644
index e65a01a..0000000
--- a/src/test/java/com/cloudera/crunch/lib/SortTest.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib;
-
-import static com.cloudera.crunch.lib.Sort.ColumnOrder.by;
-import static com.cloudera.crunch.lib.Sort.Order.ASCENDING;
-import static com.cloudera.crunch.lib.Sort.Order.DESCENDING;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.Tuple3;
-import com.cloudera.crunch.Tuple4;
-import com.cloudera.crunch.TupleN;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.lib.Sort.ColumnOrder;
-import com.cloudera.crunch.lib.Sort.Order;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-
-public class SortTest implements Serializable {
-  
-  @Test
-  public void testWritableSortAsc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        Order.ASCENDING, "A\tand this text as well");
-  }
-
-  @Test
-  public void testWritableSortDesc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        Order.DESCENDING, "B\tthis doc has some text");
-  }
-  
-  @Test
-  public void testWritableSortAscDesc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text");
-  }
-
-  @Test
-  public void testWritableSortSecondDescFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text");
-  }
-
-  @Test
-  public void testWritableSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
-  }
-
-  @Test
-  public void testWritableSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
-  }
-
-  @Test
-  public void testWritableSortTupleNAscDesc() throws Exception {
-    runTupleN(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING)}, new String[] { "A", "this doc has this text" });
-  }
-
-  @Test
-  public void testWritableSortTable() throws Exception {
-    runTable(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        "A");
-  }
-  
-  @Test
-  public void testAvroSortAsc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        Order.ASCENDING, "A\tand this text as well");
-  }
-  
-  @Test
-  public void testAvroSortDesc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        Order.DESCENDING, "B\tthis doc has some text");
-  }
-  
-  @Test
-  public void testAvroSortPairAscAsc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text");
-  }
-  
-  @Test
-  @Ignore("Avro sorting only works in field order at the moment")
-  public void testAvroSortPairSecondAscFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text");
-  }
-  
-  @Test
-  public void testAvroSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
-  }
-
-  @Test
-  public void testAvroSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
-  }
-
-  @Test
-  public void testAvroSortTupleNAscDesc() throws Exception {
-    runTupleN(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
-  }
-  
-  @Test
-  public void testAvroSortTable() throws Exception {
-    runTable(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), "A");
-  }
-
-  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily,
-      Order order, String firstLine) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
-    PCollection<String> input = pipeline.readTextFile(inputPath);
-    // following turns the input from Writables to required type family
-    PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() {
-      @Override
-      public void process(String input, Emitter<String> emitter) {
-        emitter.emit(input);
-      }
-    }, typeFamily.strings());
-    PCollection<String> sorted = Sort.sort(input2, order);
-    Iterable<String> lines = sorted.materialize();
-    
-    assertEquals(firstLine, lines.iterator().next());
-    pipeline.done(); // TODO: finally
-  }
-  
-  private void runPair(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder first, ColumnOrder second, String firstField, String secondField) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
-    PCollection<String> input = pipeline.readTextFile(inputPath);
-    PCollection<Pair<String, String>> kv = input.parallelDo(
-      new DoFn<String, Pair<String, String>>() {
-        @Override
-        public void process(String input, Emitter<Pair<String, String>> emitter) {
-          String[] split = input.split("[\t]+");
-          emitter.emit(Pair.of(split[0], split[1]));
-        }
-    }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
-    PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
-    Iterable<Pair<String, String>> lines = sorted.materialize();
-    Pair<String, String> l = lines.iterator().next();
-    assertEquals(firstField, l.first());
-    assertEquals(secondField, l.second());
-    pipeline.done();
-  }
-  
-  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder first, ColumnOrder second, ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
-    PCollection<String> input = pipeline.readTextFile(inputPath);
-    PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
-      new DoFn<String, Tuple3<String, String, String>>() {
-        @Override
-        public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
-          String[] split = input.split("[\t ]+");
-          int len = split.length;
-          emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
-        }
-    }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
-    PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
-    Iterable<Tuple3<String, String, String>> lines = sorted.materialize();
-    Tuple3<String, String, String> l = lines.iterator().next();
-    assertEquals(firstField, l.first());
-    assertEquals(secondField, l.second());
-    assertEquals(thirdField, l.third());
-    pipeline.done();
-  }
-  
-  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder first, ColumnOrder second, ColumnOrder third, ColumnOrder fourth,
-      String firstField, String secondField, String thirdField, String fourthField) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
-    PCollection<String> input = pipeline.readTextFile(inputPath);
-    PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
-      new DoFn<String, Tuple4<String, String, String, String>>() {
-        @Override
-        public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) {
-          String[] split = input.split("[\t ]+");
-          int len = split.length;
-          emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
-        }
-    }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
-    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth);
-    Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
-    Tuple4<String, String, String, String> l = lines.iterator().next();
-    assertEquals(firstField, l.first());
-    assertEquals(secondField, l.second());
-    assertEquals(thirdField, l.third());
-    assertEquals(fourthField, l.fourth());
-    pipeline.done();
-  }
-  
-  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder[] orders, String[] fields) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
-    PCollection<String> input = pipeline.readTextFile(inputPath);
-    PType[] types = new PType[orders.length];
-    Arrays.fill(types, typeFamily.strings());
-    PCollection<TupleN> kv = input.parallelDo(
-      new DoFn<String, TupleN>() {
-        @Override
-        public void process(String input, Emitter<TupleN> emitter) {
-          String[] split = input.split("[\t]+");
-          emitter.emit(new TupleN(split));
-        }
-    }, typeFamily.tuples(types));
-    PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
-    Iterable<TupleN> lines = sorted.materialize();
-    TupleN l = lines.iterator().next();
-    int i = 0;
-    for (String field : fields) {
-      assertEquals(field, l.get(i++));      
-    }
-    pipeline.done();
-  }
-
-  private void runTable(Pipeline pipeline, PTypeFamily typeFamily,
-      String firstKey) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
-    PCollection<String> input = pipeline.readTextFile(inputPath);
-    PTable<String, String> table = input.parallelDo(
-        new DoFn<String, Pair<String, String>>() {
-          @Override
-          public void process(String input, Emitter<Pair<String, String>> emitter) {
-            String[] split = input.split("[\t]+");
-            emitter.emit(Pair.of(split[0], split[1]));
-          }
-      }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
-    
-    PTable<String, String> sorted = Sort.sort(table);
-    Iterable<Pair<String, String>> lines = sorted.materialize();
-    Pair<String, String> l = lines.iterator().next();
-    assertEquals(firstKey, l.first());
-    pipeline.done();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/SpecificAvroGroupByTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/SpecificAvroGroupByTest.java b/src/test/java/com/cloudera/crunch/lib/SpecificAvroGroupByTest.java
deleted file mode 100644
index ef889ab..0000000
--- a/src/test/java/com/cloudera/crunch/lib/SpecificAvroGroupByTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.test.Person;
-import com.cloudera.crunch.test.Person.Builder;
-import com.cloudera.crunch.types.avro.Avros;
-import com.cloudera.crunch.types.avro.SafeAvroSerialization;
-import com.google.common.collect.Lists;
-
-/**
- * Test {@link SafeAvroSerialization} with Specific Avro types
- */
-public class SpecificAvroGroupByTest implements Serializable {
-
-	private static final long serialVersionUID = 1344118240353796561L;
-
-	private transient File avroFile;
-
-	@Before
-	public void setUp() throws IOException {
-		avroFile = File.createTempFile("avrotest", ".avro");
-	}
-
-	@After
-	public void tearDown() {
-		avroFile.delete();
-	}
-
-	@Test
-	public void testGrouByWithSpecificAvroType() throws Exception {
-
-		MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByTest.class);
-
-		testSpecificAvro(pipeline);
-	}
-
-  @Test
-  public void testGrouByOnSpecificAvroButReflectionDatumReader()
-      throws Exception {
-    MRPipeline pipeline = new MRPipeline(SpecificAvroGroupByTest.class);
-
-    // https://issues.apache.org/jira/browse/AVRO-1046  resolves 
-    // the ClassCastException when reading specific Avro types with 
-    // ReflectDatumReader
-    
-    pipeline.getConfiguration().setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT,
-        true);
-
-    testSpecificAvro(pipeline);
-  }
-
-	public void testSpecificAvro(MRPipeline pipeline) throws Exception {
-
-		createPersonAvroFile(avroFile);
-
-		PCollection<Person> unsorted = pipeline.read(At.avroFile(
-				avroFile.getAbsolutePath(), Avros.records(Person.class)));
-
-		PTable<String, Person> sorted = unsorted
-				.parallelDo(new MapFn<Person, Pair<String, Person>>() {
-
-					@Override
-					public Pair<String, Person> map(Person input) {
-						String key = input.getName().toString();
-						return Pair.of(key, input);
-
-					}
-				}, Avros.tableOf(Avros.strings(), Avros.records(Person.class)))
-				.groupByKey().ungroup();
-
-		List<Pair<String, Person>> outputPersonList = Lists.newArrayList(sorted
-				.materialize());
-
-		assertEquals(1, outputPersonList.size());
-		assertEquals(String.class, outputPersonList.get(0).first().getClass());
-		assertEquals(Person.class, outputPersonList.get(0).second().getClass());
-		
-		pipeline.done();
-	}
-
-	private void createPersonAvroFile(File avroFile) throws IOException {
-
-		Builder person = Person.newBuilder();
-		person.setAge(40);
-		person.setName("Bob");
-		List<CharSequence> siblingNames = Lists.newArrayList();
-		siblingNames.add("Bob" + "1");
-		siblingNames.add("Bob" + "2");
-		person.setSiblingnames(siblingNames);
-
-		FileOutputStream outputStream = new FileOutputStream(avroFile);
-		SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
-				Person.class);
-
-		DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
-				writer);
-		dataFileWriter.create(Person.SCHEMA$, outputStream);
-		dataFileWriter.append(person.build());
-		dataFileWriter.close();
-		outputStream.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/TupleWritablePartitionerTest.java b/src/test/java/com/cloudera/crunch/lib/TupleWritablePartitionerTest.java
deleted file mode 100644
index 27f786f..0000000
--- a/src/test/java/com/cloudera/crunch/lib/TupleWritablePartitionerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.cloudera.crunch.lib;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.lib.join.JoinUtils.TupleWritablePartitioner;
-import com.cloudera.crunch.types.writable.TupleWritable;
-
-public class TupleWritablePartitionerTest {
-
-	private TupleWritablePartitioner tupleWritableParitioner;
-	
-	@Before
-	public void setUp(){
-		tupleWritableParitioner = new TupleWritablePartitioner();
-	}
-	
-	@Test
-	public void testGetPartition() {
-		IntWritable intWritable = new IntWritable(3);
-		TupleWritable key = new TupleWritable(new Writable[]{intWritable});
-		assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
-		assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_NegativeHashValue(){
-		IntWritable intWritable = new IntWritable(-3);
-		// Sanity check, if this doesn't work then the premise of this test is wrong
-		assertEquals(-3, intWritable.hashCode());
-		
-		TupleWritable key = new TupleWritable(new Writable[]{intWritable});
-		assertEquals(3, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
-		assertEquals(1, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
-	}
-	
-	@Test
-	public void testGetPartition_IntegerMinValue(){
-		IntWritable intWritable = new IntWritable(Integer.MIN_VALUE);
-		// Sanity check, if this doesn't work then the premise of this test is wrong
-		assertEquals(Integer.MIN_VALUE, intWritable.hashCode());
-		
-		
-		TupleWritable key = new TupleWritable(new Writable[]{intWritable});
-		assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), Integer.MAX_VALUE));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java
deleted file mode 100644
index 88b8225..0000000
--- a/src/test/java/com/cloudera/crunch/lib/join/FullOuterJoinTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.types.PTypeFamily;
-
-public class FullOuterJoinTest extends JoinTester {
-  @Override
-  public void assertPassed(Iterable<Pair<String, Long>> lines) {
-    boolean passed1 = false;
-    boolean passed2 = false;
-    boolean passed3 = false;
-    for (Pair<String, Long> line : lines) {
-      if ("wretched".equals(line.first()) && 24 == line.second()) {
-        passed1 = true;
-      }
-      if ("againe".equals(line.first()) && 10 == line.second()) {
-        passed2 = true;
-      }
-      if ("Montparnasse.".equals(line.first()) && 2 == line.second()) {
-        passed3 = true;
-      }
-    }
-    assertTrue(passed1);
-    assertTrue(passed2);
-    assertTrue(passed3);
-  }
-
-  @Override
-  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new FullOuterJoinFn<String, Long, Long>(typeFamily.longs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java
deleted file mode 100644
index b42e51e..0000000
--- a/src/test/java/com/cloudera/crunch/lib/join/InnerJoinTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.types.PTypeFamily;
-
-public class InnerJoinTest extends JoinTester {
-  @Override
-  public void assertPassed(Iterable<Pair<String, Long>> lines) {
-    boolean passed1 = false;
-    boolean passed2 = true;
-    boolean passed3 = true;
-    for (Pair<String, Long> line : lines) {
-      if ("wretched".equals(line.first()) && 24 == line.second()) {
-        passed1 = true;
-      }
-      if ("againe".equals(line.first())) {
-        passed2 = false;
-      }
-      if ("Montparnasse.".equals(line.first())) {
-        passed3 = false;
-      }
-    }
-    assertTrue(passed1);
-    assertTrue(passed2);
-    assertTrue(passed3);
-  }
-
-  @Override
-  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new InnerJoinFn<String, Long, Long>(typeFamily.longs());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java b/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
deleted file mode 100644
index 6208cb4..0000000
--- a/src/test/java/com/cloudera/crunch/lib/join/JoinTester.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib.join;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.PTable;
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.Pipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.lib.Join;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTableType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-
-public abstract class JoinTester implements Serializable {
-  private static class WordSplit extends DoFn<String, String> {
-    @Override
-    public void process(String input, Emitter<String> emitter) {
-      for (String word : input.split("\\s+")) {
-        emitter.emit(word);
-      }
-    }
-  }
-
-  protected PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2,
-        PTypeFamily ptf) {
-    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
-    PTable<String, Long> ws1 = Aggregate.count(w1.parallelDo("ws1", new WordSplit(), ptf.strings()));
-    PTable<String, Long> ws2 = Aggregate.count(w2.parallelDo("ws2", new WordSplit(), ptf.strings()));
-
-    PTable<String, Pair<Long, Long>> join = Join.join(ws1, ws2, getJoinFn(ptf));
-
-    PTable<String, Long> sums = join.parallelDo("cnt",
-        new DoFn<Pair<String, Pair<Long, Long>>, Pair<String, Long>>() {
-          @Override
-          public void process(Pair<String, Pair<Long, Long>> input,
-                              Emitter<Pair<String, Long>> emitter) {
-            Pair<Long, Long> pair = input.second();
-            long sum = (pair.first() != null ? pair.first() : 0) + (pair.second() != null ? pair.second() : 0);
-            emitter.emit(Pair.of(input.first(), sum));
-          }
-        }, ntt);
-
-    return sums;
-  }
-
-  protected void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt");
-
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    PCollection<String> maugham = pipeline.readTextFile(maughamInputPath);
-    PTable<String, Long> joined = join(shakespeare, maugham, typeFamily);
-    Iterable<Pair<String, Long>> lines = joined.materialize();
-
-    assertPassed(lines);
-
-    pipeline.done();
-  }
-
-  @Test
-   public void testWritableJoin() throws Exception {
-    run(new MRPipeline(InnerJoinTest.class), WritableTypeFamily.getInstance());
-  }
-  
-  @Test
-  public void testAvroJoin() throws Exception {
-    run(new MRPipeline(InnerJoinTest.class), AvroTypeFamily.getInstance());
-  }
-
-  /**
-   * Used to check that the result of the join makes sense.
-   *
-   * @param lines The result of the join.
-   */
-  public abstract void assertPassed(Iterable<Pair<String, Long>> lines);
-
-  /**
-   * @return The JoinFn to use.
-   */
-  protected abstract JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily);
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java b/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java
deleted file mode 100644
index 0ad4490..0000000
--- a/src/test/java/com/cloudera/crunch/lib/join/LeftOuterJoinTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.lib.join;
-
-import static org.junit.Assert.assertTrue;
-
-import com.cloudera.crunch.Pair;
-import com.cloudera.crunch.types.PTypeFamily;
-
-public class LeftOuterJoinTest extends JoinTester {
-  @Override
-  public void assertPassed(Iterable<Pair<String, Long>> lines) {
-    boolean passed1 = false;
-    boolean passed2 = false;
-    boolean passed3 = true;
-    for (Pair<String, Long> line : lines) {
-      if ("wretched".equals(line.first()) && 24 == line.second()) {
-        passed1 = true;
-      }
-      if ("againe".equals(line.first()) && 10 == line.second()) {
-        passed2 = true;
-      }
-      if ("Montparnasse.".equals(line.first())) {
-        passed3 = false;
-      }
-    }
-    assertTrue(passed1);
-    assertTrue(passed2);
-    assertTrue(passed3);
-  }
-
-  @Override
-  protected JoinFn<String, Long, Long> getJoinFn(PTypeFamily typeFamily) {
-    return new LeftOuterJoinFn<String, Long, Long>(typeFamily.longs());
-  }
-}


Mime
View raw message