incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [17/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/MaterializeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/MaterializeTest.java b/src/test/java/com/cloudera/crunch/MaterializeTest.java
deleted file mode 100644
index a451da7..0000000
--- a/src/test/java/com/cloudera/crunch/MaterializeTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.cloudera.crunch;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-
-/**
- * Tests {@link PCollection#materialize()} on the MapReduce and in-memory pipelines, once per
- * {@link PTypeFamily}.
- */
-public class MaterializeTest {
-
-  /** Filter that rejects everything. */
-  @SuppressWarnings("serial")
-  private static class FalseFilterFn extends FilterFn<String> {
-
-    @Override
-    public boolean accept(final String input) {
-      return false;
-    }
-  }
-
-  @Test
-  public void testMaterializeInput_Writables() throws IOException {
-    runMaterializeInput(new MRPipeline(MaterializeTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeInput_Avro() throws IOException {
-    runMaterializeInput(new MRPipeline(MaterializeTest.class), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeInput_InMemoryWritables() throws IOException {
-    runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeInput_InMemoryAvro() throws IOException {
-    runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_Writables() throws IOException {
-    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
-        WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_Avro() throws IOException {
-    runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
-        AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
-    runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-
-  /**
-   * Materializes {@code set1.txt}, read with {@code typeFamily}'s string type, and checks that
-   * the lines come back in file order.
-   *
-   * @param pipeline pipeline to run on; {@code done()} is always called on it before returning
-   * @param typeFamily type family used for the input lines
-   * @throws IOException if the test input cannot be copied to a temporary location
-   */
-  public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    try {
-      List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
-      String inputPath = FileHelper.createTempCopyOf("set1.txt");
-      PCollection<String> lines = readStrings(pipeline, inputPath, typeFamily);
-      assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
-    } finally {
-      pipeline.done();
-    }
-  }
-
-  /**
-   * Filters every line out of {@code set1.txt}, read with {@code typeFamily}'s string type, and
-   * checks that materializing the resulting empty collection yields nothing.
-   *
-   * @param pipeline pipeline to run on; {@code done()} is always called on it before returning
-   * @param typeFamily type family used for the input lines
-   * @throws IOException if the test input cannot be copied to a temporary location
-   */
-  public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
-      throws IOException {
-    try {
-      String inputPath = FileHelper.createTempCopyOf("set1.txt");
-      PCollection<String> empty =
-          readStrings(pipeline, inputPath, typeFamily).filter(new FalseFilterFn());
-      assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
-    } finally {
-      pipeline.done();
-    }
-  }
-
-  /**
-   * Reads a text file as strings of the given type family. {@code Pipeline#readTextFile} takes
-   * no type argument, so using it would make the per-family test variants identical.
-   */
-  private static PCollection<String> readStrings(Pipeline pipeline, String path,
-      PTypeFamily typeFamily) {
-    return pipeline.read(com.cloudera.crunch.io.At.textFile(path, typeFamily.strings()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java b/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java
deleted file mode 100644
index 9550e87..0000000
--- a/src/test/java/com/cloudera/crunch/MaterializeToMapTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static junit.framework.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Tests {@link PTable#materializeToMap()} on the in-memory and MapReduce pipelines.
- */
-public class MaterializeToMapTest {
-
-  /** Expected table contents; each pair's key equals its index in this list. */
-  static final ImmutableList<Pair<Integer,String>> kvPairs =
-      ImmutableList.of(
-          Pair.of(0, "a"),
-          Pair.of(1, "b"),
-          Pair.of(2, "c"),
-          Pair.of(3, "e"));
-
-  /**
-   * Asserts that {@code m} contains exactly the entries of {@link #kvPairs}.
-   *
-   * <p>The size check catches an empty or truncated map. Checking only the keys present in
-   * {@code m} would let those pass.
-   */
-  public void assertMatches(Map<Integer,String> m) {
-    assertTrue("expected " + kvPairs.size() + " entries but got " + m,
-        m.size() == kvPairs.size());
-    for (Pair<Integer, String> expected : kvPairs) {
-      String actual = m.get(expected.first());
-      assertTrue("key " + expected.first() + ": expected " + expected.second()
-          + " but got " + actual, expected.second().equals(actual));
-    }
-  }
-
-  @Test
-  public void testMemMaterializeToMap() {
-    assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
-  }
-
-  /**
-   * Keys each line of {@code set1.txt} by its index in {@link #kvPairs}. Unknown lines get key
-   * -1, which {@link #assertMatches} then reports as a mismatch.
-   */
-  @SuppressWarnings("serial")
-  private static class Set1Mapper extends MapFn<String,Pair<Integer,String>> {
-    @Override
-    public Pair<Integer, String> map(String input) {
-      int k = -1;
-      if (input.equals("a")) {
-        k = 0;
-      } else if (input.equals("b")) {
-        k = 1;
-      } else if (input.equals("c")) {
-        k = 2;
-      } else if (input.equals("e")) {
-        k = 3;
-      }
-      return Pair.of(k, input);
-    }
-  }
-
-  @Test
-  public void testMRMaterializeToMap() throws IOException {
-    Pipeline p = new MRPipeline(MaterializeToMapTest.class);
-    try {
-      String inputFile = FileHelper.createTempCopyOf("set1.txt");
-      PCollection<String> c = p.readTextFile(inputFile);
-      PTypeFamily tf = c.getTypeFamily();
-      PTable<Integer,String> t =
-          c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
-      Map<Integer, String> m = t.materializeToMap();
-      assertMatches(m);
-    } finally {
-      p.done();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/MultipleOutputTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/MultipleOutputTest.java b/src/test/java/com/cloudera/crunch/MultipleOutputTest.java
deleted file mode 100644
index a12f724..0000000
--- a/src/test/java/com/cloudera/crunch/MultipleOutputTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.io.Files;
-
-/**
- * Writes two filtered views of the same input to separate text outputs in one pipeline run and
- * checks the contents of each output.
- */
-public class MultipleOutputTest {
-
-  /** Keeps only words with an even number of characters. */
-  public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-    return words.parallelDo("even", new FilterFn<String>() {
-      @Override
-      public boolean accept(String input) {
-        return input.length() % 2 == 0;
-      }
-    }, typeFamily.strings());
-  }
-
-  /** Keeps only words with an odd number of characters. */
-  public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-    return words.parallelDo("odd", new FilterFn<String>() {
-      @Override
-      public boolean accept(String input) {
-        return input.length() % 2 != 0;
-      }
-    }, typeFamily.strings());
-  }
-
-  /** Re-keys each entry by the first character of its key, dropping entries with empty keys. */
-  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
-    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-      @Override
-      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
-        if (input.first().length() > 0) {
-          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
-        }
-      }
-    }, ptable.getPTableType());
-  }
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(MultipleOutputTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(MultipleOutputTest.class), AvroTypeFamily.getInstance());
-  }
-
-  /**
-   * Splits {@code letters.txt} into even- and odd-length words, writes each to its own text
-   * output, and checks both outputs.
-   */
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("letters.txt");
-    File outputEven = FileHelper.createOutputPath();
-    File outputOdd = FileHelper.createOutputPath();
-    // Register cleanup before any assertion can fail. NOTE: File.deleteOnExit only removes a
-    // directory if it is empty when the JVM exits.
-    outputEven.deleteOnExit();
-    outputOdd.deleteOnExit();
-    String outputPathEven = outputEven.getAbsolutePath();
-    String outputPathOdd = outputOdd.getAbsolutePath();
-
-    PCollection<String> words = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
-
-    PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
-    PCollection<String> oddCountWords = oddCountLetters(words, typeFamily);
-    pipeline.writeTextFile(evenCountWords, outputPathEven);
-    pipeline.writeTextFile(oddCountWords, outputPathOdd);
-
-    pipeline.done();
-
-    checkFileContents(outputPathEven, Arrays.asList("bb"));
-    checkFileContents(outputPathOdd, Arrays.asList("a"));
-  }
-
-  /**
-   * Asserts that the map-output part file in {@code filePath} holds exactly {@code expected}.
-   * Hadoop's text output is UTF-8, so it is decoded as UTF-8 rather than with the platform
-   * default charset.
-   */
-  private void checkFileContents(String filePath, List<String> expected) throws IOException {
-    File outputFile = new File(filePath, "part-m-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.forName("UTF-8"));
-    assertEquals(expected, lines);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java b/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java
deleted file mode 100644
index 46f9ffe..0000000
--- a/src/test/java/com/cloudera/crunch/PCollectionGetSizeTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package com.cloudera.crunch;
-
-import static com.cloudera.crunch.io.At.sequenceFile;
-import static com.cloudera.crunch.io.At.textFile;
-import static com.cloudera.crunch.types.writable.Writables.strings;
-import static com.google.common.collect.Lists.newArrayList;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.test.FileHelper;
-
-public class PCollectionGetSizeTest {
-
-    private String emptyInputPath;
-    private String nonEmptyInputPath;
-    private String outputPath;
-
-    /** Filter that rejects everything. */
-    @SuppressWarnings("serial")
-    private static class FalseFilterFn extends FilterFn<String> {
-
-        @Override
-        public boolean accept(final String input) {
-            return false;
-        }
-    }
-
-    @Before
-    public void setUp() throws IOException {
-        emptyInputPath = FileHelper.createTempCopyOf("emptyTextFile.txt");
-        nonEmptyInputPath = FileHelper.createTempCopyOf("set1.txt");
-        outputPath = FileHelper.createOutputPath().getAbsolutePath();
-    }
-
-    @Test
-    public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
-        testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass()));
-    }
-
-    @Test
-    public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
-        testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
-    }
-
-    private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
-
-        assertThat(pipeline.read(textFile(emptyInputPath)).getSize(), is(0L));
-    }
-
-    @Test
-    public void testMaterializeEmptyInput_MRPipeline() throws IOException {
-        testMaterializeEmptyInput(new MRPipeline(this.getClass()));
-    }
-
-    @Test
-    public void testMaterializeEmptyImput_MemPipeline() throws IOException {
-        testMaterializeEmptyInput(MemPipeline.getInstance());
-    }
-
-    private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
-        assertThat(newArrayList(pipeline.readTextFile(emptyInputPath).materialize().iterator()).size(), is(0));
-    }
-
-    @Test
-    public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
-
-        assertThat(emptyIntermediate.getSize(), is(0L));
-    }
-
-    @Test
-    @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
-    public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
-
-        PCollection<String> data = new MRPipeline(this.getClass()).readTextFile(nonEmptyInputPath);
-
-        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
-
-        assertThat(emptyPCollection.getSize(), is(0L));
-    }
-
-    @Test
-    public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
-        assertThat(emptyIntermediate.getSize(), is(0L));
-    }
-
-    @Test
-    public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
-
-        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-    }
-
-    @Test
-    public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
-        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-    }
-
-    private PCollection<String> createPesistentEmptyIntermediate(Pipeline pipeline) {
-
-        PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
-
-        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
-
-        emptyPCollection.write(sequenceFile(outputPath, strings()));
-
-        pipeline.run();
-
-        return pipeline.read(sequenceFile(outputPath, strings()));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
-        new MRPipeline(this.getClass()).readTextFile("non_existing.file").getSize();
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
-        MemPipeline.getInstance().readTextFile("non_existing.file").getSize();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java b/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java
deleted file mode 100644
index b1961b1..0000000
--- a/src/test/java/com/cloudera/crunch/PTableKeyValueTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.cloudera.crunch;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-
-import junit.framework.Assert;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-
-/**
- * Checks that {@link PTable#keys()} and {@link PTable#values()} of the same table line up
- * element by element, for each parameterized {@link PTypeFamily}.
- *
- * <p>The anonymous {@link DoFn} in {@link #testKeysAndValues()} captures the enclosing test
- * instance. Presumably that is why this class is {@link Serializable} with transient fields.
- */
-@RunWith(value = Parameterized.class)
-public class PTableKeyValueTest implements Serializable {
-
-  private static final long serialVersionUID = 4374227704751746689L;
-
-  private transient PTypeFamily typeFamily;
-  private transient MRPipeline pipeline;
-  private transient String inputFile;
-
-  public PTableKeyValueTest(PTypeFamily typeFamily) {
-    this.typeFamily = typeFamily;
-  }
-
-  /** Runs every test once with the Writable type family and once with Avro. */
-  @Parameters
-  public static Collection<Object[]> data() {
-    Object[][] data = new Object[][] {
-        { WritableTypeFamily.getInstance() },
-        { AvroTypeFamily.getInstance() } };
-    return Arrays.asList(data);
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    pipeline = new MRPipeline(PTableKeyValueTest.class);
-    inputFile = FileHelper.createTempCopyOf("set1.txt");
-  }
-
-  @After
-  public void tearDown() {
-    pipeline.done();
-  }
-
-  @Test
-  public void testKeysAndValues() throws Exception {
-    PCollection<String> collection =
-        pipeline.read(At.textFile(inputFile, typeFamily.strings()));
-
-    // Key each line by its upper-cased form so every key can be derived from its value.
-    PTable<String, String> table = collection.parallelDo(
-        new DoFn<String, Pair<String, String>>() {
-          @Override
-          public void process(String input, Emitter<Pair<String, String>> emitter) {
-            emitter.emit(Pair.of(input.toUpperCase(), input));
-          }
-        }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
-
-    PCollection<String> keys = table.keys();
-    PCollection<String> values = table.values();
-
-    ArrayList<String> keyList = Lists.newArrayList(keys.materialize().iterator());
-    ArrayList<String> valueList = Lists.newArrayList(values.materialize().iterator());
-
-    // NOTE(review): this relies on keys() and values() materializing in the same order.
-    Assert.assertEquals(valueList.size(), keyList.size());
-    for (int i = 0; i < keyList.size(); i++) {
-      // assertEquals takes (expected, actual): the expected key is derived from the value.
-      Assert.assertEquals(valueList.get(i).toUpperCase(), keyList.get(i));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/PageRankTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/PageRankTest.java b/src/test/java/com/cloudera/crunch/PageRankTest.java
deleted file mode 100644
index 66ae485..0000000
--- a/src/test/java/com/cloudera/crunch/PageRankTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.avro.Avros;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.cloudera.crunch.util.PTypes;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-public class PageRankTest {
-
-  public static class PageRankData {
-	public float score;
-	public float lastScore;
-	public List<String> urls;
-	
-	public PageRankData() { }
-	
-	public PageRankData(float score, float lastScore, Iterable<String> urls) {
-	  this.score = score;
-	  this.lastScore = lastScore;
-	  this.urls = Lists.newArrayList(urls);
-	}
-	
-	public PageRankData next(float newScore) {
-	  return new PageRankData(newScore, score, urls);
-	}
-	
-	public float propagatedScore() {
-	  return score / urls.size();
-	}
-	
-	@Override
-	public String toString() {
-	  return score + " " + lastScore + " " + urls;
-	}
-  }
-  
-  @Test public void testAvroReflect() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    run(new MRPipeline(PageRankTest.class), prType, tf);	
-  }
-  
-  @Test public void testAvroMReflectInMemory() throws Exception {
-    PTypeFamily tf = AvroTypeFamily.getInstance();
-    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    run(MemPipeline.getInstance(), prType, tf);        
-  }
-  
-  @Test public void testAvroJSON() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-
-  @Test public void testAvroBSON() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-  
-  @Test public void testWritablesJSON() throws Exception {
-	PTypeFamily tf = WritableTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-
-  @Test public void testWritablesBSON() throws Exception {
-	PTypeFamily tf = WritableTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-  
-  public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
-    PTypeFamily ptf = input.getTypeFamily();
-    PTable<String, Float> outbound = input.parallelDo(
-        new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
-          @Override
-          public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
-            PageRankData prd = input.second();
-            for (String link : prd.urls) {
-              emitter.emit(Pair.of(link, prd.propagatedScore()));
-            }
-          }
-        }, ptf.tableOf(ptf.strings(), ptf.floats()));
-    
-    return input.cogroup(outbound).parallelDo(
-        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
-              @Override
-              public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
-                PageRankData prd = Iterables.getOnlyElement(input.second().first());
-                Collection<Float> propagatedScores = input.second().second();
-                float sum = 0.0f;
-                for (Float s : propagatedScores) {
-                  sum += s;
-                }
-                return Pair.of(input.first(), prd.next(d + (1.0f - d)*sum));
-              }
-            }, input.getPTableType());
-  }
-  
-  public static void run(Pipeline pipeline, PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
-    String urlInput = FileHelper.createTempCopyOf("urls.txt");
-    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
-        .parallelDo(new MapFn<String, Pair<String, String>>() {
-          @Override
-          public Pair<String, String> map(String input) {
-            String[] urls = input.split("\\t");
-            return Pair.of(urls[0], urls[1]);
-          }
-        }, ptf.tableOf(ptf.strings(), ptf.strings()))
-        .groupByKey()
-        .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
-              @Override
-              public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
-                return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
-              }
-            }, ptf.tableOf(ptf.strings(), prType));
-    
-    Float delta = 1.0f;
-    while (delta > 0.01) {
-      scores = pageRank(scores, 0.5f);
-      scores.materialize().iterator(); // force the write
-      delta = Iterables.getFirst(Aggregate.max(
-          scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
-            @Override
-            public Float map(Pair<String, PageRankData> input) {
-              PageRankData prd = input.second();
-              return Math.abs(prd.score - prd.lastScore);
-            }
-          }, ptf.floats())).materialize(), null);
-    }
-    assertEquals(0.0048, delta, 0.001);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/PairTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/PairTest.java b/src/test/java/com/cloudera/crunch/PairTest.java
deleted file mode 100644
index 8216105..0000000
--- a/src/test/java/com/cloudera/crunch/PairTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-/** Unit tests for {@link Pair} construction, accessors, equality and ordering. */
-public class PairTest {
-
-  @Test
-  public void testPairConstructor() {
-    Pair<String, Integer> pair = new Pair<String, Integer>("brock", 45);
-    test(pair);
-  }
-
-  @Test
-  public void testPairOf() {
-    Pair<String, Integer> pair = Pair.of("brock", 45);
-    test(pair);
-  }
-
-  /** Shared checks for a ("brock", 45) pair, however it was constructed. */
-  protected void test(Pair<String, Integer> pair) {
-    assertEquals(2, pair.size());
-
-    assertEquals("brock", pair.first());
-    assertEquals(Integer.valueOf(45), pair.second());
-    assertEquals(Pair.of("brock", 45), pair);
-
-    assertEquals("brock", pair.get(0));
-    assertEquals(Integer.valueOf(45), pair.get(1));
-
-    try {
-      pair.get(-1);
-      fail("get(-1) should throw IndexOutOfBoundsException");
-    } catch (IndexOutOfBoundsException expected) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testPairComparisons() {
-    assertEquals(0, Pair.of(null, null).compareTo(Pair.of(null, null)));
-    assertEquals(0, Pair.of(1, 2).compareTo(Pair.of(1, 2)));
-    assertTrue(Pair.of(2, "a").compareTo(Pair.of(1, "a")) > 0);
-    assertTrue(Pair.of("a", 2).compareTo(Pair.of("a", 1)) > 0);
-    assertTrue(Pair.of(null, 17).compareTo(Pair.of(null, 29)) < 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/TFIDFTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/TFIDFTest.java b/src/test/java/com/cloudera/crunch/TFIDFTest.java
deleted file mode 100644
index e75a23e..0000000
--- a/src/test/java/com/cloudera/crunch/TFIDFTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static com.google.common.io.Resources.getResource;
-import static com.google.common.io.Resources.newInputStreamSupplier;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import com.cloudera.crunch.fn.MapKeysFn;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.seq.SeqFileSourceTarget;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.lib.Join;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-/**
- * Integration test that computes TF-IDF scores for the lines of the bundled
- * {@code docs.txt} resource (one "title<TAB>text" document per line) with an
- * MRPipeline, and checks that a known score appears in both the lowercase and
- * the uppercase-keyed outputs.
- */
-@SuppressWarnings("serial")
-public class TFIDFTest implements Serializable {  
-  // total number of documents, should calculate
-  // NOTE(review): hard-coded rather than derived from the input; presumably it must
-  // match the number of documents in docs.txt — confirm if the resource changes.
-  protected static final double N = 2;
-  
-  /** Builds both outputs and lets pipeline.done() execute everything at once. */
-  @Test
-  public void testWritablesSingleRun() throws IOException {
-    run(new MRPipeline(TFIDFTest.class), WritableTypeFamily.getInstance(), true);
-  }
-
-  /** Calls pipeline.run() after the first output, so execution happens in two phases. */
-  @Test
-  public void testWritablesMultiRun() throws IOException {
-    run(new MRPipeline(TFIDFTest.class), WritableTypeFamily.getInstance(), false);
-  }
-
-  /**
-   * This method should generate a TF-IDF score for the input.
-   *
-   * @param docs lines of the form "title\ttext"
-   * @param termFreqPath where the intermediate (word, title) -> count table is
-   *     written as a sequence file
-   * @param ptf type family used for every intermediate PType
-   * @return a table from each word to the (title, tf * idf) scores for that word
-   */
-  public PTable<String, Collection<Pair<String, Double>>>  generateTFIDF(PCollection<String> docs,
-      Path termFreqPath, PTypeFamily ptf) throws IOException {    
-    
-    /*
-     * Input: String
-     * Input title  text
-     * 
-     * Output: PTable<Pair<String, String>, Long> 
-     * Pair<Pair<word, title>, count in title>
-     */
-    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
-        new DoFn<String, Pair<String, String>>() {
-      @Override
-      public void process(String doc, Emitter<Pair<String, String>> emitter) {
-        String[] kv = doc.split("\t");
-        String title = kv[0];
-        String text = kv[1];
-        // Words are split on non-word characters and lowercased; empty tokens are skipped.
-        for (String word : text.split("\\W+")) {
-          if(word.length() > 0) {
-            Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
-            emitter.emit(pair);
-          }
-        }
-      }
-    }, ptf.pairs(ptf.strings(), ptf.strings())));
-    
-    // Side output: persist the raw term frequencies as well.
-    tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
-    
-    /*
-     * Input: Pair<Pair<String, String>, Long>
-     * Pair<Pair<word, title>, count in title>
-     * 
-     * Output: PTable<String, Long>
-     * PTable<word, # of docs containing word>
-     */
-    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",  
-        new DoFn<Pair<Pair<String, String>, Long>, String>() {
-      @Override
-      public void process(Pair<Pair<String, String>, Long> input,
-          Emitter<String> emitter) {
-        emitter.emit(input.first().first());
-      }
-    }, ptf.strings()));
-    
-    /*
-     * Input: Pair<Pair<String, String>, Long>
-     * Pair<Pair<word, title>, count in title>
-     * 
-     * Output: PTable<String, Pair<String, Long>>
-     * PTable<word, Pair<title, count in title>>
-     */
-    // This DoFn buffers consecutive records that share a word and emits them as one
-    // collection when the word changes, plus once more in cleanup() for the last word.
-    // NOTE(review): the result is only correct if every record for a word reaches the
-    // same task contiguously; presumably that holds because tf comes out of
-    // Aggregate.count sorted by key — confirm, especially with multiple reducers.
-    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
-        new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
-          Collection<Pair<String, Long>> buffer;
-          String key;
-          @Override
-          public void process(Pair<Pair<String, String>, Long> input,
-        	  Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            Pair<String, String> wordDocumentPair = input.first();
-            if(!wordDocumentPair.first().equals(key)) {
-              flush(emitter);
-              key = wordDocumentPair.first();
-              buffer = Lists.newArrayList();
-            }
-            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));            
-          }
-          // Emits the buffered (word, [(title, count)...]) group, if any, and resets the buffer.
-          protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            if(buffer != null) {
-              emitter.emit(Pair.of(key, buffer));
-              buffer = null;
-            }
-          }
-          @Override
-          public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            flush(emitter);
-          }
-      }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
-
-    PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
-
-    /*
-     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>>
-     * Pair<word, Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
-     * 
-     * Output: Pair<String, Collection<Pair<String, Double>>>
-     * Pair<word, Collection<Pair<title, tfidf>>>
-     */
-    return joinedResults.parallelDo("calculate tfidf",
-        new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
-          @Override
-          public Pair<String, Collection<Pair<String, Double>>> map(Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
-            Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
-            String word = input.first();
-            // n = number of documents containing the word; idf = ln(N / n).
-            double n = input.second().first();
-            double idf = Math.log(N / n);
-            for(Pair<String, Long> tf : input.second().second()) {
-              double tfidf = tf.second() * idf;
-              tfidfs.add(Pair.of(tf.first(), tfidf));
-            }
-            return Pair.of(word, tfidfs);
-          }
-      
-    }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
-  }
-  
-  /**
-   * Copies docs.txt to a temp file, computes TF-IDF, writes the result and an
-   * uppercase-keyed copy as text, and checks that both contain the expected score
-   * for "the" in document B.
-   *
-   * @param singleRun if false, pipeline.run() is called after the first output is
-   *     planned; otherwise everything executes in pipeline.done()
-   */
-  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
-    File input = File.createTempFile("docs", "txt");
-    input.deleteOnExit();
-    Files.copy(newInputStreamSupplier(getResource("docs.txt")), input);
-    
-    String outputPath1 = getOutput();
-    String outputPath2 = getOutput();
-    
-    Path tfPath = new Path(getOutput("termfreq"));
-    
-    PCollection<String> docs = pipeline.readTextFile(input.getAbsolutePath());
-        
-    PTable<String, Collection<Pair<String, Double>>> results =
-        generateTFIDF(docs, tfPath, typeFamily);
-    pipeline.writeTextFile(results, outputPath1);
-    if (!singleRun) {
-      pipeline.run();
-    }
-    
-    PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
-        new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
-          @Override
-          public String map(String k1) {
-            return k1.toUpperCase();
-          } 
-        }, results.getPTableType());
-    pipeline.writeTextFile(uppercased, outputPath2);
-    pipeline.done();
-    
-    // Check the lowercase version...
-    // NOTE(review): startsWith("the") would also accept words such as "then" or "there".
-    File outputFile = new File(outputPath1, "part-r-00000");
-    outputFile.deleteOnExit();
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    boolean passed = false;
-    for (String line : lines) {
-      if (line.startsWith("the") && line.contains("B,0.6931471805599453")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-    
-    // ...and the uppercase version
-    outputFile = new File(outputPath2, "part-r-00000");
-    outputFile.deleteOnExit();
-    lines = Files.readLines(outputFile, Charset.defaultCharset());
-    passed = false;
-    for (String line : lines) {
-      if (line.startsWith("THE") && line.contains("B,0.6931471805599453")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-  }
-  
-  /** Returns a fresh, non-existent temp path with the prefix "output". */
-  public static String getOutput() throws IOException {
-    return getOutput("output");
-  }
-  
-  /**
-   * Returns a fresh, non-existent temp path: a temp file is created to reserve a
-   * unique name and then deleted, so the pipeline can create a directory there.
-   * NOTE(review): another process could claim the name between delete and use.
-   */
-  public static String getOutput(String prefix) throws IOException {
-    File output = File.createTempFile(prefix, "");
-    String path = output.getAbsolutePath();
-    output.delete();
-    return path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/TermFrequencyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/TermFrequencyTest.java b/src/test/java/com/cloudera/crunch/TermFrequencyTest.java
deleted file mode 100644
index 6e064ce..0000000
--- a/src/test/java/com/cloudera/crunch/TermFrequencyTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mem.MemPipeline;
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.io.ReadableSourceTarget;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-
-/**
- * Counts (word, title) occurrences in the bundled {@code docs.txt} resource with
- * both the MapReduce and the in-memory pipelines, optionally with an extra re-keying
- * step, and checks that "well" is counted exactly once in document A.
- */
-@SuppressWarnings("serial")
-public class TermFrequencyTest implements Serializable {  
-  
-  @Test
-  public void testTermFrequencyWithNoTransform() throws IOException {
-    run(new MRPipeline(TermFrequencyTest.class), WritableTypeFamily.getInstance(), false);
-  }
-  
-  @Test
-  public void testTermFrequencyWithTransform() throws IOException {
-    run(new MRPipeline(TermFrequencyTest.class), WritableTypeFamily.getInstance(), true);
-  }
-  
-  @Test
-  public void testTermFrequencyNoTransformInMemory() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), false);  
-  }
-
-  @Test
-  public void testTermFrequencyWithTransformInMemory() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), true);
-  }
-  
-
-  /**
-   * Builds and runs the term-frequency pipeline, then checks its text output.
-   *
-   * @param typeFamily not used directly; the type family is taken from the input
-   *     collection via docs.getTypeFamily()
-   * @param transformTF if true, the counts are also re-keyed by word and written to
-   *     a second text output; the asserted output is the same either way
-   */
-  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean transformTF) throws IOException {
-    String input = FileHelper.createTempCopyOf("docs.txt");
-    
-    File transformedOutput = FileHelper.createOutputPath();
-    File tfOutput = FileHelper.createOutputPath();
-    
-    PCollection<String> docs = pipeline.readTextFile(input);
-    
-    PTypeFamily ptf = docs.getTypeFamily();
-    
-    /*
-     * Input: String
-     * Input title  text
-     * 
-     * Output: PTable<Pair<String, String>, Long> 
-     * Pair<Pair<word, title>, count in title>
-     */
-    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
-        new DoFn<String, Pair<String, String>>() {
-      @Override
-      public void process(String doc, Emitter<Pair<String, String>> emitter) {
-        String[] kv = doc.split("\t");
-        String title = kv[0];
-        String text = kv[1];
-        // Words are split on non-word characters and lowercased; empty tokens are skipped.
-        for (String word : text.split("\\W+")) {
-          if(word.length() > 0) {
-            Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
-            emitter.emit(pair);
-          }
-        }
-      }
-    }, ptf.pairs(ptf.strings(), ptf.strings())));
-    
-    if(transformTF) {
-      /*
-       * Input: Pair<Pair<String, String>, Long>
-       * Pair<Pair<word, title>, count in title>
-       * 
-       * Output: PTable<String, Pair<String, Long>>
-       * PTable<word, Pair<title, count in title>>
-       */
-      PTable<String, Pair<String, Long>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
-          new MapFn<Pair<Pair<String, String>, Long>, Pair<String, Pair<String, Long>>>() {
-            @Override
-            public Pair<String, Pair<String, Long>> map(Pair<Pair<String, String>, Long> input) {
-              Pair<String, String> wordDocumentPair = input.first();            
-              return Pair.of(wordDocumentPair.first(), Pair.of(wordDocumentPair.second(), input.second()));
-            }
-        }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), ptf.longs())));
-      
-      pipeline.writeTextFile(wordDocumentCountPair, transformedOutput.getAbsolutePath());
-    }
-    
-    SourceTarget<String> st = At.textFile(tfOutput.getAbsolutePath());
-    pipeline.write(tf, st);
-    
-    pipeline.run();
-    
-    // test the case we should see
-    // NOTE(review): unchecked cast; assumes At.textFile returns a ReadableSourceTarget.
-    Iterable<String> lines = ((ReadableSourceTarget<String>) st).read(pipeline.getConfiguration());
-    boolean passed = false;
-    // Each output line is expected to look like "[word,title]\tcount".
-    for (String line : lines) {
-      if ("[well,A]\t0".equals(line)) {
-        fail("Found " + line + " but well is in Document A 1 time");
-      }
-      if ("[well,A]\t1".equals(line)) {
-        passed = true;
-      }
-    }
-    assertTrue(passed);
-    // NOTE(review): done() is skipped if an assertion above fails.
-    pipeline.done();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/TextPairTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/TextPairTest.java b/src/test/java/com/cloudera/crunch/TextPairTest.java
deleted file mode 100644
index 9fa3058..0000000
--- a/src/test/java/com/cloudera/crunch/TextPairTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.From;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.writable.Writables;
-
-/**
- * Checks that string pairs built with {@link Writables} survive materialization
- * from a MapReduce pipeline with their contents intact.
- */
-public class TextPairTest  {
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(TextPairTest.class));
-  }
-
-  /** Marker placed in the first slot of every emitted pair and looked for afterwards. */
-  private static final String CANARY = "Writables.STRING_TO_TEXT";
-
-  /**
-   * Splits each line on non-word characters and emits a (CANARY, word) pair for
-   * every non-empty word.
-   */
-  public static PCollection<Pair<String, String>> wordDuplicate(PCollection<String> words) {
-    return words.parallelDo("my word duplicator", new DoFn<String, Pair<String, String>>() {
-      @Override
-      public void process(String line, Emitter<Pair<String, String>> emitter) {
-        for (String word : line.split("\\W+")) {
-          if (word.length() > 0) {
-            Pair<String, String> pair = Pair.of(CANARY, word);
-            emitter.emit(pair);
-          }
-        }
-      }
-    }, Writables.pairs(Writables.strings(), Writables.strings()));
-  }
-
-  /**
-   * Materializes the duplicated words of shakes.txt and asserts that at least one
-   * pair still carries the canary in its first element.
-   */
-  public void run(Pipeline pipeline) throws IOException {
-    String input = FileHelper.createTempCopyOf("shakes.txt");
-
-    PCollection<String> shakespeare = pipeline.read(From.textFile(input));
-    Iterable<Pair<String, String>> lines = pipeline.materialize(wordDuplicate(shakespeare));
-    boolean passed = false;
-    for (Pair<String, String> line : lines) {
-      if (line.first().contains(CANARY)) {
-        passed = true;
-        break;
-      }
-    }
-
-    pipeline.done();
-    assertTrue(passed);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java b/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java
deleted file mode 100644
index 22d044e..0000000
--- a/src/test/java/com/cloudera/crunch/TupleNClassCastBugTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static com.google.common.io.Resources.getResource;
-import static com.google.common.io.Resources.newInputStreamSupplier;
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.io.Files;
-
-/**
- * Regression test for a ClassCastException that was thrown during the reduce phase
- * when grouping values of type {@link TupleN}.
- */
-public class TupleNClassCastBugTest {
-
-  /**
-   * Keys each "docId\tline" record by its document id, groups by key and re-emits
-   * every grouped {@link TupleN}. The groupByKey forces a reduce phase.
-   */
-  public static PCollection<TupleN> mapGroupDo(PCollection<String> lines, PTypeFamily ptf) {
-    PTable<String, TupleN> mapped = lines.parallelDo(new MapFn<String, Pair<String, TupleN>>() {
-
-      @Override
-      public Pair<String, TupleN> map(String line) {
-        String[] columns = line.split("\\t");
-        String docId = columns[0];
-        String docLine = columns[1];
-        return Pair.of(docId, new TupleN(docId, docLine));
-      }
-    }, ptf.tableOf(ptf.strings(), ptf.tuples(ptf.strings(), ptf.strings())));
-    return mapped.groupByKey().parallelDo(new DoFn<Pair<String, Iterable<TupleN>>, TupleN>() {
-      @Override
-      public void process(Pair<String, Iterable<TupleN>> input, Emitter<TupleN> tupleNEmitter) {
-        for (TupleN tuple : input.second()) {
-          tupleNEmitter.emit(tuple);
-        }
-      }
-    }, ptf.tuples(ptf.strings(), ptf.strings()));
-  }
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(TupleNClassCastBugTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(TupleNClassCastBugTest.class), AvroTypeFamily.getInstance());
-  }
-
-  /** Runs {@link #mapGroupDo} over docs.txt and checks that all six lines come back. */
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    File input = File.createTempFile("docs", "txt");
-    input.deleteOnExit();
-    Files.copy(newInputStreamSupplier(getResource("docs.txt")), input);
-
-    // Reserve a unique name, then delete the file so the pipeline can create the
-    // output directory at that path.
-    File output = File.createTempFile("output", "");
-    String outputPath = output.getAbsolutePath();
-    output.delete();
-
-    PCollection<String> docLines = pipeline.readTextFile(input.getAbsolutePath());
-    pipeline.writeTextFile(mapGroupDo(docLines, typeFamily), outputPath);
-    pipeline.done();
-
-    // *** We are not directly testing the output, we are looking for a ClassCastException
-    // *** which is thrown in a different thread during the reduce phase. If all is well
-    // *** the file will exist and have six lines. Otherwise the bug is present.
-    File outputFile = new File(output, "part-r-00000");
-    // Register cleanup before asserting so it also happens on failure. deleteOnExit
-    // deletes in reverse registration order, so the part file is removed before its
-    // directory.
-    output.deleteOnExit();
-    outputFile.deleteOnExit();
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    assertEquals(6, lines.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/TupleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/TupleTest.java b/src/test/java/com/cloudera/crunch/TupleTest.java
deleted file mode 100644
index 2e89f4d..0000000
--- a/src/test/java/com/cloudera/crunch/TupleTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.types.TupleFactory;
-
-public class TupleTest {
-  private String first = "foo";
-  private Integer second = 1729;
-  private Double third = 64.2;
-  private Boolean fourth = false;
-  private Float fifth = 17.29f;
-  
-  @Test
-  public void testTuple3() {
-    Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
-    assertEquals(3, t.size());
-    assertEquals(first, t.first());
-    assertEquals(second, t.second());
-    assertEquals(third, t.third());
-    assertEquals(first, t.get(0));
-    assertEquals(second, t.get(1));
-    assertEquals(third, t.get(2));
-    try {
-      t.get(-1);
-      fail();
-    } catch (IndexOutOfBoundsException e) {
-      // expected
-    }
-  }
-  
-  @Test
-  public void testTuple3Equality() {
-    Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
-    assertTrue(t.equals(new Tuple3(first, second, third)));
-    assertFalse(t.equals(new Tuple3(first, null, third)));
-    assertFalse((new Tuple3(null, null, null)).equals(t));
-    assertTrue((new Tuple3(first, null, null)).equals(new Tuple3(first, null, null)));
-  }
-  
-  @Test
-  public void testTuple4() {
-    Tuple4<String, Integer, Double, Boolean> t = 
-      new Tuple4<String, Integer, Double, Boolean>(first, second, third, fourth);
-    assertEquals(4, t.size());
-    assertEquals(first, t.first());
-    assertEquals(second, t.second());
-    assertEquals(third, t.third());
-    assertEquals(fourth, t.fourth());
-    assertEquals(first, t.get(0));
-    assertEquals(second, t.get(1));
-    assertEquals(third, t.get(2));
-    assertEquals(fourth, t.get(3));
-    try {
-      t.get(-1);
-      fail();
-    } catch (IndexOutOfBoundsException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testTuple4Equality() {
-    Tuple4<String, Integer, Double, Boolean> t = 
-      new Tuple4<String, Integer, Double, Boolean>(first, second, third, fourth);
-    assertFalse(t.equals(new Tuple3(first, second, third)));
-    assertFalse(t.equals(new Tuple4(first, null, third, null)));
-    assertFalse((new Tuple4(null, null, null, null)).equals(t));
-    assertTrue((new Tuple4(first, null, third, null)).equals(
-        new Tuple4(first, null, third, null)));
-  }
-
-  @Test
-  public void testTupleN() {
-    TupleN t = new TupleN(first, second, third, fourth, fifth);
-    assertEquals(5, t.size());
-    assertEquals(first, t.get(0));
-    assertEquals(second, t.get(1));
-    assertEquals(third, t.get(2));
-    assertEquals(fourth, t.get(3));
-    assertEquals(fifth, t.get(4));
-    try {
-      t.get(-1);
-      fail();
-    } catch (IndexOutOfBoundsException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testTupleNEquality() {
-	TupleN t = new TupleN(first, second, third, fourth, fifth);
-	assertTrue(t.equals(new TupleN(first, second, third, fourth, fifth)));
-    assertFalse(t.equals(new TupleN(first, null, third, null)));
-    assertFalse((new TupleN(null, null, null, null, null)).equals(t));
-    assertTrue((new TupleN(first, second, third, null, null)).equals(
-        new TupleN(first, second, third, null, null)));
-  }
-
-  @Test
-  public void testTupleFactory() {
-    checkTuple(TupleFactory.PAIR.makeTuple("a", "b"), Pair.class, "a", "b");
-    checkTuple(TupleFactory.TUPLE3.makeTuple("a", "b", "c"), Tuple3.class, "a", "b", "c");
-    checkTuple(TupleFactory.TUPLE4.makeTuple("a", "b", "c", "d"), Tuple4.class, "a", "b", "c", "d");
-    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d", "e"), TupleN.class, "a", "b", "c", "d", "e");
-
-    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b"), TupleN.class, "a", "b");
-    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c"), TupleN.class, "a", "b", "c");
-    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d"), TupleN.class, "a", "b", "c", "d");
-    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d", "e"), TupleN.class, "a", "b", "c", "d", "e");
-  }
-
-  private void checkTuple(Tuple t, Class<? extends Tuple> type, Object... values) {
-    assertEquals(type, t.getClass());
-    assertEquals(values.length, t.size());
-    for (int i = 0; i < values.length; i++)
-      assertEquals(values[i], t.get(i));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/WordCountHBaseTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/WordCountHBaseTest.java b/src/test/java/com/cloudera/crunch/WordCountHBaseTest.java
deleted file mode 100644
index 4c357d6..0000000
--- a/src/test/java/com/cloudera/crunch/WordCountHBaseTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Random;
-import java.util.jar.JarEntry;
-import java.util.jar.JarOutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.hbase.HBaseSourceTarget;
-import com.cloudera.crunch.io.hbase.HBaseTarget;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.types.writable.Writables;
-import com.cloudera.crunch.util.DistCache;
-import com.google.common.io.ByteStreams;
-
-/**
- * Runs a word count over the rows of an HBase table on mini HBase/MapReduce
- * clusters and checks the counts written to a second table.
- */
-public class WordCountHBaseTest {
-  protected static final Log LOG = LogFactory.getLog(WordCountHBaseTest.class);
-
-  private static final byte[] COUNTS_COLFAM = Bytes.toBytes("cf");
-  private static final byte[] WORD_COLFAM = Bytes.toBytes("cf");
-
-  private HBaseTestingUtility hbaseTestUtil = new HBaseTestingUtility();
-
-  /**
-   * Counts the words stored in the {@code WORD_COLFAM} column (null qualifier) of each
-   * row and turns every (word, count) pair into a {@link Put} whose row key is the word.
-   * NOTE: setUp() ships the two anonymous DoFns below by their compiled names
-   * WordCountHBaseTest$1 and $2, so do not add anonymous classes before them.
-   */
-  @SuppressWarnings("serial")
-  public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) {
-    PTable<String, Long> counts = Aggregate.count(words.parallelDo(
-        new DoFn<Pair<ImmutableBytesWritable, Result>, String>() {
-          @Override
-          public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) {
-            byte[] word = row.second().getValue(WORD_COLFAM, null);
-            if (word != null) {
-              emitter.emit(Bytes.toString(word));
-            }
-          }
-        }, words.getTypeFamily().strings()));
-
-    return counts.parallelDo("convert to put",
-        new DoFn<Pair<String, Long>, Put>() {
-          @Override
-          public void process(Pair<String, Long> input, Emitter<Put> emitter) {
-            Put put = new Put(Bytes.toBytes(input.first()));
-            put.add(COUNTS_COLFAM, null,
-                Bytes.toBytes(input.second()));
-            emitter.emit(put);
-          }
-
-        }, Writables.writables(Put.class));
-  }
-
-  /** Starts the mini ZooKeeper, HBase and MapReduce clusters used by the test. */
-  @SuppressWarnings("deprecation")
-  @Before
-  public void setUp() throws Exception {
-    Configuration conf = hbaseTestUtil.getConfiguration();
-    File tmpDir = File.createTempFile("logdir", "");
-    tmpDir.delete();
-    tmpDir.mkdir();
-    tmpDir.deleteOnExit();
-    conf.set("hadoop.log.dir", tmpDir.getAbsolutePath());
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
-    conf.setInt("hbase.master.info.port", -1);
-    conf.setInt("hbase.regionserver.info.port", -1);
-
-    hbaseTestUtil.startMiniZKCluster();
-    hbaseTestUtil.startMiniCluster();
-    hbaseTestUtil.startMiniMapReduceCluster(1);
-
-    // For Hadoop-2.0.0, we have to do a bit more work.
-    if (TaskAttemptContext.class.isInterface()) {
-      conf = hbaseTestUtil.getConfiguration();
-      FileSystem fs = FileSystem.get(conf);
-      Path tmpPath = new Path("target", "WordCountHBaseTest-tmpDir");
-      FileSystem localFS = FileSystem.getLocal(conf);
-      for (FileStatus jarFile : localFS.listStatus(new Path("target/lib/"))) {
-        Path target = new Path(tmpPath, jarFile.getPath().getName());
-        fs.copyFromLocalFile(jarFile.getPath(), target);
-        DistributedCache.addFileToClassPath(target, conf, fs);
-      }
-
-      // Create a programmatic container for this jar.
-      JarOutputStream jos = new JarOutputStream(new FileOutputStream("WordCountHBaseTest.jar"));
-      try {
-        File baseDir = new File("target/test-classes");
-        jarUp(jos, baseDir, "com/cloudera/crunch/WordCountHBaseTest.class");
-        jarUp(jos, baseDir, "com/cloudera/crunch/WordCountHBaseTest$1.class");
-        jarUp(jos, baseDir, "com/cloudera/crunch/WordCountHBaseTest$2.class");
-      } finally {
-        jos.close();
-      }
-
-      Path target = new Path(tmpPath, "WordCountHBaseTest.jar");
-      fs.copyFromLocalFile(true, new Path("WordCountHBaseTest.jar"), target);
-      DistributedCache.addFileToClassPath(target, conf, fs);
-    }
-  }
-
-  /** Copies {@code baseDir/classDir} into {@code jos} as an entry named {@code classDir}. */
-  private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException {
-    File file = new File(baseDir, classDir);
-    JarEntry e = new JarEntry(classDir);
-    e.setTime(file.lastModified());
-    jos.putNextEntry(e);
-    FileInputStream in = new FileInputStream(file);
-    try {
-      ByteStreams.copy(in, jos);
-    } finally {
-      // The stream was previously never closed, leaking one descriptor per entry.
-      in.close();
-    }
-    jos.closeEntry();
-  }
-
-  @Test
-  public void testWordCount() throws IOException {
-    run(new MRPipeline(WordCountHBaseTest.class, hbaseTestUtil.getConfiguration()));
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    hbaseTestUtil.shutdownMiniMapReduceCluster();
-    hbaseTestUtil.shutdownMiniCluster();
-    hbaseTestUtil.shutdownMiniZKCluster();
-  }
-
-  /**
-   * Writes "cat", "cat", "dog" into a fresh input table, counts the words into a
-   * fresh output table and checks the stored counts.
-   */
-  public void run(Pipeline pipeline) throws IOException {
-    Random rand = new Random();
-    // nextInt(bound) is never negative, unlike Math.abs(nextInt()) for Integer.MIN_VALUE.
-    int postFix = rand.nextInt(Integer.MAX_VALUE);
-    String inputTableName = "crunch_words_" + postFix;
-    String outputTableName = "crunch_counts_" + postFix;
-
-    HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName),
-        WORD_COLFAM);
-    try {
-      HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName),
-          COUNTS_COLFAM);
-      try {
-        int key = 0;
-        key = put(inputTable, key, "cat");
-        key = put(inputTable, key, "cat");
-        key = put(inputTable, key, "dog");
-        Scan scan = new Scan();
-        scan.addColumn(WORD_COLFAM, null);
-        HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
-        PTable<ImmutableBytesWritable, Result> shakespeare = pipeline.read(source);
-        pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName));
-        pipeline.done();
-
-        assertIsLong(outputTable, "cat", 2);
-        assertIsLong(outputTable, "dog", 1);
-      } finally {
-        // Release the client-side handles; the clusters are stopped in tearDown().
-        outputTable.close();
-      }
-    } finally {
-      inputTable.close();
-    }
-  }
-
-  /** Writes {@code value} under row key {@code key} and returns the next key to use. */
-  protected int put(HTable table, int key, String value) throws IOException {
-    Put put = new Put(Bytes.toBytes(key));
-    put.add(WORD_COLFAM, null, Bytes.toBytes(value));
-    table.put(put);
-    return key + 1;
-  }
-
-  /** Asserts that row {@code key} stores the long {@code i} in {@code COUNTS_COLFAM}. */
-  protected void assertIsLong(HTable table, String key, long i) throws IOException {
-    Get get = new Get(Bytes.toBytes(key));
-    get.addColumn(COUNTS_COLFAM, null);
-    Result result = table.get(get);
-
-    byte[] rawCount = result.getValue(COUNTS_COLFAM, null);
-    assertTrue("no count stored for " + key, rawCount != null);
-    assertEquals(i, Bytes.toLong(rawCount));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/WordCountTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/WordCountTest.java b/src/test/java/com/cloudera/crunch/WordCountTest.java
deleted file mode 100644
index a0c4abb..0000000
--- a/src/test/java/com/cloudera/crunch/WordCountTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.impl.mr.MRPipeline;
-import com.cloudera.crunch.io.At;
-import com.cloudera.crunch.io.To;
-import com.cloudera.crunch.lib.Aggregate;
-import com.cloudera.crunch.test.FileHelper;
-import com.cloudera.crunch.types.PTypeFamily;
-import com.cloudera.crunch.types.avro.AvroTypeFamily;
-import com.cloudera.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-public class WordCountTest {
-
-  enum WordCountStats {
-    ANDS
-  };
-
-  public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
-    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
-
-      @Override
-      public void process(String line, Emitter<String> emitter) {
-        for (String word : line.split("\\s+")) {
-          emitter.emit(word);
-          if ("and".equals(word)) {
-            increment(WordCountStats.ANDS);
-          }
-        }
-      }
-    }, typeFamily.strings()));
-  }
-
-  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
-    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-
-      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
-        if (input.first().length() > 0) {
-          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
-        }
-      }
-    }, ptable.getPTableType());
-  }
-
-  private boolean runSecond = false;
-  private boolean useToOutput = false;
-
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWritablesWithSecond() throws IOException {
-    runSecond = true;
-    run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWritablesWithSecondUseToOutput() throws IOException {
-    runSecond = true;
-    useToOutput = true;
-    run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(WordCountTest.class), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvroWithSecond() throws IOException {
-    runSecond = true;
-    run(new MRPipeline(WordCountTest.class), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWithTopWritable() throws IOException {
-    runWithTop(WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testWithTopAvro() throws IOException {
-    runWithTop(AvroTypeFamily.getInstance());
-  }
-
-  public static void runWithTop(PTypeFamily tf) throws IOException {
-    Pipeline pipeline = new MRPipeline(WordCountTest.class);
-    String inputPath = FileHelper.createTempCopyOf("shakes.txt");
-
-    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));
-    PTable<String, Long> wordCount = wordCount(shakespeare, tf);
-    List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true)
-        .materialize());
-    assertEquals(
-        ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L),
-            Pair.of("of", 396L), Pair.of("to", 367L)), top5);
-  }
-
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-    String inputPath = FileHelper.createTempCopyOf("shakes.txt");
-    File output = FileHelper.createOutputPath();
-    String outputPath = output.getAbsolutePath();
-
-    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
-    PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
-    if (useToOutput) {
-      wordCount.write(To.textFile(outputPath));
-    } else {
-      pipeline.writeTextFile(wordCount, outputPath);
-    }
-
-    if (runSecond) {
-      File substrCount = File.createTempFile("substr", "");
-      String substrPath = substrCount.getAbsolutePath();
-      substrCount.delete();
-      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(
-          CombineFn.<String> SUM_LONGS());
-      pipeline.writeTextFile(we, substrPath);
-    }
-    PipelineResult res = pipeline.done();
-    assertTrue(res.succeeded());
-    List<PipelineResult.StageResult> stageResults = res.getStageResults();
-    if (runSecond) {
-      assertEquals(2, stageResults.size());
-    } else {
-      assertEquals(1, stageResults.size());
-      assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS));
-    }
-
-    File outputFile = new File(outputPath, "part-r-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    boolean passed = false;
-    for (String line : lines) {
-      if (line.startsWith("Macbeth\t28")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-    output.deleteOnExit();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/fn/ExtractKeyFnTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/fn/ExtractKeyFnTest.java b/src/test/java/com/cloudera/crunch/fn/ExtractKeyFnTest.java
deleted file mode 100644
index 755209f..0000000
--- a/src/test/java/com/cloudera/crunch/fn/ExtractKeyFnTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.Pair;
-
-@SuppressWarnings("serial")
-public class ExtractKeyFnTest {
-
-	protected static final MapFn<String, Integer> mapFn = new MapFn<String, Integer>() {
-		@Override
-		public Integer map(String input) {
-			return input.hashCode();
-		}
-	};
-
-	protected static final ExtractKeyFn<Integer, String> one = new ExtractKeyFn<Integer, String>(
-			mapFn);
-
-	@Test
-	public void test() {
-		StoreLastEmitter<Pair<Integer, String>> emitter = StoreLastEmitter.create();
-		one.process("boza", emitter);
-		assertEquals(Pair.of("boza".hashCode(), "boza"), emitter.getLast());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/fn/MapKeysTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/fn/MapKeysTest.java b/src/test/java/com/cloudera/crunch/fn/MapKeysTest.java
deleted file mode 100644
index 96da50e..0000000
--- a/src/test/java/com/cloudera/crunch/fn/MapKeysTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.Pair;
-
-
-@SuppressWarnings("serial")
-public class MapKeysTest {
-  
-  protected static final MapKeysFn<String, Integer, Integer> one = new MapKeysFn<String, Integer, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 1;
-    }
-  };
-  
-  protected static final MapKeysFn<String, Integer, Integer> two = new MapKeysFn<String, Integer, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 2;
-    }
-  };
-  
-  @Test
-  public void test() {
-    StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();
-    one.process(Pair.of("k", Integer.MAX_VALUE), emitter);
-    assertEquals(Pair.of(1, Integer.MAX_VALUE), emitter.getLast());
-    two.process(Pair.of("k", Integer.MAX_VALUE), emitter);
-    assertEquals(Pair.of(2, Integer.MAX_VALUE), emitter.getLast());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java b/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java
deleted file mode 100644
index 696e965..0000000
--- a/src/test/java/com/cloudera/crunch/fn/MapValuesTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.Pair;
-
-@SuppressWarnings("serial")
-public class MapValuesTest {
-  
-  static final MapValuesFn<String, String, Integer> one = new MapValuesFn<String, String, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 1;
-    }
-  };
-  
-  static final MapValuesFn<String, String, Integer> two = new MapValuesFn<String, String, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 2;
-    }
-  };
-  
-  @Test
-  public void test() {
-    StoreLastEmitter<Pair<String, Integer>> emitter = StoreLastEmitter.create();
-    one.process(Pair.of("k", "v"), emitter);
-    assertEquals(Pair.of("k", 1), emitter.getLast());
-    two.process(Pair.of("k", "v"), emitter);
-    assertEquals(Pair.of("k", 2), emitter.getLast());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/fn/PairMapTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/fn/PairMapTest.java b/src/test/java/com/cloudera/crunch/fn/PairMapTest.java
deleted file mode 100644
index 2f57ee6..0000000
--- a/src/test/java/com/cloudera/crunch/fn/PairMapTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.MapFn;
-import com.cloudera.crunch.Pair;
-
-@SuppressWarnings("serial")
-public class PairMapTest {
-  
-  static final MapFn<String, Integer> one = new MapFn<String, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 1;
-    }
-  };
-  
-  static final MapFn<String, Integer> two = new MapFn<String, Integer>() {
-    @Override
-    public Integer map(String input) {
-      return 2;
-    }
-  };
-  
-  @Test
-  public void testPairMap() {
-    StoreLastEmitter<Pair<Integer, Integer>> emitter = StoreLastEmitter.create();
-    PairMapFn<String, String, Integer, Integer> fn = new PairMapFn<String, String, Integer, Integer>(one, two);
-    fn.process(Pair.of("a", "b"), emitter);
-    Pair<Integer, Integer> pair = emitter.getLast();
-    assertTrue(pair.first() == 1);
-    assertTrue(pair.second() == 2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/fn/StoreLastEmitter.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/fn/StoreLastEmitter.java b/src/test/java/com/cloudera/crunch/fn/StoreLastEmitter.java
deleted file mode 100644
index cac04f9..0000000
--- a/src/test/java/com/cloudera/crunch/fn/StoreLastEmitter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.fn;
-
-import com.cloudera.crunch.Emitter;
-
-class StoreLastEmitter<T> implements Emitter<T> {
-  private T last;
-
-  @Override
-  public void emit(T emitted) {
-    last = emitted;
-  }
-  
-  public T getLast() {
-    return last;
-  }
-  
-  @Override
-  public void flush() {
-  }
-  
-  public static <T> StoreLastEmitter<T> create() {
-    return new StoreLastEmitter<T>();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java b/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java
deleted file mode 100644
index 9051fc1..0000000
--- a/src/test/java/com/cloudera/crunch/impl/mem/MemPipelineFileWritingTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.crunch.impl.mem;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.List;
-
-import com.cloudera.crunch.PCollection;
-import com.cloudera.crunch.Pipeline;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.io.Files;
-
-import org.junit.Test;
-
-public class MemPipelineFileWritingTest {
-  @Test
-  public void testMemPipelineFileWriter() throws Exception {
-    File tmpDir = Files.createTempDir();
-    tmpDir.delete();
-    Pipeline p = MemPipeline.getInstance();
-    PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
-    p.writeTextFile(lines, tmpDir.getAbsolutePath());
-    p.done();
-    assertTrue(tmpDir.exists());
-    File[] files = tmpDir.listFiles();
-    assertTrue(files != null && files.length > 0);
-    for (File f : files) {
-      if (!f.getName().startsWith(".")) {
-        List<String> txt = Files.readLines(f, Charsets.UTF_8);
-        assertEquals(ImmutableList.of("hello", "world"), txt);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java b/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
deleted file mode 100644
index f265460..0000000
--- a/src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package com.cloudera.crunch.impl.mr;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.cloudera.crunch.SourceTarget;
-import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
-import com.cloudera.crunch.io.ReadableSourceTarget;
-import com.cloudera.crunch.types.avro.Avros;
-
-public class MRPipelineTest {
-
-  private MRPipeline pipeline;
-
-  @Before
-  public void setUp() throws IOException {
-    pipeline = spy(new MRPipeline(MRPipelineTest.class));
-  }
-
-  @Test
-  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
-    PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
-    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
-    when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
-
-    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
-  }
-
-  @Test
-  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
-
-    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
-    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
-    when(pcollection.getPType()).thenReturn(Avros.strings());
-    doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
-    when(pcollection.getMaterializedAt()).thenReturn(null);
-
-    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
-    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
-    SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
-    when(pcollection.getPType()).thenReturn(Avros.strings());
-    doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
-    when(pcollection.getMaterializedAt()).thenReturn(null);
-
-    pipeline.getMaterializeSourceTarget(pcollection);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java b/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java
deleted file mode 100644
index 209810e..0000000
--- a/src/test/java/com/cloudera/crunch/impl/mr/collect/DoCollectionImplTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.cloudera.crunch.impl.mr.collect;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.impl.mr.plan.DoNode;
-import com.cloudera.crunch.types.PType;
-import com.cloudera.crunch.types.writable.Writables;
-
-public class DoCollectionImplTest {
-  
-  
-
-  @Test
-  public void testGetSizeInternal_NoScaleFactor() {
-    runScaleTest(100L, 1.0f, 100L);
-  }
-  
-  @Test
-  public void testGetSizeInternal_ScaleFactorBelowZero() {
-    runScaleTest(100L, 0.5f, 50L);
-  }
-  
-  @Test
-  public void testGetSizeInternal_ScaleFactorAboveZero() {
-    runScaleTest(100L, 1.5f, 150L);
-  }
-  
-  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize){
-    PCollectionImpl<String> parentCollection = new SizedPCollectionImpl(
-        "Sized collection", inputSize);
-    
-    DoCollectionImpl<String> doCollectionImpl = new DoCollectionImpl<String>(
-        "Scaled collection", parentCollection, new ScaledFunction(scaleFactor),
-        Writables.strings());
-
-    assertEquals(expectedScaledSize, doCollectionImpl.getSizeInternal()); 
-  }
-  
-
-  static class ScaledFunction extends DoFn<String, String>{
-    
-    private float scaleFactor;
-
-    public ScaledFunction(float scaleFactor){
-      this.scaleFactor = scaleFactor;
-    }
-
-    @Override
-    public void process(String input, Emitter<String> emitter) {
-      emitter.emit(input);
-    }
-    
-    @Override
-    public float scaleFactor() {
-      return scaleFactor;
-    }
-    
-  }
-
-  static class SizedPCollectionImpl extends PCollectionImpl<String> {
-
-    private long internalSize;
-
-    public SizedPCollectionImpl(String name, long internalSize) {
-      super(name);
-      this.internalSize = internalSize;
-    }
-
-    @Override
-    public PType getPType() {
-      return null;
-    }
-
-    @Override
-    public DoNode createDoNode() {
-      return null;
-    }
-
-    @Override
-    public List getParents() {
-      return null;
-    }
-
-    @Override
-    protected void acceptInternal(Visitor visitor) {
-    }
-
-    @Override
-    protected long getSizeInternal() {
-      return internalSize;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/test/java/com/cloudera/crunch/impl/mr/collect/DoTableImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/com/cloudera/crunch/impl/mr/collect/DoTableImplTest.java b/src/test/java/com/cloudera/crunch/impl/mr/collect/DoTableImplTest.java
deleted file mode 100644
index c619cb4..0000000
--- a/src/test/java/com/cloudera/crunch/impl/mr/collect/DoTableImplTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.cloudera.crunch.impl.mr.collect;
-
-import static com.cloudera.crunch.types.writable.Writables.strings;
-import static com.cloudera.crunch.types.writable.Writables.tableOf;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import org.junit.Test;
-
-import com.cloudera.crunch.DoFn;
-import com.cloudera.crunch.Emitter;
-import com.cloudera.crunch.Pair;
-
-public class DoTableImplTest {
-
-	@Test
-	public void testGetSizeInternal_NoScaleFactor() {
-		runScaleTest(100L, 1.0f, 100L);
-	}
-
-	@Test
-	public void testGetSizeInternal_ScaleFactorBelowZero() {
-		runScaleTest(100L, 0.5f, 50L);
-	}
-
-	@Test
-	public void testGetSizeInternal_ScaleFactorAboveZero() {
-		runScaleTest(100L, 1.5f, 150L);
-	}
-
-	private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
-		
-		@SuppressWarnings("unchecked")
-		PCollectionImpl<String> parentCollection = (PCollectionImpl<String>) mock(PCollectionImpl.class);
-		
-		when(parentCollection.getSize()).thenReturn(inputSize);
-
-		DoTableImpl<String, String> doTableImpl = new DoTableImpl<String, String>("Scalled table collection",
-				parentCollection, new TableScaledFunction(scaleFactor), tableOf(strings(),
-						strings()));
-
-		assertEquals(expectedScaledSize, doTableImpl.getSizeInternal());
-		
-		verify(parentCollection).getSize();
-		
-		verifyNoMoreInteractions(parentCollection);
-	}
-
-	static class TableScaledFunction extends DoFn<String, Pair<String, String>> {
-
-		private float scaleFactor;
-
-		public TableScaledFunction(float scaleFactor) {
-			this.scaleFactor = scaleFactor;
-		}
-
-		@Override
-		public float scaleFactor() {
-			return scaleFactor;
-		}
-
-		@Override
-		public void process(String input, Emitter<Pair<String, String>> emitter) {
-			emitter.emit(Pair.of(input, input));
-
-		}
-	}
-}


Mime
View raw message