incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [11/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch/src/main/resources/log4j.properties b/crunch/src/main/resources/log4j.properties
new file mode 100644
index 0000000..dc08a07
--- /dev/null
+++ b/crunch/src/main/resources/log4j.properties
@@ -0,0 +1,8 @@
+# ***** Set the org.apache.crunch logger level to INFO and its only appender to A.
+log4j.logger.org.apache.crunch=info, A
+
+# ***** A is set to be a ConsoleAppender.
+log4j.appender.A=org.apache.log4j.ConsoleAppender
+# ***** A uses PatternLayout.
+log4j.appender.A.layout=org.apache.log4j.PatternLayout
+log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/CollectionsTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/CollectionsTest.java b/crunch/src/test/java/org/apache/crunch/CollectionsTest.java
new file mode 100644
index 0000000..896014a
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/CollectionsTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("serial")
+public class CollectionsTest {
+  
+  public static class AggregateStringListFn implements CombineFn.Aggregator<Collection<String>> {
+    private final Collection<String> rtn = Lists.newArrayList();
+    
+    @Override
+    public void reset() {
+      rtn.clear();
+    }
+    
+    @Override
+    public void update(Collection<String> values) {
+      rtn.addAll(values);
+    }      
+    
+    @Override
+    public Iterable<Collection<String>> results() {
+      return ImmutableList.of(rtn);
+    }
+  }
+  
+  public static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
+     
+    return lines.parallelDo(new DoFn<String, Pair<String, Collection<String>>>() {
+      @Override
+      public void process(String line, Emitter<Pair<String, Collection<String>>> emitter) {
+        for (String word : line.split("\\s+")) {
+          Collection<String> characters = Lists.newArrayList();
+          for(char c : word.toCharArray()) {
+            characters.add(String.valueOf(c));
+          }
+          emitter.emit(Pair.of(word, characters));
+        }
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings())))
+    .groupByKey()
+    .combineValues(CombineFn.<String, Collection<String>>aggregator(new AggregateStringListFn()));
+  }
+  
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(CollectionsTest.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(CollectionsTest.class), AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testInMemoryWritables() throws IOException {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testInMemoryAvro() throws IOException {
+    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+  }
+  
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+	String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    Iterable<Pair<String, Collection<String>>> lines = listOfCharcters(shakespeare, typeFamily).materialize();
+    
+    boolean passed = false;
+    for (Pair<String, Collection<String>> line : lines) {
+      if(line.first().startsWith("yellow")) {
+        passed = true;
+        break;
+      }
+    }
+    pipeline.done();
+    assertTrue(passed);
+  }  
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
new file mode 100644
index 0000000..e015498
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.crunch.CombineFn.MAX_BIGINTS;
+import static org.apache.crunch.CombineFn.MAX_DOUBLES;
+import static org.apache.crunch.CombineFn.MAX_FLOATS;
+import static org.apache.crunch.CombineFn.MAX_INTS;
+import static org.apache.crunch.CombineFn.MAX_LONGS;
+import static org.apache.crunch.CombineFn.MIN_BIGINTS;
+import static org.apache.crunch.CombineFn.MIN_DOUBLES;
+import static org.apache.crunch.CombineFn.MIN_FLOATS;
+import static org.apache.crunch.CombineFn.MIN_INTS;
+import static org.apache.crunch.CombineFn.MIN_LONGS;
+import static org.apache.crunch.CombineFn.SUM_BIGINTS;
+import static org.apache.crunch.CombineFn.SUM_DOUBLES;
+import static org.apache.crunch.CombineFn.SUM_FLOATS;
+import static org.apache.crunch.CombineFn.SUM_INTS;
+import static org.apache.crunch.CombineFn.SUM_LONGS;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.CombineFn.Aggregator;
+import org.apache.crunch.CombineFn.AggregatorFactory;
+import org.apache.crunch.CombineFn.FirstNAggregator;
+import org.apache.crunch.CombineFn.LastNAggregator;
+import org.apache.crunch.CombineFn.MaxNAggregator;
+import org.apache.crunch.CombineFn.MinNAggregator;
+import org.apache.crunch.CombineFn.PairAggregator;
+import org.apache.crunch.CombineFn.QuadAggregator;
+import org.apache.crunch.CombineFn.TripAggregator;
+import org.apache.crunch.CombineFn.TupleNAggregator;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
/**
 * Tests for the stock {@code CombineFn} aggregators: sums, minima, maxima,
 * top/bottom-N, first/last-N, and the tuple-valued aggregator combinators.
 */
public class CombineFnTest {

  // Convenience overload: build a fresh aggregator from the factory and apply it.
  private <T> Iterable<T> applyAggregator(AggregatorFactory<T> a, Iterable<T> values) {
    return applyAggregator(a.create(), values);
  }

  // Drives the full aggregator lifecycle: reset, update once per value,
  // then return whatever results() emits.
  private <T> Iterable<T> applyAggregator(Aggregator<T> a, Iterable<T> values) {
    a.reset();
    for (T value : values) {
      a.update(value);
    }
    return a.results();
  }

  // Summing aggregators for each numeric type produce a single total.
  @Test
  public void testSums() {
    assertEquals(ImmutableList.of(1775L),
        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 17L, 1729L)));

    assertEquals(ImmutableList.of(1765L),
        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 7L, 1729L)));

    assertEquals(ImmutableList.of(1775),
        applyAggregator(SUM_INTS, ImmutableList.of(29, 17, 1729)));

    assertEquals(ImmutableList.of(1775.0f),
        applyAggregator(SUM_FLOATS, ImmutableList.of(29f, 17f, 1729f)));

    assertEquals(ImmutableList.of(1775.0),
        applyAggregator(SUM_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));

    assertEquals(ImmutableList.of(new BigInteger("1775")),
        applyAggregator(SUM_BIGINTS,
            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
  }

  // Max aggregators return the single largest value seen.
  @Test
  public void testMax() {
    assertEquals(ImmutableList.of(1729L),
        applyAggregator(MAX_LONGS, ImmutableList.of(29L, 17L, 1729L)));

    assertEquals(ImmutableList.of(1729),
        applyAggregator(MAX_INTS, ImmutableList.of(29, 17, 1729)));

    assertEquals(ImmutableList.of(1729.0f),
        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 17f, 1729f)));

    assertEquals(ImmutableList.of(1729.0),
        applyAggregator(MAX_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));

    assertEquals(ImmutableList.of(1745.0f),
        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 1745f, 17f, 1729f)));

    assertEquals(ImmutableList.of(new BigInteger("1729")),
        applyAggregator(MAX_BIGINTS,
            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
  }

  // Min aggregators return the single smallest value seen.
  @Test
  public void testMin() {
    assertEquals(ImmutableList.of(17L),
        applyAggregator(MIN_LONGS, ImmutableList.of(29L, 17L, 1729L)));

    assertEquals(ImmutableList.of(17),
        applyAggregator(MIN_INTS, ImmutableList.of(29, 17, 1729)));

    assertEquals(ImmutableList.of(17.0f),
        applyAggregator(MIN_FLOATS, ImmutableList.of(29f, 17f, 1729f)));

    assertEquals(ImmutableList.of(17.0),
        applyAggregator(MIN_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));

    assertEquals(ImmutableList.of(29),
        applyAggregator(MIN_INTS, ImmutableList.of(29, 170, 1729)));

    assertEquals(ImmutableList.of(new BigInteger("17")),
        applyAggregator(MIN_BIGINTS,
            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
  }

  // MaxN keeps the N largest values, emitted in ascending order.
  @Test
  public void testMaxN() {
    assertEquals(ImmutableList.of(98, 1009), applyAggregator(new MaxNAggregator<Integer>(2),
        ImmutableList.of(17, 34, 98, 29, 1009)));
  }

  // MinN keeps the N smallest values, emitted in ascending order.
  @Test
  public void testMinN() {
    assertEquals(ImmutableList.of(17, 29), applyAggregator(new MinNAggregator<Integer>(2),
        ImmutableList.of(17, 34, 98, 29, 1009)));
  }

  // FirstN keeps the first N values in arrival order.
  @Test
  public void testFirstN() {
    assertEquals(ImmutableList.of(17, 34), applyAggregator(new FirstNAggregator<Integer>(2),
        ImmutableList.of(17, 34, 98, 29, 1009)));
  }

  // LastN keeps the final N values in arrival order.
  @Test
  public void testLastN() {
    assertEquals(ImmutableList.of(29, 1009), applyAggregator(new LastNAggregator<Integer>(2),
        ImmutableList.of(17, 34, 98, 29, 1009)));
  }

  // PairAggregator applies one component aggregator per pair element.
  @Test
  public void testPairs() {
    List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
    Aggregator<Pair<Long, Double>> a = new PairAggregator<Long, Double>(
        SUM_LONGS.create(), MIN_DOUBLES.create());
    assertEquals(Pair.of(1729L, -3.14), Iterables.getOnlyElement(applyAggregator(a, input)));
  }

  // Both pair slots may use the same element type with independent aggregators.
  @Test
  public void testPairsTwoLongs() {
    List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
    Aggregator<Pair<Long, Long>> a = new PairAggregator<Long, Long>(
        SUM_LONGS.create(), SUM_LONGS.create());
    assertEquals(Pair.of(1729L, 20L), Iterables.getOnlyElement(applyAggregator(a, input)));
  }

  // TripAggregator: one component aggregator per Tuple3 element.
  @Test
  public void testTrips() {
    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(
        Tuple3.of(17.29f, 12.2, 0.1), Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
    Aggregator<Tuple3<Float, Double, Double>> a = new TripAggregator<Float, Double, Double>(
        MAX_FLOATS.create(), MAX_DOUBLES.create(), MIN_DOUBLES.create());
    assertEquals(Tuple3.of(17.29f, 14.5, -0.98),
        Iterables.getOnlyElement(applyAggregator(a, input)));
  }

  // QuadAggregator: one component aggregator per Tuple4 element.
  @Test
  public void testQuads() {
    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(
        Tuple4.of(17.29f, 12.2, 0.1, 1), Tuple4.of(3.0f, 1.2, 3.14, 2),
        Tuple4.of(-1.0f, 14.5, -0.98, 3));
    Aggregator<Tuple4<Float, Double, Double, Integer>> a =
        new QuadAggregator<Float, Double, Double, Integer>(MAX_FLOATS.create(),
            MAX_DOUBLES.create(), MIN_DOUBLES.create(), SUM_INTS.create());
    assertEquals(Tuple4.of(17.29f, 14.5, -0.98, 6),
        Iterables.getOnlyElement(applyAggregator(a, input)));
  }

  // TupleNAggregator: arbitrary arity, one component aggregator per position.
  @Test
  public void testTupleN() {
    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L),
        new TupleN(4, 17.0, 1, 9.7, 12L));
    Aggregator<TupleN> a = new TupleNAggregator(MIN_INTS.create(), SUM_DOUBLES.create(),
        MAX_INTS.create(), MIN_DOUBLES.create(), MAX_LONGS.create());
    assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L),
        Iterables.getOnlyElement(applyAggregator(a, input)));
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/FilterFnTest.java b/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
new file mode 100644
index 0000000..a173bc5
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class FilterFnTest {
+
+  private static final FilterFn<String> TRUE = new FilterFn<String>() {
+    @Override
+    public boolean accept(String input) {
+      return true;
+    }
+  };
+
+  private static final FilterFn<String> FALSE = new FilterFn<String>() {
+    @Override
+    public boolean accept(String input) {
+      return false;
+    }
+  };
+  
+  @Test
+  public void testAnd() {
+    assertTrue(FilterFn.and(TRUE).accept("foo"));
+    assertTrue(FilterFn.and(TRUE, TRUE).accept("foo"));
+    assertFalse(FilterFn.and(TRUE, FALSE).accept("foo"));
+    assertFalse(FilterFn.and(FALSE, FALSE, FALSE).accept("foo"));
+  }
+  
+  @Test
+  public void testOr() {
+    assertFalse(FilterFn.or(FALSE).accept("foo"));
+    assertTrue(FilterFn.or(FALSE, TRUE).accept("foo"));
+    assertTrue(FilterFn.or(TRUE, FALSE, TRUE).accept("foo"));
+    assertFalse(FilterFn.or(FALSE, FALSE, FALSE).accept("foo"));
+  }
+
+  @Test
+  public void testNot() {
+    assertFalse(FilterFn.not(TRUE).accept("foo"));
+    assertTrue(FilterFn.not(FALSE).accept("foo"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/MapsTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/MapsTest.java b/crunch/src/test/java/org/apache/crunch/MapsTest.java
new file mode 100644
index 0000000..5c9eb57
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/MapsTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+public class MapsTest {
+
+  @Test
+  public void testWritables() throws Exception {
+	run(WritableTypeFamily.getInstance());
+  }
+  
+  @Test
+  public void testAvros() throws Exception {
+	run(AvroTypeFamily.getInstance());
+  }
+  
+  public static void run(PTypeFamily typeFamily) throws Exception {
+	Pipeline pipeline = new MRPipeline(MapsTest.class);
+    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    Iterable<Pair<String, Map<String, Long>>> output = shakespeare.parallelDo(
+      new DoFn<String, Pair<String, Map<String, Long>>>() {
+	    @Override
+	    public void process(String input,
+		    Emitter<Pair<String, Map<String, Long>>> emitter) {
+		  String last = null;
+		  for (String word : input.toLowerCase().split("\\W+")) {
+		    if (!word.isEmpty()) {
+			  String firstChar = word.substring(0, 1);
+		      if (last != null) {
+		    	Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
+			    emitter.emit(Pair.of(last, cc));
+		      }
+		      last = firstChar;
+		    }
+		  }
+	    }
+      }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs())))
+      .groupByKey()
+      .combineValues(new CombineFn<String, Map<String, Long>>() {
+	    @Override
+	    public void process(Pair<String, Iterable<Map<String, Long>>> input,
+		    Emitter<Pair<String, Map<String, Long>>> emitter) {
+		  Map<String, Long> agg = Maps.newHashMap();
+		  for (Map<String, Long> in : input.second()) {
+		    for (Map.Entry<String, Long> e : in.entrySet()) {
+			  if (!agg.containsKey(e.getKey())) {
+			    agg.put(e.getKey(), e.getValue());
+			  } else {
+			    agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
+			  }
+		    }
+		  }
+		  emitter.emit(Pair.of(input.first(), agg));
+	    }
+	  }).materialize();
+    boolean passed = false;
+    for (Pair<String, Map<String, Long>> v : output) {
+      if (v.first() == "k" && v.second().get("n") == 8L) {
+    	passed = true;
+    	break;
+      }
+    }
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/MaterializeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/MaterializeTest.java b/crunch/src/test/java/org/apache/crunch/MaterializeTest.java
new file mode 100644
index 0000000..f6b8ea2
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/MaterializeTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
+public class MaterializeTest {
+
+	/** Filter that rejects everything. */
+	@SuppressWarnings("serial")
+	private static class FalseFilterFn extends FilterFn<String> {
+
+		@Override
+		public boolean accept(final String input) {
+			return false;
+		}
+	}
+
+	@Test
+	public void testMaterializeInput_Writables() throws IOException {
+		runMaterializeInput(new MRPipeline(MaterializeTest.class), WritableTypeFamily.getInstance());
+	}
+
+	@Test
+	public void testMaterializeInput_Avro() throws IOException {
+		runMaterializeInput(new MRPipeline(MaterializeTest.class), AvroTypeFamily.getInstance());
+	}
+
+	@Test
+	public void testMaterializeInput_InMemoryWritables() throws IOException {
+		runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+	}
+
+	@Test
+	public void testMaterializeInput_InMemoryAvro() throws IOException {
+		runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+	}
+
+	@Test
+	public void testMaterializeEmptyIntermediate_Writables() throws IOException {
+		runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
+				WritableTypeFamily.getInstance());
+	}
+
+	@Test
+	public void testMaterializeEmptyIntermediate_Avro() throws IOException {
+		runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
+				AvroTypeFamily.getInstance());
+	}
+
+	@Test
+	public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
+		runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
+	}
+
+	@Test
+	public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
+		runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
+	}
+
+	public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+		List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
+		String inputPath = FileHelper.createTempCopyOf("set1.txt");
+
+		PCollection<String> lines = pipeline.readTextFile(inputPath);
+		assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
+		pipeline.done();
+	}
+
+	public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
+			throws IOException {
+		String inputPath = FileHelper.createTempCopyOf("set1.txt");
+		PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FalseFilterFn());
+
+		assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
+		pipeline.done();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/MaterializeToMapTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/MaterializeToMapTest.java b/crunch/src/test/java/org/apache/crunch/MaterializeToMapTest.java
new file mode 100644
index 0000000..5c92019
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/MaterializeToMapTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static junit.framework.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.ImmutableList;
+
+public class MaterializeToMapTest {
+  
+  static final ImmutableList<Pair<Integer,String>> kvPairs = 
+      ImmutableList.of(
+          Pair.of(0, "a"),
+          Pair.of(1, "b"),
+          Pair.of(2, "c"),
+          Pair.of(3, "e"));
+  
+  public void assertMatches(Map<Integer,String> m) {
+    for (Integer k : m.keySet()) {
+      System.out.println(k + " " + kvPairs.get(k).second() + " " + m.get(k));
+      assertTrue(kvPairs.get(k).second().equals(m.get(k)));
+    }
+  }
+  
+  @Test
+  public void testMemMaterializeToMap() {
+    assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
+  }
+  
+  private static class Set1Mapper extends MapFn<String,Pair<Integer,String>> {
+    @Override
+    public Pair<Integer, String> map(String input) {
+      
+      int k = -1;
+      if (input.equals("a")) k = 0;
+      else if (input.equals("b")) k = 1;
+      else if (input.equals("c")) k = 2;
+      else if (input.equals("e")) k = 3;
+      return Pair.of(k, input);
+    }
+  }
+  
+  @Test
+  public void testMRMaterializeToMap() throws IOException {
+    Pipeline p = new MRPipeline(MaterializeToMapTest.class);
+    String inputFile = FileHelper.createTempCopyOf("set1.txt");
+    PCollection<String> c = p.readTextFile(inputFile);
+    PTypeFamily tf = c.getTypeFamily();
+    PTable<Integer,String> t = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
+    Map<Integer, String> m = t.materializeToMap();
+    assertMatches(m);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/MultipleOutputTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/MultipleOutputTest.java b/crunch/src/test/java/org/apache/crunch/MultipleOutputTest.java
new file mode 100644
index 0000000..ad78fda
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/MultipleOutputTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.io.Files;
+
+public class MultipleOutputTest {
+  
+  public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
+    return words.parallelDo("even", new FilterFn<String>(){
+
+        @Override
+        public boolean accept(String input) {
+            return input.length() % 2 == 0;
+        }}, typeFamily.strings());
+  }
+  
+  public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
+      return words.parallelDo("odd", new FilterFn<String>(){
+
+        @Override
+        public boolean accept(String input) {
+            return input.length() % 2 != 0;
+        }}, typeFamily.strings());
+       
+    }
+  
+  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
+	return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
+	  public void process(Pair<String, Long> input,
+		  Emitter<Pair<String, Long>> emitter) {
+		if (input.first().length() > 0) {
+		  emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
+		}
+	  }      
+    }, ptable.getPTableType());
+  }
+  
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(MultipleOutputTest.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(MultipleOutputTest.class), AvroTypeFamily.getInstance());
+  }
+ 
+  
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+	String inputPath = FileHelper.createTempCopyOf("letters.txt");
+	File outputEven = FileHelper.createOutputPath();
+	File outputOdd = FileHelper.createOutputPath();
+	String outputPathEven = outputEven.getAbsolutePath();
+	String outputPathOdd = outputOdd.getAbsolutePath();
+	
+    PCollection<String> words = pipeline.read(
+         At.textFile(inputPath, typeFamily.strings()));
+    
+    PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
+    PCollection<String> oddCountWords = oddCountLetters(words, typeFamily);
+    pipeline.writeTextFile(evenCountWords, outputPathEven);
+    pipeline.writeTextFile(oddCountWords, outputPathOdd);
+    
+    pipeline.done();
+   
+    checkFileContents(outputPathEven, Arrays.asList("bb"));
+    checkFileContents(outputPathOdd, Arrays.asList("a"));
+   
+	outputEven.deleteOnExit();
+	outputOdd.deleteOnExit();
+  }  
+  
+  private void checkFileContents(String filePath, List<String> expected) throws IOException{
+    File outputFile = new File(filePath, "part-m-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    assertEquals(expected, lines);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java b/crunch/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java
new file mode 100644
index 0000000..eee3cd5
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.crunch.io.At.sequenceFile;
+import static org.apache.crunch.io.At.textFile;
+import static org.apache.crunch.types.writable.Writables.strings;
+import static com.google.common.collect.Lists.newArrayList;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.FileHelper;
+
+/**
+ * Tests for {@code PCollection#getSize()} and {@code materialize()} on empty
+ * inputs and empty intermediate collections, covering both the MapReduce and
+ * in-memory pipeline implementations.
+ */
+public class PCollectionGetSizeTest {
+
+    private String emptyInputPath;
+    private String nonEmptyInputPath;
+    private String outputPath;
+
+    /** Filter that rejects everything. */
+    @SuppressWarnings("serial")
+    private static class FalseFilterFn extends FilterFn<String> {
+
+        @Override
+        public boolean accept(final String input) {
+            return false;
+        }
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        emptyInputPath = FileHelper.createTempCopyOf("emptyTextFile.txt");
+        nonEmptyInputPath = FileHelper.createTempCopyOf("set1.txt");
+        outputPath = FileHelper.createOutputPath().getAbsolutePath();
+    }
+
+    @Test
+    public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
+        testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass()));
+    }
+
+    @Test
+    public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
+        testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
+    }
+
+    /** An empty input file should report a size of zero bytes. */
+    private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
+
+        assertThat(pipeline.read(textFile(emptyInputPath)).getSize(), is(0L));
+    }
+
+    @Test
+    public void testMaterializeEmptyInput_MRPipeline() throws IOException {
+        testMaterializeEmptyInput(new MRPipeline(this.getClass()));
+    }
+
+    @Test
+    public void testMaterializeEmptyInput_MemPipeline() throws IOException {
+        testMaterializeEmptyInput(MemPipeline.getInstance());
+    }
+
+    /** Materializing an empty input should yield no elements. */
+    private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
+        assertThat(newArrayList(pipeline.readTextFile(emptyInputPath).materialize().iterator()).size(), is(0));
+    }
+
+    @Test
+    public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
+
+        PCollection<String> emptyIntermediate = createPersistentEmptyIntermediate(new MRPipeline(this.getClass()));
+
+        assertThat(emptyIntermediate.getSize(), is(0L));
+    }
+
+    @Test
+    @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
+    public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
+
+        PCollection<String> data = new MRPipeline(this.getClass()).readTextFile(nonEmptyInputPath);
+
+        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
+
+        assertThat(emptyPCollection.getSize(), is(0L));
+    }
+
+    @Test
+    public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
+
+        PCollection<String> emptyIntermediate = createPersistentEmptyIntermediate(MemPipeline.getInstance());
+
+        assertThat(emptyIntermediate.getSize(), is(0L));
+    }
+
+    @Test
+    public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
+
+        PCollection<String> emptyIntermediate = createPersistentEmptyIntermediate(new MRPipeline(this.getClass()));
+
+        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
+    }
+
+    @Test
+    public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
+
+        PCollection<String> emptyIntermediate = createPersistentEmptyIntermediate(MemPipeline.getInstance());
+
+        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
+    }
+
+    /**
+     * Filters a non-empty input down to nothing, persists the (empty) result as
+     * a sequence file, and reads it back, so callers receive an empty
+     * PCollection that is backed by actual on-disk data.
+     */
+    private PCollection<String> createPersistentEmptyIntermediate(Pipeline pipeline) {
+
+        PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
+
+        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
+
+        emptyPCollection.write(sequenceFile(outputPath, strings()));
+
+        pipeline.run();
+
+        return pipeline.read(sequenceFile(outputPath, strings()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
+        new MRPipeline(this.getClass()).readTextFile("non_existing.file").getSize();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
+        MemPipeline.getInstance().readTextFile("non_existing.file").getSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/PTableKeyValueTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/PTableKeyValueTest.java b/crunch/src/test/java/org/apache/crunch/PTableKeyValueTest.java
new file mode 100644
index 0000000..eadff17
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/PTableKeyValueTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import junit.framework.Assert;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+
+/**
+ * Parameterized over the Writable and Avro type families; verifies that
+ * {@code PTable#keys()} and {@code PTable#values()} stay pairwise aligned.
+ */
+@RunWith(value = Parameterized.class)
+public class PTableKeyValueTest implements Serializable {
+
+	private static final long serialVersionUID = 4374227704751746689L;
+
+	private transient PTypeFamily typeFamily;
+	private transient MRPipeline pipeline;
+	private transient String inputFile;
+
+	@Before
+	public void setUp() throws IOException {
+		pipeline = new MRPipeline(PTableKeyValueTest.class);
+		inputFile = FileHelper.createTempCopyOf("set1.txt");
+	}
+
+	@After
+	public void tearDown() {
+		pipeline.done();
+	}
+
+	public PTableKeyValueTest(PTypeFamily typeFamily) {
+		this.typeFamily = typeFamily;
+	}
+
+	// Each test runs once per type family: Writables and Avro.
+	@Parameters
+	public static Collection<Object[]> data() {
+		Object[][] data = new Object[][] {
+				{ WritableTypeFamily.getInstance() },
+				{ AvroTypeFamily.getInstance() } };
+		return Arrays.asList(data);
+	}
+
+	// Builds a table of (UPPERCASED word -> word) and checks that keys() and
+	// values() materialize in the same order: each key equals its value
+	// uppercased at the same index.
+	@Test
+	public void testKeysAndValues() throws Exception {
+
+		PCollection<String> collection = pipeline.read(At.textFile(inputFile,
+				typeFamily.strings()));
+
+		PTable<String, String> table = collection.parallelDo(
+				new DoFn<String, Pair<String, String>>() {
+
+					@Override
+					public void process(String input,
+							Emitter<Pair<String, String>> emitter) {
+						emitter.emit(Pair.of(input.toUpperCase(), input));
+
+					}
+				}, typeFamily.tableOf(typeFamily.strings(),
+						typeFamily.strings()));
+
+		PCollection<String> keys = table.keys();
+		PCollection<String> values = table.values();
+
+		ArrayList<String> keyList = Lists.newArrayList(keys.materialize()
+				.iterator());
+		ArrayList<String> valueList = Lists.newArrayList(values.materialize()
+				.iterator());
+
+		Assert.assertEquals(keyList.size(), valueList.size());
+		for (int i = 0; i < keyList.size(); i++) {
+			Assert.assertEquals(keyList.get(i), valueList.get(i).toUpperCase());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/PageRankTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/PageRankTest.java b/crunch/src/test/java/org/apache/crunch/PageRankTest.java
new file mode 100644
index 0000000..5edc5d6
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/PageRankTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.util.PTypes;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Exercises an iterative PageRank computation under several serialization
+ * strategies (Avro reflection, JSON, Smile) and both pipeline implementations.
+ */
+public class PageRankTest {
+
+  /** Per-page rank state: current score, previous score, and outbound links. */
+  public static class PageRankData {
+	public float score;
+	public float lastScore;
+	public List<String> urls;
+	
+	public PageRankData() { }
+	
+	public PageRankData(float score, float lastScore, Iterable<String> urls) {
+	  this.score = score;
+	  this.lastScore = lastScore;
+	  this.urls = Lists.newArrayList(urls);
+	}
+	
+	// Advances one iteration: newScore becomes current, the old current score
+	// is kept as lastScore so convergence can be measured.
+	public PageRankData next(float newScore) {
+	  return new PageRankData(newScore, score, urls);
+	}
+	
+	// Share of this page's score contributed to each outbound link.
+	public float propagatedScore() {
+	  return score / urls.size();
+	}
+	
+	@Override
+	public String toString() {
+	  return score + " " + lastScore + " " + urls;
+	}
+  }
+  
+  @Test public void testAvroReflect() throws Exception {
+	PTypeFamily tf = AvroTypeFamily.getInstance();
+	PType<PageRankData> prType = Avros.reflects(PageRankData.class);
+    run(new MRPipeline(PageRankTest.class), prType, tf);	
+  }
+  
+  @Test public void testAvroMReflectInMemory() throws Exception {
+    PTypeFamily tf = AvroTypeFamily.getInstance();
+    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
+    run(MemPipeline.getInstance(), prType, tf);        
+  }
+  
+  @Test public void testAvroJSON() throws Exception {
+	PTypeFamily tf = AvroTypeFamily.getInstance();
+	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+    run(new MRPipeline(PageRankTest.class), prType, tf);
+  }
+
+  @Test public void testAvroBSON() throws Exception {
+	PTypeFamily tf = AvroTypeFamily.getInstance();
+	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
+    run(new MRPipeline(PageRankTest.class), prType, tf);
+  }
+  
+  @Test public void testWritablesJSON() throws Exception {
+	PTypeFamily tf = WritableTypeFamily.getInstance();
+	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
+    run(new MRPipeline(PageRankTest.class), prType, tf);
+  }
+
+  @Test public void testWritablesBSON() throws Exception {
+	PTypeFamily tf = WritableTypeFamily.getInstance();
+	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
+    run(new MRPipeline(PageRankTest.class), prType, tf);
+  }
+  
+  /**
+   * One PageRank iteration with damping factor d: distributes each page's
+   * propagated score to its outbound links, then combines the incoming scores
+   * for every page into its next PageRankData via d + (1 - d) * sum.
+   */
+  public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+    PTypeFamily ptf = input.getTypeFamily();
+    PTable<String, Float> outbound = input.parallelDo(
+        new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+          @Override
+          public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+            PageRankData prd = input.second();
+            for (String link : prd.urls) {
+              emitter.emit(Pair.of(link, prd.propagatedScore()));
+            }
+          }
+        }, ptf.tableOf(ptf.strings(), ptf.floats()));
+    
+    return input.cogroup(outbound).parallelDo(
+        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
+              @Override
+              public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
+                // Each page is expected to appear exactly once on the left side
+                // of the cogroup.
+                PageRankData prd = Iterables.getOnlyElement(input.second().first());
+                Collection<Float> propagatedScores = input.second().second();
+                float sum = 0.0f;
+                for (Float s : propagatedScores) {
+                  sum += s;
+                }
+                return Pair.of(input.first(), prd.next(d + (1.0f - d)*sum));
+              }
+            }, input.getPTableType());
+  }
+  
+  /**
+   * Reads tab-separated (url, outbound-url) edges, seeds every page with score
+   * 1.0, and iterates pageRank until the largest per-page score change drops
+   * below 0.01; asserts the final delta.
+   */
+  public static void run(Pipeline pipeline, PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
+    String urlInput = FileHelper.createTempCopyOf("urls.txt");
+    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
+        .parallelDo(new MapFn<String, Pair<String, String>>() {
+          @Override
+          public Pair<String, String> map(String input) {
+            String[] urls = input.split("\\t");
+            return Pair.of(urls[0], urls[1]);
+          }
+        }, ptf.tableOf(ptf.strings(), ptf.strings()))
+        .groupByKey()
+        .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
+              @Override
+              public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
+                return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
+              }
+            }, ptf.tableOf(ptf.strings(), prType));
+    
+    Float delta = 1.0f;
+    while (delta > 0.01) {
+      scores = pageRank(scores, 0.5f);
+      scores.materialize().iterator(); // force the write
+      // Convergence measure: the maximum |score - lastScore| over all pages.
+      delta = Iterables.getFirst(Aggregate.max(
+          scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
+            @Override
+            public Float map(Pair<String, PageRankData> input) {
+              PageRankData prd = input.second();
+              return Math.abs(prd.score - prd.lastScore);
+            }
+          }, ptf.floats())).materialize(), null);
+    }
+    assertEquals(0.0048, delta, 0.001);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/PairTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/PairTest.java b/crunch/src/test/java/org/apache/crunch/PairTest.java
new file mode 100644
index 0000000..eff183b
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/PairTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+/** Unit tests for the {@code Pair} tuple: construction, accessors, equality, and ordering. */
+public class PairTest {
+  
+  @Test
+  public void testPairConstructor() {
+    Pair<String, Integer> pair = new Pair<String, Integer>("brock", 45);
+    test(pair);
+  }
+
+  @Test
+  public void testPairOf() {
+    Pair<String, Integer> pair = Pair.of("brock", 45);
+    test(pair);
+  }
+
+  /** Common assertions for a ("brock", 45) pair, however it was constructed. */
+  protected void test(Pair<String, Integer> pair) {
+    assertTrue(pair.size() == 2);
+    
+    assertEquals("brock", pair.first());
+    // Integer.valueOf avoids the deprecated Integer(int) boxing constructor.
+    assertEquals(Integer.valueOf(45), pair.second());
+    assertEquals(Pair.of("brock", 45), pair);
+    
+    assertEquals("brock", pair.get(0));
+    assertEquals(Integer.valueOf(45), pair.get(1));
+
+    try {
+      pair.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected: index out of range
+    }
+  }
+  
+  @Test
+  public void testPairComparisons() {
+    assertEquals(0, Pair.of(null, null).compareTo(Pair.of(null, null)));
+    assertEquals(0, Pair.of(1, 2).compareTo(Pair.of(1, 2)));
+    assertTrue(Pair.of(2, "a").compareTo(Pair.of(1, "a")) > 0);
+    assertTrue(Pair.of("a", 2).compareTo(Pair.of("a", 1)) > 0);
+    // When first elements are equal (both null), ordering falls to the second.
+    assertTrue(Pair.of(null, 17).compareTo(Pair.of(null, 29)) < 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/TFIDFTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/TFIDFTest.java b/crunch/src/test/java/org/apache/crunch/TFIDFTest.java
new file mode 100644
index 0000000..d22bf06
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/TFIDFTest.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static com.google.common.io.Resources.getResource;
+import static com.google.common.io.Resources.newInputStreamSupplier;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import org.apache.crunch.fn.MapKeysFn;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.seq.SeqFileSourceTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.lib.Join;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * End-to-end TF-IDF computation over a small document corpus, run either as a
+ * single pipeline execution or split across two runs.
+ */
+@SuppressWarnings("serial")
+public class TFIDFTest implements Serializable {  
+  // Total number of documents in the corpus; TODO: should be calculated from
+  // the input rather than hard-coded.
+  protected static final double N = 2;
+  
+  @Test
+  public void testWritablesSingleRun() throws IOException {
+    run(new MRPipeline(TFIDFTest.class), WritableTypeFamily.getInstance(), true);
+  }
+
+  @Test
+  public void testWritablesMultiRun() throws IOException {
+    run(new MRPipeline(TFIDFTest.class), WritableTypeFamily.getInstance(), false);
+  }
+
+  /**
+   * This method should generate a TF-IDF score for the input.
+   */
+  public PTable<String, Collection<Pair<String, Double>>>  generateTFIDF(PCollection<String> docs,
+      Path termFreqPath, PTypeFamily ptf) throws IOException {    
+    
+    /*
+     * Input: String
+     * Input title  text
+     * 
+     * Output: PTable<Pair<String, String>, Long> 
+     * Pair<Pair<word, title>, count in title>
+     */
+    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
+        new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String doc, Emitter<Pair<String, String>> emitter) {
+        String[] kv = doc.split("\t");
+        String title = kv[0];
+        String text = kv[1];
+        for (String word : text.split("\\W+")) {
+          if(word.length() > 0) {
+            Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+            emitter.emit(pair);
+          }
+        }
+      }
+    }, ptf.pairs(ptf.strings(), ptf.strings())));
+    
+    // Persist term frequencies so they survive across pipeline runs.
+    tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
+    
+    /*
+     * Input: Pair<Pair<String, String>, Long>
+     * Pair<Pair<word, title>, count in title>
+     * 
+     * Output: PTable<String, Long>
+     * PTable<word, # of docs containing word>
+     */
+    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",  
+        new DoFn<Pair<Pair<String, String>, Long>, String>() {
+      @Override
+      public void process(Pair<Pair<String, String>, Long> input,
+          Emitter<String> emitter) {
+        emitter.emit(input.first().first());
+      }
+    }, ptf.strings()));
+    
+    /*
+     * Input: Pair<Pair<String, String>, Long>
+     * Pair<Pair<word, title>, count in title>
+     * 
+     * Output: PTable<String, Pair<String, Long>>
+     * PTable<word, Pair<title, count in title>>
+     *
+     * NOTE(review): this DoFn buffers per-word groups and assumes inputs for
+     * the same word arrive contiguously — confirm that the upstream ordering
+     * guarantees this.
+     */
+    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
+        new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
+          Collection<Pair<String, Long>> buffer;
+          String key;
+          @Override
+          public void process(Pair<Pair<String, String>, Long> input,
+        	  Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            Pair<String, String> wordDocumentPair = input.first();
+            if(!wordDocumentPair.first().equals(key)) {
+              flush(emitter);
+              key = wordDocumentPair.first();
+              buffer = Lists.newArrayList();
+            }
+            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));            
+          }
+          // Emits the buffered (title, count) list for the current word, if any.
+          protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            if(buffer != null) {
+              emitter.emit(Pair.of(key, buffer));
+              buffer = null;
+            }
+          }
+          @Override
+          public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
+            flush(emitter);
+          }
+      }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
+
+    PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
+
+    /*
+     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>>
+     * Pair<word, Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
+     * 
+     * Output: Pair<String, Collection<Pair<String, Double>>>
+     * Pair<word, Collection<Pair<title, tfidf>>>
+     */
+    return joinedResults.parallelDo("calculate tfidf",
+        new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
+          @Override
+          public Pair<String, Collection<Pair<String, Double>>> map(Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
+            Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
+            String word = input.first();
+            double n = input.second().first();
+            double idf = Math.log(N / n);
+            for(Pair<String, Long> tf : input.second().second()) {
+              double tfidf = tf.second() * idf;
+              tfidfs.add(Pair.of(tf.first(), tfidf));
+            }
+            return Pair.of(word, tfidfs);
+          }
+      
+    }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
+  }
+  
+  /**
+   * Runs the TF-IDF pipeline (optionally split into two executions), writes a
+   * lowercase and an uppercased result set, and spot-checks both outputs for a
+   * known score of the word "the" in document B.
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
+    File input = File.createTempFile("docs", "txt");
+    input.deleteOnExit();
+    Files.copy(newInputStreamSupplier(getResource("docs.txt")), input);
+    
+    String outputPath1 = getOutput();
+    String outputPath2 = getOutput();
+    
+    Path tfPath = new Path(getOutput("termfreq"));
+    
+    PCollection<String> docs = pipeline.readTextFile(input.getAbsolutePath());
+        
+    PTable<String, Collection<Pair<String, Double>>> results =
+        generateTFIDF(docs, tfPath, typeFamily);
+    pipeline.writeTextFile(results, outputPath1);
+    if (!singleRun) {
+      pipeline.run();
+    }
+    
+    PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
+        new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
+          @Override
+          public String map(String k1) {
+            return k1.toUpperCase();
+          } 
+        }, results.getPTableType());
+    pipeline.writeTextFile(uppercased, outputPath2);
+    pipeline.done();
+    
+    // Check the lowercase version...
+    File outputFile = new File(outputPath1, "part-r-00000");
+    outputFile.deleteOnExit();
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.startsWith("the") && line.contains("B,0.6931471805599453")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+    
+    // ...and the uppercase version
+    outputFile = new File(outputPath2, "part-r-00000");
+    outputFile.deleteOnExit();
+    lines = Files.readLines(outputFile, Charset.defaultCharset());
+    passed = false;
+    for (String line : lines) {
+      if (line.startsWith("THE") && line.contains("B,0.6931471805599453")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+  }
+  
+  public static String getOutput() throws IOException {
+    return getOutput("output");
+  }
+  
+  // Reserves a fresh temp path (created then deleted) for use as an output dir.
+  public static String getOutput(String prefix) throws IOException {
+    File output = File.createTempFile(prefix, "");
+    String path = output.getAbsolutePath();
+    output.delete();
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/TermFrequencyTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/TermFrequencyTest.java b/crunch/src/test/java/org/apache/crunch/TermFrequencyTest.java
new file mode 100644
index 0000000..fced0ee
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/TermFrequencyTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+
+/**
+ * Counts per-document term frequencies and verifies a known count, with and
+ * without an extra transformation step, on both pipeline implementations.
+ */
+@SuppressWarnings("serial")
+public class TermFrequencyTest implements Serializable {  
+  
+  @Test
+  public void testTermFrequencyWithNoTransform() throws IOException {
+    run(new MRPipeline(TermFrequencyTest.class), WritableTypeFamily.getInstance(), false);
+  }
+  
+  @Test
+  public void testTermFrequencyWithTransform() throws IOException {
+    run(new MRPipeline(TermFrequencyTest.class), WritableTypeFamily.getInstance(), true);
+  }
+  
+  @Test
+  public void testTermFrequencyNoTransformInMemory() throws IOException {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), false);  
+  }
+
+  @Test
+  public void testTermFrequencyWithTransformInMemory() throws IOException {
+    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance(), true);
+  }
+  
+
+  /**
+   * Computes ((word, title) -> count) term frequencies from tab-separated
+   * "title\ttext" lines, optionally re-keys them by word, writes the result as
+   * text, and asserts that "well" appears exactly once in document A.
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean transformTF) throws IOException {
+    String input = FileHelper.createTempCopyOf("docs.txt");
+    
+    File transformedOutput = FileHelper.createOutputPath();
+    File tfOutput = FileHelper.createOutputPath();
+    
+    PCollection<String> docs = pipeline.readTextFile(input);
+    
+    PTypeFamily ptf = docs.getTypeFamily();
+    
+    /*
+     * Input: String
+     * Input title  text
+     * 
+     * Output: PTable<Pair<String, String>, Long> 
+     * Pair<Pair<word, title>, count in title>
+     */
+    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
+        new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String doc, Emitter<Pair<String, String>> emitter) {
+        String[] kv = doc.split("\t");
+        String title = kv[0];
+        String text = kv[1];
+        for (String word : text.split("\\W+")) {
+          if(word.length() > 0) {
+            Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
+            emitter.emit(pair);
+          }
+        }
+      }
+    }, ptf.pairs(ptf.strings(), ptf.strings())));
+    
+    if(transformTF) {
+      /*
+       * Input: Pair<Pair<String, String>, Long>
+       * Pair<Pair<word, title>, count in title>
+       * 
+       * Output: PTable<String, Pair<String, Long>>
+       * PTable<word, Pair<title, count in title>>
+       */
+      PTable<String, Pair<String, Long>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
+          new MapFn<Pair<Pair<String, String>, Long>, Pair<String, Pair<String, Long>>>() {
+            @Override
+            public Pair<String, Pair<String, Long>> map(Pair<Pair<String, String>, Long> input) {
+              Pair<String, String> wordDocumentPair = input.first();            
+              return Pair.of(wordDocumentPair.first(), Pair.of(wordDocumentPair.second(), input.second()));
+            }
+        }, ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), ptf.longs())));
+      
+      pipeline.writeTextFile(wordDocumentCountPair, transformedOutput.getAbsolutePath());
+    }
+    
+    SourceTarget<String> st = At.textFile(tfOutput.getAbsolutePath());
+    pipeline.write(tf, st);
+    
+    pipeline.run();
+    
+    // test the case we should see
+    // NOTE(review): this cast assumes At.textFile returns a
+    // ReadableSourceTarget implementation — confirm.
+    Iterable<String> lines = ((ReadableSourceTarget<String>) st).read(pipeline.getConfiguration());
+    boolean passed = false;
+    for (String line : lines) {
+      if ("[well,A]\t0".equals(line)) {
+        fail("Found " + line + " but well is in Document A 1 time");
+      }
+      if ("[well,A]\t1".equals(line)) {
+        passed = true;
+      }
+    }
+    assertTrue(passed);
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/TextPairTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/TextPairTest.java b/crunch/src/test/java/org/apache/crunch/TextPairTest.java
new file mode 100644
index 0000000..2a966c4
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/TextPairTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.types.writable.Writables;
+
+public class TextPairTest {
+
+  /** Marker emitted as the first element of every pair; the test scans output for it. */
+  private static final String CANARY = "Writables.STRING_TO_TEXT";
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(TextPairTest.class));
+  }
+
+  /**
+   * Splits each input line on non-word characters and emits a (CANARY, word)
+   * pair for every non-empty word, typed with the Writables pair type.
+   *
+   * @param words collection of input lines
+   * @return pairs whose first element is always {@link #CANARY}
+   */
+  public static PCollection<Pair<String, String>> wordDuplicate(PCollection<String> words) {
+    return words.parallelDo("my word duplicator", new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String line, Emitter<Pair<String, String>> emitter) {
+        for (String word : line.split("\\W+")) {
+          if (word.length() > 0) {
+            emitter.emit(Pair.of(CANARY, word));
+          }
+        }
+      }
+    }, Writables.pairs(Writables.strings(), Writables.strings()));
+  }
+
+  /**
+   * Runs the pipeline over a temp copy of shakes.txt and asserts that at least
+   * one materialized pair carries the canary marker, proving the String-to-Text
+   * pair serialization round-trips.
+   */
+  public void run(Pipeline pipeline) throws IOException {
+    String input = FileHelper.createTempCopyOf("shakes.txt");
+
+    PCollection<String> shakespeare = pipeline.read(From.textFile(input));
+    Iterable<Pair<String, String>> lines = pipeline.materialize(wordDuplicate(shakespeare));
+    boolean passed = false;
+    for (Pair<String, String> line : lines) {
+      if (line.first().contains(CANARY)) {
+        passed = true;
+        break;
+      }
+    }
+
+    pipeline.done();
+    assertTrue(passed);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/TupleNClassCastBugTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/TupleNClassCastBugTest.java b/crunch/src/test/java/org/apache/crunch/TupleNClassCastBugTest.java
new file mode 100644
index 0000000..5c22e2b
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/TupleNClassCastBugTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static com.google.common.io.Resources.getResource;
+import static com.google.common.io.Resources.newInputStreamSupplier;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import com.google.common.io.Files;
+
+public class TupleNClassCastBugTest {
+
+  /**
+   * Maps each tab-separated line to (docId, TupleN(docId, docLine)), groups by
+   * key, and re-emits every grouped tuple. This exercises the TupleN
+   * serialization path through a shuffle, which formerly raised a
+   * ClassCastException on the reduce side.
+   */
+  public static PCollection<TupleN> mapGroupDo(PCollection<String> lines, PTypeFamily ptf) {
+    PTable<String, TupleN> mapped = lines.parallelDo(new MapFn<String, Pair<String, TupleN>>() {
+
+      @Override
+      public Pair<String, TupleN> map(String line) {
+        String[] columns = line.split("\\t");
+        String docId = columns[0];
+        String docLine = columns[1];
+        return Pair.of(docId, new TupleN(docId, docLine));
+      }
+    }, ptf.tableOf(ptf.strings(), ptf.tuples(ptf.strings(), ptf.strings())));
+    return mapped.groupByKey().parallelDo(new DoFn<Pair<String, Iterable<TupleN>>, TupleN>() {
+      @Override
+      public void process(Pair<String, Iterable<TupleN>> input, Emitter<TupleN> tupleNEmitter) {
+        for (TupleN tuple : input.second()) {
+          tupleNEmitter.emit(tuple);
+        }
+      }
+    }, ptf.tuples(ptf.strings(), ptf.strings()));
+  }
+
+  @Test
+  public void testWritables() throws IOException {
+    run(new MRPipeline(TupleNClassCastBugTest.class), WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testAvro() throws IOException {
+    run(new MRPipeline(TupleNClassCastBugTest.class), AvroTypeFamily.getInstance());
+  }
+
+  /**
+   * Copies the docs.txt resource to a temp file, runs the map/group/do
+   * pipeline over it, and verifies the reducer produced all six lines.
+   */
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    File input = File.createTempFile("docs", "txt");
+    input.deleteOnExit();
+    Files.copy(newInputStreamSupplier(getResource("docs.txt")), input);
+
+    File output = File.createTempFile("output", "");
+    String outputPath = output.getAbsolutePath();
+    // Delete the placeholder file so the output path can be created as a directory.
+    output.delete();
+
+    PCollection<String> docLines = pipeline.readTextFile(input.getAbsolutePath());
+    pipeline.writeTextFile(mapGroupDo(docLines, typeFamily), outputPath);
+    pipeline.done();
+
+    // *** We are not directly testing the output, we are looking for a ClassCastException
+    // *** which is thrown in a different thread during the reduce phase. If all is well
+    // *** the file will exist and have six lines. Otherwise the bug is present.
+    File outputFile = new File(output, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    assertEquals(6, lines.size());
+    output.deleteOnExit();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/TupleTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/TupleTest.java b/crunch/src/test/java/org/apache/crunch/TupleTest.java
new file mode 100644
index 0000000..f259acc
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/TupleTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+import org.apache.crunch.types.TupleFactory;
+
+/**
+ * Unit tests for the fixed-arity tuples (Tuple3, Tuple4), the variable-arity
+ * TupleN, and the TupleFactory constants that build each of them.
+ */
+public class TupleTest {
+  private String first = "foo";
+  private Integer second = 1729;
+  private Double third = 64.2;
+  private Boolean fourth = false;
+  private Float fifth = 17.29f;
+
+  @Test
+  public void testTuple3() {
+    Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
+    assertEquals(3, t.size());
+    assertEquals(first, t.first());
+    assertEquals(second, t.second());
+    assertEquals(third, t.third());
+    assertEquals(first, t.get(0));
+    assertEquals(second, t.get(1));
+    assertEquals(third, t.get(2));
+    try {
+      t.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testTuple3Equality() {
+    Tuple3<String, Integer, Double> t = new Tuple3<String, Integer, Double>(first, second, third);
+    assertTrue(t.equals(new Tuple3<String, Integer, Double>(first, second, third)));
+    assertFalse(t.equals(new Tuple3<String, Integer, Double>(first, null, third)));
+    assertFalse((new Tuple3<String, Integer, Double>(null, null, null)).equals(t));
+    assertTrue((new Tuple3<String, Integer, Double>(first, null, null)).equals(
+        new Tuple3<String, Integer, Double>(first, null, null)));
+  }
+
+  @Test
+  public void testTuple4() {
+    Tuple4<String, Integer, Double, Boolean> t =
+      new Tuple4<String, Integer, Double, Boolean>(first, second, third, fourth);
+    assertEquals(4, t.size());
+    assertEquals(first, t.first());
+    assertEquals(second, t.second());
+    assertEquals(third, t.third());
+    assertEquals(fourth, t.fourth());
+    assertEquals(first, t.get(0));
+    assertEquals(second, t.get(1));
+    assertEquals(third, t.get(2));
+    assertEquals(fourth, t.get(3));
+    try {
+      t.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testTuple4Equality() {
+    Tuple4<String, Integer, Double, Boolean> t =
+      new Tuple4<String, Integer, Double, Boolean>(first, second, third, fourth);
+    // A Tuple4 is never equal to a Tuple3, even with matching leading elements.
+    assertFalse(t.equals(new Tuple3<String, Integer, Double>(first, second, third)));
+    assertFalse(t.equals(new Tuple4<String, Integer, Double, Boolean>(first, null, third, null)));
+    assertFalse((new Tuple4<String, Integer, Double, Boolean>(null, null, null, null)).equals(t));
+    assertTrue((new Tuple4<String, Integer, Double, Boolean>(first, null, third, null)).equals(
+        new Tuple4<String, Integer, Double, Boolean>(first, null, third, null)));
+  }
+
+  @Test
+  public void testTupleN() {
+    TupleN t = new TupleN(first, second, third, fourth, fifth);
+    assertEquals(5, t.size());
+    assertEquals(first, t.get(0));
+    assertEquals(second, t.get(1));
+    assertEquals(third, t.get(2));
+    assertEquals(fourth, t.get(3));
+    assertEquals(fifth, t.get(4));
+    try {
+      t.get(-1);
+      fail();
+    } catch (IndexOutOfBoundsException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testTupleNEquality() {
+    TupleN t = new TupleN(first, second, third, fourth, fifth);
+    assertTrue(t.equals(new TupleN(first, second, third, fourth, fifth)));
+    // Different arity (4 vs. 5 elements) must not compare equal.
+    assertFalse(t.equals(new TupleN(first, null, third, null)));
+    assertFalse((new TupleN(null, null, null, null, null)).equals(t));
+    assertTrue((new TupleN(first, second, third, null, null)).equals(
+        new TupleN(first, second, third, null, null)));
+  }
+
+  @Test
+  public void testTupleFactory() {
+    checkTuple(TupleFactory.PAIR.makeTuple("a", "b"), Pair.class, "a", "b");
+    checkTuple(TupleFactory.TUPLE3.makeTuple("a", "b", "c"), Tuple3.class, "a", "b", "c");
+    checkTuple(TupleFactory.TUPLE4.makeTuple("a", "b", "c", "d"), Tuple4.class, "a", "b", "c", "d");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d", "e"), TupleN.class, "a", "b", "c", "d", "e");
+
+    // TUPLEN accepts any arity, not just five.
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b"), TupleN.class, "a", "b");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c"), TupleN.class, "a", "b", "c");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d"), TupleN.class, "a", "b", "c", "d");
+    checkTuple(TupleFactory.TUPLEN.makeTuple("a", "b", "c", "d", "e"), TupleN.class, "a", "b", "c", "d", "e");
+  }
+
+  /** Asserts the tuple's concrete class, size, and every element in order. */
+  private void checkTuple(Tuple t, Class<? extends Tuple> type, Object... values) {
+    assertEquals(type, t.getClass());
+    assertEquals(values.length, t.size());
+    for (int i = 0; i < values.length; i++)
+      assertEquals(values[i], t.get(i));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/test/java/org/apache/crunch/WordCountHBaseTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/WordCountHBaseTest.java b/crunch/src/test/java/org/apache/crunch/WordCountHBaseTest.java
new file mode 100644
index 0000000..10459ce
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/WordCountHBaseTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.hbase.HBaseSourceTarget;
+import org.apache.crunch.io.hbase.HBaseTarget;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.crunch.util.DistCache;
+import com.google.common.io.ByteStreams;
+
+/**
+ * End-to-end word count over HBase: reads words from an input table via
+ * HBaseSourceTarget, aggregates counts, and writes Puts to an output table
+ * via HBaseTarget, all against an in-process mini cluster.
+ */
+public class WordCountHBaseTest {
+  protected static final Log LOG = LogFactory.getLog(WordCountHBaseTest.class);
+
+  // Both families are named "cf"; input and output live in separate tables,
+  // so the shared name does not collide.
+  private static final byte[] COUNTS_COLFAM = Bytes.toBytes("cf");
+  private static final byte[] WORD_COLFAM = Bytes.toBytes("cf");
+
+  private HBaseTestingUtility hbaseTestUtil = new HBaseTestingUtility();
+
+  /**
+   * Extracts each row's word, counts occurrences, and converts every
+   * (word, count) pair into a Put keyed by the word with the count stored
+   * in the counts column family.
+   */
+  @SuppressWarnings("serial")
+  public static PCollection<Put> wordCount(PTable<ImmutableBytesWritable, Result> words) {
+    PTable<String, Long> counts = Aggregate.count(words.parallelDo(
+        new DoFn<Pair<ImmutableBytesWritable, Result>, String>() {
+          @Override
+          public void process(Pair<ImmutableBytesWritable, Result> row, Emitter<String> emitter) {
+            byte[] word = row.second().getValue(WORD_COLFAM, null);
+            if (word != null) {
+              emitter.emit(Bytes.toString(word));
+            }
+          }
+        }, words.getTypeFamily().strings()));
+
+    return counts.parallelDo("convert to put",
+        new DoFn<Pair<String, Long>, Put>() {
+          @Override
+          public void process(Pair<String, Long> input, Emitter<Put> emitter) {
+            Put put = new Put(Bytes.toBytes(input.first()));
+            put.add(COUNTS_COLFAM, null,
+                Bytes.toBytes(input.second()));
+            emitter.emit(put);
+          }
+
+        }, Writables.writables(Put.class));
+  }
+
+  /**
+   * Boots mini ZK, HBase, and MapReduce clusters. On Hadoop 2 (detected via
+   * TaskAttemptContext being an interface) it also ships the dependency jars
+   * and a jar of this test's classes through the DistributedCache so the
+   * remote tasks can load them.
+   */
+  @SuppressWarnings("deprecation")
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = hbaseTestUtil.getConfiguration();
+    File tmpDir = File.createTempFile("logdir", "");
+    tmpDir.delete();
+    tmpDir.mkdir();
+    tmpDir.deleteOnExit();
+    conf.set("hadoop.log.dir", tmpDir.getAbsolutePath());
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    // -1 disables the HTTP info servers to avoid port clashes between tests.
+    conf.setInt("hbase.master.info.port", -1);
+    conf.setInt("hbase.regionserver.info.port", -1);
+
+    hbaseTestUtil.startMiniZKCluster();
+    hbaseTestUtil.startMiniCluster();
+    hbaseTestUtil.startMiniMapReduceCluster(1);
+    
+    // For Hadoop-2.0.0, we have to do a bit more work.
+    if (TaskAttemptContext.class.isInterface()) {
+      conf = hbaseTestUtil.getConfiguration();
+      FileSystem fs = FileSystem.get(conf);
+      Path tmpPath = new Path("target", "WordCountHBaseTest-tmpDir");
+      FileSystem localFS = FileSystem.getLocal(conf);
+      for (FileStatus jarFile : localFS.listStatus(new Path("target/lib/"))) {
+        Path target = new Path(tmpPath, jarFile.getPath().getName());
+        fs.copyFromLocalFile(jarFile.getPath(), target);
+        DistributedCache.addFileToClassPath(target, conf, fs);
+      }
+    
+      // Create a programmatic container for this jar.
+      JarOutputStream jos = new JarOutputStream(new FileOutputStream("WordCountHBaseTest.jar"));
+      File baseDir = new File("target/test-classes");
+      String prefix = "org/apache/crunch/";
+      jarUp(jos, baseDir, prefix + "WordCountHBaseTest.class");
+      jarUp(jos, baseDir, prefix + "WordCountHBaseTest$1.class");
+      jarUp(jos, baseDir, prefix + "WordCountHBaseTest$2.class");
+      jos.close();
+
+      Path target = new Path(tmpPath, "WordCountHBaseTest.jar");
+      fs.copyFromLocalFile(true, new Path("WordCountHBaseTest.jar"), target);
+      DistributedCache.addFileToClassPath(target, conf, fs);
+    }
+  }
+
+  /** Adds one class file under baseDir to the jar, preserving its timestamp. */
+  private void jarUp(JarOutputStream jos, File baseDir, String classDir) throws IOException {
+    File file = new File(baseDir, classDir);
+    JarEntry e = new JarEntry(classDir);
+    e.setTime(file.lastModified());
+    jos.putNextEntry(e);
+    // ByteStreams.copy does not close its arguments; close the input explicitly.
+    FileInputStream in = new FileInputStream(file);
+    try {
+      ByteStreams.copy(in, jos);
+    } finally {
+      in.close();
+    }
+    jos.closeEntry();
+  }
+
+  @Test
+  public void testWordCount() throws IOException {
+    run(new MRPipeline(WordCountHBaseTest.class, hbaseTestUtil.getConfiguration()));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hbaseTestUtil.shutdownMiniMapReduceCluster();
+    hbaseTestUtil.shutdownMiniCluster();
+    hbaseTestUtil.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Seeds the input table with "cat", "cat", "dog", runs the pipeline, and
+   * verifies the counts written to the output table. Random table-name
+   * suffixes keep repeated runs from colliding.
+   */
+  public void run(Pipeline pipeline) throws IOException {
+    
+    Random rand = new Random();
+    int postFix = Math.abs(rand.nextInt());
+    String inputTableName = "crunch_words_" + postFix;
+    String outputTableName = "crunch_counts_" + postFix;
+
+    try {
+      
+      HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName),
+          WORD_COLFAM);
+      HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName),
+          COUNTS_COLFAM);
+  
+      int key = 0;
+      key = put(inputTable, key, "cat");
+      key = put(inputTable, key, "cat");
+      key = put(inputTable, key, "dog");
+      Scan scan = new Scan();
+      scan.addColumn(WORD_COLFAM, null);
+      HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
+      PTable<ImmutableBytesWritable, Result> shakespeare = pipeline.read(source);
+      pipeline.write(wordCount(shakespeare), new HBaseTarget(outputTableName));
+      pipeline.done();
+      
+      assertIsLong(outputTable, "cat", 2);
+      assertIsLong(outputTable, "dog", 1);    
+    } finally {
+      // NOTE(review): tables are intentionally left behind; the mini cluster
+      // is torn down in tearDown(), which discards them anyway.
+    }
+  }
+
+  /** Writes value into the word column at the given integer row key; returns the next key. */
+  protected int put(HTable table, int key, String value) throws IOException {
+    Put put = new Put(Bytes.toBytes(key));
+    put.add(WORD_COLFAM, null, Bytes.toBytes(value));    
+    table.put(put);
+    return key + 1;
+  }
+
+  /** Asserts the counts column for the given row key holds exactly the long value i. */
+  protected void assertIsLong(HTable table, String key, long i) throws IOException {
+    Get get = new Get(Bytes.toBytes(key));
+    get.addColumn(COUNTS_COLFAM, null);
+    Result result = table.get(get);
+    
+    byte[] rawCount = result.getValue(COUNTS_COLFAM, null);
+    assertTrue(rawCount != null);
+    assertEquals(i, Bytes.toLong(rawCount));
+  }
+}


Mime
View raw message