incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [12/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/WordCountTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/WordCountTest.java b/crunch/src/test/java/org/apache/crunch/WordCountTest.java
new file mode 100644
index 0000000..01a81e9
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/WordCountTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+public class WordCountTest {
+
+  enum WordCountStats {
+    ANDS
+  }
+  /** Emits every whitespace-separated token of each line and counts how often each token occurs. */
+  public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
+    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
+      // Besides emitting each token, bump the ANDS counter for every token equal to "and".
+      @Override
+      public void process(String line, Emitter<String> emitter) {
+        for (String word : line.split("\\s+")) {
+          emitter.emit(word);
+          if ("and".equals(word)) {
+            increment(WordCountStats.ANDS);
+          }
+        }
+      }
+    }, typeFamily.strings()));
+  }
+  /** Re-keys each entry by the first character of its key; entries with an empty key are dropped. */
+  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
+    return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
+      @Override
+      public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
+        if (input.first().length() > 0) {
+          emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
+        }
+      }
+    }, ptable.getPTableType());
+  }
+  // Switches read by run(); individual tests set them before calling run().
+  private boolean runSecond = false;
+  private boolean useToOutput = false;
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWritablesWithSecond() throws IOException {
+    runSecond = true;
+    run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWritablesWithSecondUseToOutput() throws IOException {
+    runSecond = true;
+    useToOutput = true;
+    run(new MRPipeline(WordCountTest.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(WordCountTest.class), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvroWithSecond() throws IOException {
+    runSecond = true;
+    run(new MRPipeline(WordCountTest.class), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWithTopWritable() throws IOException {
+    runWithTop(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testWithTopAvro() throws IOException {
+    runWithTop(AvroTypeFamily.getInstance());
+  }
+  /** Counts the words of shakes.txt and compares Aggregate.top(wordCount, 5, true) to fixed values. */
+  public static void runWithTop(PTypeFamily tf) throws IOException {
+    Pipeline pipeline = new MRPipeline(WordCountTest.class);
+    String inputPath = FileHelper.createTempCopyOf("shakes.txt");
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, tf.strings()));
+    PTable<String, Long> wordCount = wordCount(shakespeare, tf);
+    List<Pair<String, Long>> top5 = Lists.newArrayList(Aggregate.top(wordCount, 5, true)
+        .materialize());
+    assertEquals(
+        ImmutableList.of(Pair.of("", 1470L), Pair.of("the", 620L), Pair.of("and", 427L),
+            Pair.of("of", 396L), Pair.of("to", 367L)), top5);
+  }
+  /** Runs the word count (plus the substr stage if runSecond is set); checks stages, counter, output. */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String inputPath = FileHelper.createTempCopyOf("shakes.txt");
+    File output = FileHelper.createOutputPath();
+    String outputPath = output.getAbsolutePath();
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
+    PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
+    if (useToOutput) {
+      wordCount.write(To.textFile(outputPath));
+    } else {
+      pipeline.writeTextFile(wordCount, outputPath);
+    }
+
+    if (runSecond) {
+      File substrCount = File.createTempFile("substr", "");
+      String substrPath = substrCount.getAbsolutePath();
+      substrCount.delete();
+      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(
+          CombineFn.<String> SUM_LONGS());
+      pipeline.writeTextFile(we, substrPath);
+    }
+    PipelineResult res = pipeline.done();
+    assertTrue(res.succeeded());
+    List<PipelineResult.StageResult> stageResults = res.getStageResults();
+    if (runSecond) {
+      assertEquals(2, stageResults.size());
+    } else {
+      assertEquals(1, stageResults.size());
+      assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS));
+    }
+    // Read as UTF-8 explicitly so the check does not depend on the platform default charset.
+    File outputFile = new File(outputPath, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.forName("UTF-8"));
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.startsWith("Macbeth\t28")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+    output.deleteOnExit();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java b/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
new file mode 100644
index 0000000..205809e
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/fn/ExtractKeyFnTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+
+@SuppressWarnings("serial")
+public class ExtractKeyFnTest {
+  /** Key function handed to the extractor: a string's key is its {@code hashCode()}. */
+  protected static final MapFn<String, Integer> mapFn = new MapFn<String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return input.hashCode();
+    }
+  };
+  /** Function under test, constructed from {@code mapFn}. */
+  protected static final ExtractKeyFn<Integer, String> one = new ExtractKeyFn<Integer, String>(mapFn);
+
+  /** Processes one string and expects the last emitted pair to be (hash code, string). */
+  @Test
+  public void test() {
+    StoreLastEmitter<Pair<Integer, String>> sink = StoreLastEmitter.create();
+    one.process("boza", sink);
+    assertEquals(Pair.of("boza".hashCode(), "boza"), sink.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java b/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java
new file mode 100644
index 0000000..323f276
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/fn/MapKeysTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import org.apache.crunch.Pair;
+
+
+@SuppressWarnings("serial")
+public class MapKeysTest {
+  // Two key mappers that ignore their input: "one" always yields 1, "two" always yields 2.
+  protected static final MapKeysFn<String, Integer, Integer> one = new MapKeysFn<String, Integer, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 1;
+    }
+  };
+
+  protected static final MapKeysFn<String, Integer, Integer> two = new MapKeysFn<String, Integer, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 2;
+    }
+  };
+
+  /** Expects the key to be replaced by the mapper's constant while the value stays as passed in. */
+  @Test
+  public void test() {
+    StoreLastEmitter<Pair<Integer, Integer>> sink = StoreLastEmitter.create();
+    one.process(Pair.of("k", Integer.MAX_VALUE), sink);
+    assertEquals(Pair.of(1, Integer.MAX_VALUE), sink.getLast());
+    two.process(Pair.of("k", Integer.MAX_VALUE), sink);
+    assertEquals(Pair.of(2, Integer.MAX_VALUE), sink.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java b/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java
new file mode 100644
index 0000000..cf971a1
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/fn/MapValuesTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import org.apache.crunch.Pair;
+
+@SuppressWarnings("serial")
+public class MapValuesTest {
+  // Two value mappers that ignore their input: "one" always yields 1, "two" always yields 2.
+  static final MapValuesFn<String, String, Integer> one = new MapValuesFn<String, String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 1;
+    }
+  };
+
+  static final MapValuesFn<String, String, Integer> two = new MapValuesFn<String, String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 2;
+    }
+  };
+  /** Expects the value to be replaced by the mapper's constant while the key "k" stays as passed in. */
+  @Test
+  public void test() {
+    StoreLastEmitter<Pair<String, Integer>> sink = StoreLastEmitter.create();
+    one.process(Pair.of("k", "v"), sink);
+    assertEquals(Pair.of("k", 1), sink.getLast());
+    two.process(Pair.of("k", "v"), sink);
+    assertEquals(Pair.of("k", 2), sink.getLast());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java b/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java
new file mode 100644
index 0000000..28e2459
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/fn/PairMapTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+
+@SuppressWarnings("serial")
+public class PairMapTest {
+  // Two mappers that ignore their input: "one" always yields 1, "two" always yields 2.
+  static final MapFn<String, Integer> one = new MapFn<String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 1;
+    }
+  };
+
+  static final MapFn<String, Integer> two = new MapFn<String, Integer>() {
+    @Override
+    public Integer map(String input) {
+      return 2;
+    }
+  };
+  /** A PairMapFn built from (one, two) must emit the pair (1, 2) for the input ("a", "b"). */
+  @Test
+  public void testPairMap() {
+    StoreLastEmitter<Pair<Integer, Integer>> sink = StoreLastEmitter.create();
+    PairMapFn<String, String, Integer, Integer> pairFn = new PairMapFn<String, String, Integer, Integer>(one, two);
+    pairFn.process(Pair.of("a", "b"), sink);
+    Pair<Integer, Integer> result = sink.getLast();
+    assertTrue(result.first().intValue() == 1);
+    assertTrue(result.second().intValue() == 2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java b/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
new file mode 100644
index 0000000..2632957
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/fn/StoreLastEmitter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.Emitter;
+
+class StoreLastEmitter<T> implements Emitter<T> {
+  private T mostRecent; // overwritten by every emit(); null until the first call
+  /** Factory method: returns a new emitter that has not stored anything yet. */
+  public static <T> StoreLastEmitter<T> create() {
+    return new StoreLastEmitter<T>();
+  }
+  /** Remembers {@code emitted}, replacing whatever was stored before. */
+  @Override
+  public void emit(T emitted) {
+    this.mostRecent = emitted;
+  }
+  /** Does nothing. */
+  @Override
+  public void flush() {
+  }
+  /** Returns the value passed to the latest {@code emit} call, or {@code null} if there was none. */
+  public T getLast() {
+    return mostRecent;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/impl/mem/MemPipelineFileWritingTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mem/MemPipelineFileWritingTest.java b/crunch/src/test/java/org/apache/crunch/impl/mem/MemPipelineFileWritingTest.java
new file mode 100644
index 0000000..442a563
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mem/MemPipelineFileWritingTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.junit.Test;
+
+public class MemPipelineFileWritingTest {
+  /** Writes "hello"/"world" via MemPipeline; every non-dot file in the output must hold those lines. */
+  @Test
+  public void testMemPipelineFileWriter() throws Exception {
+    File outDir = Files.createTempDir();
+    outDir.delete(); // drop the directory again; only its path is handed to the pipeline
+    Pipeline pipeline = MemPipeline.getInstance();
+    PCollection<String> input = MemPipeline.collectionOf("hello", "world");
+    pipeline.writeTextFile(input, outDir.getAbsolutePath());
+    pipeline.done();
+    assertTrue(outDir.exists());
+    File[] written = outDir.listFiles();
+    assertTrue(written != null && written.length > 0);
+    for (File candidate : written) {
+      if (!candidate.getName().startsWith(".")) { // files whose name starts with a dot are not checked
+        assertEquals(ImmutableList.of("hello", "world"), Files.readLines(candidate, Charsets.UTF_8));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
new file mode 100644
index 0000000..9d10e97
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/MRPipelineTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.types.avro.Avros;
+
+public class MRPipelineTest {
+  private MRPipeline pipeline;
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = spy(new MRPipeline(MRPipelineTest.class)); // spied so single methods can be stubbed
+  }
+
+  /** A collection that already reports a materialization target must get exactly that target back. */
+  @Test
+  @SuppressWarnings("unchecked") // mock(X.class) yields the raw type; it is only assigned to X<String>
+  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
+    PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
+    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
+    when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
+    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
+  }
+
+  /** With no materialization target, the intermediate output stubbed for the PType is returned. */
+  @Test
+  @SuppressWarnings("unchecked") // raw mocks are assigned to parameterized types
+  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
+    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
+    ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
+    when(pcollection.getPType()).thenReturn(Avros.strings());
+    doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+    when(pcollection.getMaterializedAt()).thenReturn(null);
+    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
+  }
+
+  /** An intermediate output that is not a ReadableSourceTarget must raise IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  @SuppressWarnings("unchecked") // raw mocks are assigned to parameterized types
+  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
+    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
+    SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
+    when(pcollection.getPType()).thenReturn(Avros.strings());
+    doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
+    when(pcollection.getMaterializedAt()).thenReturn(null);
+    pipeline.getMaterializeSourceTarget(pcollection);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
new file mode 100644
index 0000000..96fbd7e
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoCollectionImplTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.impl.mr.plan.DoNode;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+
+public class DoCollectionImplTest {
+  // For a parent size of 100 the three cases expect getSizeInternal() to report 100, 50 and 150
+  // when the DoFn's scaleFactor() is 1.0, 0.5 and 1.5 respectively.
+
+  @Test
+  public void testGetSizeInternal_NoScaleFactor() {
+    runScaleTest(100L, 1.0f, 100L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorBelowZero() {
+    runScaleTest(100L, 0.5f, 50L); // despite the name, the factor is below one, not below zero
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorAboveZero() {
+    runScaleTest(100L, 1.5f, 150L); // factor above one
+  }
+
+  /** Wraps a parent of {@code inputSize} in a DoCollectionImpl and checks the size it reports. */
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+    PCollectionImpl<String> parent = new SizedPCollectionImpl("Sized collection", inputSize);
+    ScaledFunction fn = new ScaledFunction(scaleFactor);
+    DoCollectionImpl<String> scaled = new DoCollectionImpl<String>(
+        "Scaled collection", parent, fn, Writables.strings());
+
+    assertEquals(expectedScaledSize, scaled.getSizeInternal());
+  }
+
+  /** DoFn that passes every input through unchanged and reports a configurable scale factor. */
+  static class ScaledFunction extends DoFn<String, String> {
+
+    private float scaleFactor;
+
+    public ScaledFunction(float scaleFactor) {
+      this.scaleFactor = scaleFactor;
+    }
+
+    @Override
+    public float scaleFactor() {
+      return scaleFactor;
+    }
+
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      emitter.emit(input);
+    }
+
+  }
+
+  /** PCollectionImpl stub whose only non-trivial answer is a fixed internal size. */
+  static class SizedPCollectionImpl extends PCollectionImpl<String> {
+
+    private long internalSize;
+
+    public SizedPCollectionImpl(String name, long internalSize) {
+      super(name);
+      this.internalSize = internalSize;
+    }
+
+    @Override
+    protected long getSizeInternal() {
+      return internalSize;
+    }
+
+    // Everything below is stubbed out: the methods return null or do nothing.
+
+    @Override
+    public PType getPType() {
+      return null;
+    }
+
+    @Override
+    public DoNode createDoNode() {
+      return null;
+    }
+
+    @Override
+    public List getParents() {
+      return null;
+    }
+
+    @Override
+    protected void acceptInternal(Visitor visitor) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
new file mode 100644
index 0000000..d5a9722
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/DoTableImplTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+
+public class DoTableImplTest {
+  // For a parent size of 100 the cases expect sizes 100, 50 and 150 with factors 1.0, 0.5 and 1.5.
+
+  @Test
+  public void testGetSizeInternal_NoScaleFactor() {
+    runScaleTest(100L, 1.0f, 100L);
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorBelowZero() {
+    runScaleTest(100L, 0.5f, 50L); // despite the name, the factor is below one, not below zero
+  }
+
+  @Test
+  public void testGetSizeInternal_ScaleFactorAboveZero() {
+    runScaleTest(100L, 1.5f, 150L); // factor above one
+  }
+
+  /** Builds a DoTableImpl over a mocked parent reporting {@code inputSize} and checks its size. */
+  private void runScaleTest(long inputSize, float scaleFactor, long expectedScaledSize) {
+    @SuppressWarnings("unchecked")
+    PCollectionImpl<String> parent = (PCollectionImpl<String>) mock(PCollectionImpl.class);
+    when(parent.getSize()).thenReturn(inputSize);
+
+    TableScaledFunction fn = new TableScaledFunction(scaleFactor);
+    DoTableImpl<String, String> table = new DoTableImpl<String, String>(
+        "Scalled table collection", parent, fn, tableOf(strings(), strings()));
+
+    assertEquals(expectedScaledSize, table.getSizeInternal());
+
+    // The mocked parent must have been asked for its size once and for nothing else.
+    verify(parent).getSize();
+    verifyNoMoreInteractions(parent);
+  }
+
+  /** DoFn that emits each input as an (input, input) pair and reports a configurable scale factor. */
+  static class TableScaledFunction extends DoFn<String, Pair<String, String>> {
+
+    private float scaleFactor;
+
+    public TableScaledFunction(float scaleFactor) {
+      this.scaleFactor = scaleFactor;
+    }
+
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      emitter.emit(Pair.of(input, input));
+    }
+
+    @Override
+    public float scaleFactor() {
+      return scaleFactor;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/impl/mr/collect/UnionCollectionTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/collect/UnionCollectionTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/UnionCollectionTest.java
new file mode 100644
index 0000000..7347383
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/collect/UnionCollectionTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTableKeyValueTest;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
+/** Union tests, run once per (type family, pipeline) pair supplied by data(). */
+@RunWith(value = Parameterized.class)
+public class UnionCollectionTest {
+	private static final Log LOG = LogFactory.getLog(UnionCollectionTest.class);
+
+	/** Expected union contents in sorted order; a true constant, so static and unmodifiable. */
+	private static final List<String> EXPECTED = Collections.unmodifiableList(
+			Arrays.asList("a", "a", "b", "c", "c", "d", "e"));
+
+	private final PTypeFamily typeFamily;
+	private final Pipeline pipeline;
+	private PCollection<String> union;
+
+	public UnionCollectionTest(PTypeFamily typeFamily, Pipeline pipeline) {
+		this.typeFamily = typeFamily;
+		this.pipeline = pipeline;
+	}
+
+	/** Crosses the Writable and Avro type families with the MapReduce and in-memory pipelines. */
+	@Parameters
+	public static Collection<Object[]> data() throws IOException {
+		Object[][] data = new Object[][] {
+				{ WritableTypeFamily.getInstance(), new MRPipeline(PTableKeyValueTest.class) },
+				{ WritableTypeFamily.getInstance(), MemPipeline.getInstance() },
+				{ AvroTypeFamily.getInstance(), new MRPipeline(PTableKeyValueTest.class) },
+				{ AvroTypeFamily.getInstance(), MemPipeline.getInstance() } };
+		return Arrays.asList(data);
+	}
+
+	/** Reads both input sets with the parameterized pipeline and unions them. */
+	@Before
+	@SuppressWarnings("unchecked")
+	public void setUp() throws IOException {
+		String inputFile1 = FileHelper.createTempCopyOf("set1.txt");
+		String inputFile2 = FileHelper.createTempCopyOf("set2.txt");
+
+		PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1,
+				typeFamily.strings()));
+		PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2,
+				typeFamily.strings()));
+
+		LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : "
+				+ typeFamily.getClass().getSimpleName() + "]  First: "
+				+ Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
+				+ Lists.newArrayList(secondCollection.materialize().iterator()));
+
+		union = secondCollection.union(firstCollection);
+	}
+
+	@After
+	public void tearDown() {
+		pipeline.done();
+	}
+
+	@Test
+	public void unionMaterializeShouldNotThrowNPE() {
+		checkMaterialized(union.materialize());
+		checkMaterialized(pipeline.materialize(union));
+	}
+
+	/** Sorts the materialized values and compares them with EXPECTED. */
+	private void checkMaterialized(Iterable<String> materialized) {
+		List<String> materializedValues = Lists.newArrayList(materialized.iterator());
+		Collections.sort(materializedValues);
+		LOG.info("Materialized union: " + materializedValues);
+
+		assertEquals(EXPECTED, materializedValues);
+	}
+
+	@Test
+	public void unionWriteShouldNotThrowNPE() throws IOException {
+		File outputPath1 = FileHelper.createOutputPath();
+		File outputPath2 = FileHelper.createOutputPath();
+		File outputPath3 = FileHelper.createOutputPath();
+
+		if (typeFamily == AvroTypeFamily.getInstance()) {
+			union.write(To.avroFile(outputPath1.getAbsolutePath()));
+			pipeline.write(union, To.avroFile(outputPath2.getAbsolutePath()));
+
+			pipeline.run();
+
+			checkFileContents(outputPath1.getAbsolutePath());
+			checkFileContents(outputPath2.getAbsolutePath());
+		} else {
+			union.write(To.textFile(outputPath1.getAbsolutePath()));
+			pipeline.write(union, To.textFile(outputPath2.getAbsolutePath()));
+			pipeline.writeTextFile(union, outputPath3.getAbsolutePath());
+
+			pipeline.run();
+
+			checkFileContents(outputPath1.getAbsolutePath());
+			checkFileContents(outputPath2.getAbsolutePath());
+			checkFileContents(outputPath3.getAbsolutePath());
+		}
+	}
+
+	/** Reads the written output back, sorts it and compares it with EXPECTED. */
+	private void checkFileContents(String filePath) throws IOException {
+		// Avro read-back only for the Avro family on the MapReduce pipeline; otherwise text.
+		boolean readAsAvro = typeFamily == AvroTypeFamily.getInstance() && pipeline instanceof MRPipeline;
+		PCollection<String> saved = readAsAvro
+				? pipeline.read(At.avroFile(filePath, Avros.strings()))
+				: pipeline.read(At.textFile(filePath, typeFamily.strings()));
+		List<String> fileContentValues = Lists.newArrayList(saved.materialize().iterator());
+		Collections.sort(fileContentValues);
+		LOG.info("Saved Union: " + fileContentValues);
+		assertEquals(EXPECTED, fileContentValues);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
new file mode 100644
index 0000000..0e0b9d8
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.Lists;
+
+/** Verifies the job name built from a pipeline name and a single visited output node. */
+public class JobNameBuilderTest {
+
+  @Test
+  public void testBuild() {
+    final String pipeline = "PipelineName";
+    final String node = "outputNode";
+    final String expectedJobName = pipeline + ": " + node;
+
+    JobNameBuilder builder = new JobNameBuilder(pipeline);
+    builder.visit(Lists.newArrayList(DoNode.createOutputNode(node, Writables.strings())));
+
+    assertEquals(expectedJobName, builder.build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/io/CompositePathIterableTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/CompositePathIterableTest.java b/crunch/src/test/java/org/apache/crunch/io/CompositePathIterableTest.java
new file mode 100644
index 0000000..f32873c
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/io/CompositePathIterableTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import org.apache.crunch.io.text.TextFileReaderFactory;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+
+public class CompositePathIterableTest {
+
+  /** Reading the copy of set1.txt yields b, c, a, e in that order. */
+  @Test
+  public void testCreate_FilePresent() throws IOException {
+    Iterable<String> lines = createIterable(FileHelper.createTempCopyOf("set1.txt"));
+
+    assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(lines));
+  }
+
+  /** A freshly created, still empty directory yields no elements. */
+  @Test
+  public void testCreate_DirectoryPresentButNoFiles() throws IOException {
+    Iterable<String> lines = createIterable(Files.createTempDir().getAbsolutePath());
+
+    assertTrue(Lists.newArrayList(lines).isEmpty());
+  }
+
+  /** A directory that has been deleted again makes creation fail with an IOException. */
+  @Test(expected = IOException.class)
+  public void testCreate_DirectoryNotPresent() throws IOException {
+    File missingDir = Files.createTempDir();
+    missingDir.delete();
+
+    // Sanity check
+    assertFalse(missingDir.exists());
+
+    createIterable(missingDir.getAbsolutePath());
+  }
+
+  /**
+   * Builds a CompositePathIterable of text lines for {@code localPath} on the local
+   * file system, using a fresh default Configuration.
+   */
+  private static Iterable<String> createIterable(String localPath) throws IOException {
+    Configuration conf = new Configuration();
+    LocalFileSystem local = FileSystem.getLocal(conf);
+
+    return CompositePathIterable.create(local, new Path(localPath),
+        new TextFileReaderFactory<String>(Writables.strings(), conf));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java b/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
new file mode 100644
index 0000000..a1dbb5f
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/io/SourceTargetHelperTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/** Checks that getPathSize answers -1 for a missing path and for a null directory listing. */
+public class SourceTargetHelperTest {
+
+	@Test
+	public void testGetNonexistentPathSize() throws Exception {
+		// Create and immediately delete a temp file to obtain a path that does not exist.
+		File removed = File.createTempFile("pathsize", "");
+		Path removedPath = new Path(removed.getAbsolutePath());
+		removed.delete();
+
+		FileSystem localFs = FileSystem.getLocal(new Configuration());
+		assertEquals(-1L, SourceTargetHelper.getPathSize(localFs, removedPath));
+	}
+
+	@Test
+	public void testGetNonExistentPathSize_NonExistantPath() throws IOException {
+		FileSystem nullListingFs = new MockFileSystem();
+		assertEquals(-1L, SourceTargetHelper.getPathSize(nullListingFs, new Path("does/not/exist")));
+	}
+
+	/** LocalFileSystem stub whose {@link FileSystem#listStatus(Path)} always returns null. */
+	static class MockFileSystem extends LocalFileSystem {
+		@Override
+		public FileStatus[] listStatus(Path f) throws IOException {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
new file mode 100644
index 0000000..885ed61
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileReaderFactoryTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import com.google.common.collect.Lists;
+
+/** Reads files written with a generic Avro writer back through AvroFileReaderFactory. */
+public class AvroFileReaderFactoryTest {
+
+	private File avroFile;
+
+	@Before
+	public void setUp() throws IOException {
+		avroFile = File.createTempFile("test", ".av");
+	}
+
+	@After
+	public void tearDown() {
+		avroFile.delete();
+	}
+
+	/**
+	 * Writes the given records to the temp Avro file using {@code outputSchema}.
+	 * The file stream is closed even if creating the writer or appending fails.
+	 */
+	private void populateGenericFile(List<GenericRecord> genericRecords,
+			Schema outputSchema) throws IOException {
+		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+		try {
+			DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
+					new GenericDatumWriter<GenericRecord>(outputSchema));
+			dataFileWriter.create(outputSchema, outputStream);
+			for (GenericRecord record : genericRecords) {
+				dataFileWriter.append(record);
+			}
+
+			dataFileWriter.close();
+		} finally {
+			// Runs on success too; closing an already closed stream has no effect.
+			outputStream.close();
+		}
+	}
+
+	/** Opens the temp Avro file on the local file system with the given reader factory. */
+	private <T> Iterator<T> readAvroFile(AvroFileReaderFactory<T> readerFactory) throws IOException {
+		return readerFactory.read(FileSystem.getLocal(new Configuration()),
+				new Path(this.avroFile.getAbsolutePath()));
+	}
+
+	/** Builds the "John Doe" record (age 42, siblings Jimmy and Jane) on the Person schema. */
+	private static GenericRecord johnDoeRecord() {
+		GenericRecord record = new GenericData.Record(Person.SCHEMA$);
+		record.put("name", "John Doe");
+		record.put("age", 42);
+		record.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+		return record;
+	}
+
+	@Test
+	public void testRead_GenericReader() throws IOException {
+		GenericRecord savedRecord = johnDoeRecord();
+		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+		AvroFileReaderFactory<GenericData.Record> genericReader = new AvroFileReaderFactory<GenericData.Record>(
+				Avros.generics(Person.SCHEMA$), new Configuration());
+		Iterator<GenericData.Record> recordIterator = readAvroFile(genericReader);
+
+		assertEquals(savedRecord, recordIterator.next());
+		assertFalse(recordIterator.hasNext());
+	}
+
+	@Test
+	public void testRead_SpecificReader() throws IOException {
+		populateGenericFile(Lists.newArrayList(johnDoeRecord()), Person.SCHEMA$);
+
+		AvroFileReaderFactory<Person> specificReader = new AvroFileReaderFactory<Person>(
+				Avros.records(Person.class), new Configuration());
+		Iterator<Person> recordIterator = readAvroFile(specificReader);
+
+		Person expectedPerson = new Person();
+		expectedPerson.setAge(42);
+		expectedPerson.setName("John Doe");
+		List<CharSequence> siblingNames = Lists.newArrayList();
+		siblingNames.add("Jimmy");
+		siblingNames.add("Jane");
+		expectedPerson.setSiblingnames(siblingNames);
+
+		assertEquals(expectedPerson, recordIterator.next());
+		assertFalse(recordIterator.hasNext());
+	}
+
+	@Test
+	public void testRead_ReflectReader() throws IOException {
+		Schema reflectSchema = ReflectData.get().getSchema(PojoPerson.class);
+		GenericRecord savedRecord = new GenericData.Record(reflectSchema);
+		savedRecord.put("name", "John Doe");
+		populateGenericFile(Lists.newArrayList(savedRecord), reflectSchema);
+
+		AvroFileReaderFactory<PojoPerson> reflectReader = new AvroFileReaderFactory<PojoPerson>(
+				Avros.reflects(PojoPerson.class), new Configuration());
+		Iterator<PojoPerson> recordIterator = readAvroFile(reflectReader);
+
+		assertEquals("John Doe", recordIterator.next().getName());
+		assertFalse(recordIterator.hasNext());
+	}
+
+	/** Plain bean with a single name property, used for the reflection-based read. */
+	public static class PojoPerson {
+		private String name;
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTargetTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTargetTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTargetTest.java
new file mode 100644
index 0000000..599f569
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTargetTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.avro.AvroFileReaderFactoryTest.PojoPerson;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.Avros;
+import com.google.common.collect.Lists;
+
+/** Reads generic-written Avro files through an MRPipeline as specific, generic and reflect data. */
+@SuppressWarnings("serial")
+public class AvroFileSourceTargetTest implements Serializable {
+
+	private transient File avroFile;
+
+	@Before
+	public void setUp() throws IOException {
+		avroFile = File.createTempFile("test", ".avro");
+	}
+
+	@After
+	public void tearDown() {
+		avroFile.delete();
+	}
+
+	/** Writes the records to the temp Avro file; the stream is closed even if writing fails. */
+	private void populateGenericFile(List<GenericRecord> genericRecords,
+			Schema schema) throws IOException {
+		FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+		try {
+			DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
+					new GenericDatumWriter<GenericRecord>(schema));
+			dataFileWriter.create(schema, outputStream);
+			for (GenericRecord record : genericRecords) {
+				dataFileWriter.append(record);
+			}
+			dataFileWriter.close();
+		} finally {
+			// Runs on success too; closing an already closed stream has no effect.
+			outputStream.close();
+		}
+	}
+
+	@Test
+	public void testSpecific() throws IOException {
+		GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+		savedRecord.put("name", "John Doe");
+		savedRecord.put("age", 42);
+		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+		populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
+		PCollection<Person> personCollection = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), Avros.records(Person.class)));
+
+		List<Person> personList = Lists.newArrayList(personCollection
+				.materialize());
+
+		Person expectedPerson = new Person();
+		expectedPerson.setName("John Doe");
+		expectedPerson.setAge(42);
+
+		List<CharSequence> siblingNames = Lists.newArrayList();
+		siblingNames.add("Jimmy");
+		siblingNames.add("Jane");
+		expectedPerson.setSiblingnames(siblingNames);
+
+		List<Person> expectedList = Lists.newArrayList(expectedPerson);
+		assertEquals(expectedList, personList);
+	}
+
+	@Test
+	public void testGeneric() throws IOException {
+		String genericSchemaJson = Person.SCHEMA$.toString().replace("Person",
+				"GenericPerson");
+		Schema genericPersonSchema = new Schema.Parser()
+				.parse(genericSchemaJson);
+		GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
+		savedRecord.put("name", "John Doe");
+		savedRecord.put("age", 42);
+		savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+		populateGenericFile(Lists.newArrayList(savedRecord),
+				genericPersonSchema);
+
+		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
+		PCollection<Record> genericCollection = pipeline
+				.read(At.avroFile(avroFile.getAbsolutePath(),
+						Avros.generics(genericPersonSchema)));
+
+		List<Record> recordList = Lists.newArrayList(genericCollection
+				.materialize());
+
+		List<GenericRecord> expectedList = Lists.newArrayList(savedRecord);
+		assertEquals(expectedList, recordList);
+	}
+
+	@Test
+	public void testReflect() throws IOException {
+		Schema pojoPersonSchema = ReflectData.get().getSchema(PojoPerson.class);
+		GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
+		savedRecord.put("name", "John Doe");
+		populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
+
+		Pipeline pipeline = new MRPipeline(AvroFileSourceTargetTest.class);
+		PCollection<PojoPerson> personCollection = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), Avros.reflects(PojoPerson.class)));
+
+		List<PojoPerson> recordList = Lists.newArrayList(personCollection
+				.materialize());
+
+		assertEquals(1, recordList.size());
+		PojoPerson person = recordList.get(0);
+		assertEquals("John Doe", person.getName());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
new file mode 100644
index 0000000..1f04c67
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroFileSourceTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.test.Person;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+
+/** Checks the Avro input settings that AvroFileSource.configureSource writes into the job. */
+public class AvroFileSourceTest {
+
+	private Job job;
+	File tempFile;
+
+	@Before
+	public void setUp() throws IOException {
+		job = new Job();
+		tempFile = File.createTempFile("test", ".avr");
+	}
+
+	@After
+	public void tearDown() {
+		tempFile.delete();
+	}
+
+	/** For a specific record type the reflect flag is off and the input schema is Person's. */
+	@Test
+	public void testConfigureJob_SpecificData() throws IOException {
+		AvroType<Person> personType = Avros.records(Person.class);
+		Path inputPath = new Path(tempFile.getAbsolutePath());
+		AvroFileSource<Person> source = new AvroFileSource<Person>(inputPath, personType);
+
+		source.configureSource(job, -1);
+
+		// The default of true would fail the check if the flag were never written.
+		boolean reflect = job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true);
+		assertFalse(reflect);
+		assertEquals(Person.SCHEMA$.toString(), job.getConfiguration().get(AvroJob.INPUT_SCHEMA));
+	}
+
+	/** For a generic record type the reflect flag is switched on. */
+	@Test
+	public void testConfigureJob_GenericData() throws IOException {
+		AvroType<Record> genericType = Avros.generics(Person.SCHEMA$);
+		Path inputPath = new Path(tempFile.getAbsolutePath());
+		AvroFileSource<Record> source = new AvroFileSource<Record>(inputPath, genericType);
+
+		source.configureSource(job, -1);
+
+		assertTrue(job.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, false));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/io/avro/AvroReflectTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/io/avro/AvroReflectTest.java b/crunch/src/test/java/org/apache/crunch/io/avro/AvroReflectTest.java
new file mode 100644
index 0000000..15d27e4
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/io/avro/AvroReflectTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.avro.Avros;
+import com.google.common.collect.Lists;
+
+/**
+ * Maps text lines to a reflect-typed wrapper class on an MRPipeline and checks the result.
+ */
+public class AvroReflectTest implements Serializable {
+
+	/** Mutable holder for one string, with value-based equals and hashCode. */
+	static class StringWrapper {
+		private String value;
+
+		public StringWrapper() {
+			this(null);
+		}
+
+		public StringWrapper(String value) {
+			this.value = value;
+		}
+
+		public String getValue() {
+			return value;
+		}
+
+		public void setValue(String value) {
+			this.value = value;
+		}
+
+		@Override
+		public String toString() {
+			return String.format("<StringWrapper(%s)>", value);
+		}
+
+		@Override
+		public int hashCode() {
+			// Same result as the usual 31 * 1 + hash(value) recipe; null hashes as 0.
+			return 31 + (value == null ? 0 : value.hashCode());
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj) {
+				return true;
+			}
+			if (obj == null) {
+				return false;
+			}
+			if (getClass() != obj.getClass()) {
+				return false;
+			}
+			String otherValue = ((StringWrapper) obj).value;
+			return value == null ? otherValue == null : value.equals(otherValue);
+		}
+	}
+
+	/** Wraps each line of set1.txt and expects b, c, a, e back in that order. */
+	@Test
+	public void testReflection() throws IOException {
+		Pipeline pipeline = new MRPipeline(AvroReflectTest.class);
+		PCollection<String> lines = pipeline.readTextFile(FileHelper.createTempCopyOf("set1.txt"));
+
+		// Wrap every input line; the result is typed through Avro reflection.
+		MapFn<String, StringWrapper> wrapFn = new MapFn<String, StringWrapper>() {
+			@Override
+			public StringWrapper map(String input) {
+				StringWrapper stringWrapper = new StringWrapper();
+				stringWrapper.setValue(input);
+				return stringWrapper;
+			}
+		};
+		PCollection<StringWrapper> stringWrapperCollection = lines.parallelDo(wrapFn,
+				Avros.reflects(StringWrapper.class));
+
+		List<StringWrapper> stringWrappers = Lists
+				.newArrayList(stringWrapperCollection.materialize());
+
+		pipeline.done();
+
+		List<StringWrapper> expected = Lists.newArrayList(new StringWrapper("b"),
+				new StringWrapper("c"), new StringWrapper("a"), new StringWrapper("e"));
+		assertEquals(expected, stringWrappers);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/AggregateTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/AggregateTest.java b/crunch/src/test/java/org/apache/crunch/lib/AggregateTest.java
new file mode 100644
index 0000000..cc305d0
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/AggregateTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for the {@link Aggregate} helpers (min/max, top-N and value
+ * collection), using both MapReduce and in-memory pipelines.
+ */
+public class AggregateTest {
+
+  @Test public void testWritables() throws Exception {
+    Pipeline pipeline = new MRPipeline(AggregateTest.class);
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
+    runMinMax(shakes, WritableTypeFamily.getInstance());
+    pipeline.done();
+  }
+
+  @Test public void testAvro() throws Exception {
+    Pipeline pipeline = new MRPipeline(AggregateTest.class);
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
+    runMinMax(shakes, AvroTypeFamily.getInstance());
+    pipeline.done();
+  }
+
+  @Test public void testInMemoryAvro() throws Exception {
+    PCollection<String> someText = MemPipeline.collectionOf(
+        "first line", "second line", "third line");
+    runMinMax(someText, AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Checks that the maximum line length equals the negated minimum of the
+   * negated line lengths, i.e. that max and min agree with each other.
+   *
+   * @param shakes the lines whose lengths are aggregated
+   * @param family the type family supplying the int PType
+   */
+  public static void runMinMax(PCollection<String> shakes, PTypeFamily family) throws Exception {
+    PCollection<Integer> lengths = shakes.parallelDo(new MapFn<String, Integer>() {
+      @Override
+      public Integer map(String input) {
+        return input.length();
+      }
+    }, family.ints());
+    PCollection<Integer> negLengths = lengths.parallelDo(new MapFn<Integer, Integer>() {
+      @Override
+      public Integer map(Integer input) {
+        return -input;
+      }
+    }, family.ints());
+    Integer maxLengths = Iterables.getFirst(Aggregate.max(lengths).materialize(), null);
+    Integer minLengths = Iterables.getFirst(Aggregate.min(negLengths).materialize(), null);
+    assertTrue(maxLengths != null);
+    assertTrue(minLengths != null);
+    assertEquals(maxLengths.intValue(), -minLengths.intValue());
+  }
+
+  /** Splits a line on whitespace into a (first token, second token) pair. */
+  private static class SplitFn extends MapFn<String, Pair<String, String>> {
+    @Override
+    public Pair<String, String> map(String input) {
+      String[] p = input.split("\\s+");
+      return Pair.of(p[0], p[1]);
+    }
+  }
+
+  @Test public void testCollectUrls() throws Exception {
+    Pipeline p = new MRPipeline(AggregateTest.class);
+    String urlsInputPath = FileHelper.createTempCopyOf("urls.txt");
+    PTable<String, Collection<String>> urls = Aggregate.collectValues(
+        p.readTextFile(urlsInputPath)
+        .parallelDo(new SplitFn(), tableOf(strings(), strings())));
+    for (Pair<String, Collection<String>> e : urls.materialize()) {
+      String key = e.first();
+      // Any key not listed below keeps an expected size of 0.
+      int expectedSize = 0;
+      if ("www.A.com".equals(key)) {
+        expectedSize = 4;
+      } else if ("www.B.com".equals(key) || "www.F.com".equals(key)) {
+        expectedSize = 2;
+      } else if ("www.C.com".equals(key) || "www.D.com".equals(key) || "www.E.com".equals(key)) {
+        expectedSize = 1;
+      }
+      assertEquals("Checking key = " + key, expectedSize, e.second().size());
+    }
+    // Shut the pipeline down once, after all results have been read; it used
+    // to be called on every loop iteration (and never for an empty result).
+    p.done();
+  }
+
+  @Test public void testTopN() throws Exception {
+    PTableType<String, Integer> ptype = Avros.tableOf(Avros.strings(), Avros.ints());
+    PTable<String, Integer> counts = MemPipeline.typedTableOf(ptype, "foo", 12, "bar", 17, "baz", 29);
+
+    PTable<String, Integer> top2 = Aggregate.top(counts, 2, true);
+    assertEquals(ImmutableList.of(Pair.of("baz", 29), Pair.of("bar", 17)), top2.materialize());
+
+    PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
+    assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
+  }
+
+  @Test
+  public void testCollectValues_Writables() throws IOException {
+    Pipeline pipeline = new MRPipeline(AggregateTest.class);
+    Map<Integer, Collection<Text>> collectionMap = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(new MapStringToTextPair(),
+            Writables.tableOf(Writables.ints(), Writables.writables(Text.class))
+        ).collectValues().materializeToMap();
+
+    assertEquals(1, collectionMap.size());
+
+    assertEquals(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a")),
+        collectionMap.get(1));
+
+    // Finish the pipeline only after the map has been inspected, matching the
+    // other MRPipeline tests in this class.
+    pipeline.done();
+  }
+
+  @Test
+  public void testCollectValues_Avro() throws IOException {
+
+    MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
+    Pipeline pipeline = new MRPipeline(AggregateTest.class);
+    Map<Integer, Collection<Employee>> collectionMap = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(mapFn,
+            Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
+        .materializeToMap();
+
+    assertEquals(1, collectionMap.size());
+
+    Employee empC = mapFn.map("c").second();
+    Employee empD = mapFn.map("d").second();
+    Employee empA = mapFn.map("a").second();
+
+    assertEquals(Lists.newArrayList(empC, empD, empA),
+        collectionMap.get(1));
+
+    // Finish the pipeline only after the map has been inspected, matching the
+    // other MRPipeline tests in this class.
+    pipeline.done();
+  }
+
+  /** Maps every input line to the pair (1, Text(line)). */
+  private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
+    @Override
+    public Pair<Integer, Text> map(String input) {
+      return Pair.of(1, new Text(input));
+    }
+  }
+
+  /**
+   * Maps every input line to the pair (1, Employee) where the employee is
+   * named after the line and has salary 0 and an empty department.
+   */
+  private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> {
+    @Override
+    public Pair<Integer, Employee> map(String input) {
+      Employee emp = new Employee();
+      emp.setName(input);
+      emp.setSalary(0);
+      emp.setDepartment("");
+      return Pair.of(1, emp);
+    }
+  }
+
+  /** Simple mutable bean holding a single string value. */
+  public static class PojoText {
+    private String value;
+
+    public PojoText() {
+      this("");
+    }
+
+    public PojoText(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public void setValue(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("PojoText<%s>", this.value);
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: it depends only on
+     * {@code value}, and a null value hashes to 0.
+     */
+    @Override
+    public int hashCode() {
+      return (value == null) ? 0 : value.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      PojoText other = (PojoText) obj;
+      if (value == null) {
+        if (other.value != null)
+          return false;
+      } else if (!value.equals(other.value))
+        return false;
+      return true;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java b/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
new file mode 100644
index 0000000..eb50662
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.AvroValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.lib.join.JoinUtils.AvroIndexedRecordPartitioner;
+
+/**
+ * Checks the partition that {@link AvroIndexedRecordPartitioner} returns for
+ * keys whose hash code is positive, negative or {@link Integer#MIN_VALUE}.
+ */
+public class AvroIndexedRecordPartitionerTest {
+
+	private AvroIndexedRecordPartitioner avroPartitioner;
+
+	@Before
+	public void setUp() {
+		avroPartitioner = new AvroIndexedRecordPartitioner();
+	}
+
+	@Test
+	public void testGetPartition() {
+		AvroKey<IndexedRecord> key = keyWithHash(3);
+
+		assertEquals(3, partitionOf(key, 5));
+		assertEquals(1, partitionOf(key, 2));
+	}
+
+	@Test
+	public void testGetPartition_NegativeHashValue() {
+		AvroKey<IndexedRecord> key = keyWithHash(-3);
+
+		assertEquals(3, partitionOf(key, 5));
+		assertEquals(1, partitionOf(key, 2));
+	}
+
+	@Test
+	public void testGetPartition_IntegerMinValue() {
+		AvroKey<IndexedRecord> key = keyWithHash(Integer.MIN_VALUE);
+
+		assertEquals(0, partitionOf(key, Integer.MAX_VALUE));
+	}
+
+	/** Builds an Avro key around a mock record whose hash code is {@code hash}. */
+	private static AvroKey<IndexedRecord> keyWithHash(int hash) {
+		IndexedRecord record = new MockIndexedRecord(hash);
+		return new AvroKey<IndexedRecord>(record);
+	}
+
+	/** Asks the partitioner under test for the partition of {@code key}, using a fresh empty value. */
+	private int partitionOf(AvroKey<IndexedRecord> key, int numPartitions) {
+		return avroPartitioner.getPartition(key, new AvroValue<Object>(), numPartitions);
+	}
+
+	/**
+	 * Mock implementation of IndexedRecord to give us control over the hashCode.
+	 */
+	static class MockIndexedRecord implements IndexedRecord {
+
+		private Integer value;
+
+		public MockIndexedRecord(Integer value) {
+			this.value = value;
+		}
+
+		/** The hash code is that of the wrapped Integer. */
+		@Override
+		public int hashCode() {
+			return value.hashCode();
+		}
+
+		@Override
+		public Schema getSchema() {
+			throw new UnsupportedOperationException();
+		}
+
+		/** Returns the wrapped value regardless of the requested field index. */
+		@Override
+		public Object get(int arg0) {
+			return this.value;
+		}
+
+		@Override
+		public void put(int arg0, Object arg1) {
+			throw new UnsupportedOperationException();
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/AvroTypeSortTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/AvroTypeSortTest.java b/crunch/src/test/java/org/apache/crunch/lib/AvroTypeSortTest.java
new file mode 100644
index 0000000..1eab4d1
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/AvroTypeSortTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.ints;
+import static org.apache.crunch.types.avro.Avros.records;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import com.google.common.collect.Lists;
+
+/**
+ * Test sorting Avro types by selected inner field
+ */
+public class AvroTypeSortTest implements Serializable {
+
+	private static final long serialVersionUID = 1344118240353796561L;
+
+	// Temporary Avro input file; created before and deleted after each test.
+	private transient File avroFile;
+
+	@Before
+	public void setUp() throws IOException {
+		avroFile = File.createTempFile("avrotest", ".avro");
+	}
+
+	@After
+	public void tearDown() {
+		avroFile.delete();
+	}
+
+	/**
+	 * Writes three persons to an Avro file, keys them first by name and then
+	 * by age, and checks the order of the values after grouping by that key.
+	 */
+	@Test
+	public void testSortAvroTypesBySelectedFields() throws Exception {
+
+		MRPipeline pipeline = new MRPipeline(AvroTypeSortTest.class);
+
+		Person ccc10 = createPerson("CCC", 10);
+		Person bbb20 = createPerson("BBB", 20);
+		Person aaa30 = createPerson("AAA", 30);
+
+		writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile);
+
+		PCollection<Person> unsorted = pipeline.read(At.avroFile(
+				avroFile.getAbsolutePath(), records(Person.class)));
+
+		// Sort by Name
+		MapFn<Person, String> nameExtractor = new MapFn<Person, String>() {
+
+			@Override
+			public String map(Person input) {
+				return input.getName().toString();
+			}
+		};
+
+		PCollection<Person> sortedByName = unsorted
+				.by(nameExtractor, strings()).groupByKey().ungroup().values();
+
+		List<Person> sortedByNameList = Lists.newArrayList(sortedByName
+				.materialize());
+
+		assertEquals(3, sortedByNameList.size());
+		assertEquals(aaa30, sortedByNameList.get(0));
+		assertEquals(bbb20, sortedByNameList.get(1));
+		assertEquals(ccc10, sortedByNameList.get(2));
+
+		// Sort by Age
+
+		MapFn<Person, Integer> ageExtractor = new MapFn<Person, Integer>() {
+
+			@Override
+			public Integer map(Person input) {
+				return input.getAge();
+			}
+		};
+
+		PCollection<Person> sortedByAge = unsorted.by(ageExtractor, ints())
+				.groupByKey().ungroup().values();
+
+		List<Person> sortedByAgeList = Lists.newArrayList(sortedByAge
+				.materialize());
+
+		assertEquals(3, sortedByAgeList.size());
+		assertEquals(ccc10, sortedByAgeList.get(0));
+		assertEquals(bbb20, sortedByAgeList.get(1));
+		assertEquals(aaa30, sortedByAgeList.get(2));
+
+		pipeline.done();
+	}
+
+	/**
+	 * Writes the given persons to {@code avroFile} as an Avro data file.
+	 *
+	 * @param people the records to append, in order
+	 * @param avroFile the file to (over)write
+	 * @throws IOException if the file cannot be opened or written
+	 */
+	private void writeAvroFile(List<Person> people, File avroFile)
+			throws IOException {
+
+		FileOutputStream outputStream = new FileOutputStream(avroFile);
+		try {
+			SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(
+					Person.class);
+
+			DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(
+					writer);
+			dataFileWriter.create(Person.SCHEMA$, outputStream);
+			for (Person person : people) {
+				dataFileWriter.append(person);
+			}
+			dataFileWriter.close();
+		} finally {
+			// Release the file handle even when create/append fails; closing
+			// an already closed FileOutputStream is harmless.
+			outputStream.close();
+		}
+	}
+
+	/** Creates a person with the given name and age and no siblings. */
+	private Person createPerson(String name, int age) throws IOException {
+
+		Person person = new Person();
+		person.setAge(age);
+		person.setName(name);
+		List<CharSequence> siblingNames = Lists.newArrayList();
+		person.setSiblingnames(siblingNames);
+
+		return person;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
new file mode 100644
index 0000000..442a252
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.writable.Writables;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Tests {@link Cartesian#cross} on in-memory collections of different sizes,
+ * including an empty one.
+ */
+public class CartesianTest {
+
+  /**
+   * Crosses every pair of test collections (each with itself and with the
+   * later ones) and checks that the result contains exactly one pair for
+   * every combination of elements.
+   */
+  @Test
+  public void testCartesianCollection() {
+    ImmutableList<ImmutableList<Integer>> testCases = ImmutableList.of(
+        ImmutableList.of(1, 2, 3, 4, 5), ImmutableList.<Integer>of(1, 2, 3), ImmutableList.<Integer>of());
+
+    for (int t1 = 0; t1 < testCases.size(); t1++) {
+      ImmutableList<Integer> testCase1 = testCases.get(t1);
+      for (int t2 = t1; t2 < testCases.size(); t2++) {
+        ImmutableList<Integer> testCase2 = testCases.get(t2);
+
+        PCollection<Integer> left = MemPipeline.typedCollectionOf(Writables.ints(), testCase1);
+        PCollection<Integer> right = MemPipeline.typedCollectionOf(Writables.ints(), testCase2);
+
+        PCollection<Pair<Integer, Integer>> cross = Cartesian.cross(left, right);
+        HashSet<Pair<Integer, Integer>> crossSet = new HashSet<Pair<Integer, Integer>>();
+        for (Pair<Integer, Integer> pair : cross.materialize()) {
+          crossSet.add(pair);
+        }
+        // Expected value first, so a failure message reports the sizes the right way round.
+        assertEquals(testCase1.size() * testCase2.size(), crossSet.size());
+
+        for (Integer first : testCase1) {
+          for (Integer second : testCase2) {
+            assertTrue(crossSet.contains(Pair.of(first, second)));
+          }
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/lib/CogroupTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/CogroupTest.java b/crunch/src/test/java/org/apache/crunch/lib/CogroupTest.java
new file mode 100644
index 0000000..1cfbe59
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/CogroupTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.MapKeysFn;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.base.Splitter;
+import com.google.common.io.Files;
+
+public class CogroupTest {
+
+  private static class WordSplit extends DoFn<String, Pair<String, Long>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, Long>> emitter) {
+      for (String word : Splitter.on(' ').split(input)) {
+        emitter.emit(Pair.of(word, 1L));
+      }
+    }
+  }
+
+  public static PTable<String, Long> join(PCollection<String> w1,
+      PCollection<String> w2, PTypeFamily ptf) {
+    PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs());
+    PTable<String, Long> ws1 = w1.parallelDo("ws1", new WordSplit(), ntt);
+    PTable<String, Long> ws2 = w2.parallelDo("ws2", new WordSplit(), ntt);
+    PTable<String, Pair<Collection<Long>, Collection<Long>>> cg = Cogroup.cogroup(ws1, ws2);
+    PTable<String, Long> sums = cg.parallelDo(
+        "wc",
+        new MapValuesFn<String, Pair<Collection<Long>, Collection<Long>>, Long>() {
+          @Override
+          public Long map(Pair<Collection<Long>, Collection<Long>> v) {
+            long sum = 0L;
+            for (Long value : v.first()) {
+              sum += value;
+            }
+            for (Long value : v.second()) {
+              sum += value;
+            }
+            return sum;
+          }
+        }, ntt);
+    return sums.parallelDo("firstletters", new MapKeysFn<String, String, Long>() {
+      @Override
+      public String map(String k1) {
+        if (k1.length() > 0) {
+          return k1.substring(0, 1).toLowerCase();
+        } else {
+          return "";
+        }
+      }
+    }, ntt).groupByKey().combineValues(CombineFn.<String>SUM_LONGS());
+  }
+
+  @Test
+  public void testWritableJoin() throws Exception {
+    run(new MRPipeline(CogroupTest.class), WritableTypeFamily.getInstance());
+  }
+  
+  @Test
+  public void testAvroJoin() throws Exception {
+    run(new MRPipeline(CogroupTest.class), AvroTypeFamily.getInstance());
+  }
+  
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    String maughamInputPath = FileHelper.createTempCopyOf("maugham.txt");
+    File output = FileHelper.createOutputPath();
+    
+    PCollection<String> shakespeare = pipeline.read(From.textFile(shakesInputPath));
+    PCollection<String> maugham = pipeline.read(From.textFile(maughamInputPath));
+    pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath());
+    pipeline.done();
+    
+    File outputFile = new File(output, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.equals("j\t705")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+    
+    output.deleteOnExit();
+  }
+}


Mime
View raw message