crunch-commits mailing list archives

From jwi...@apache.org
Subject [41/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues
Date Tue, 23 Apr 2013 20:41:43 GMT
http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/fn/AggregatorsIT.java b/crunch-core/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
new file mode 100644
index 0000000..c9584a1
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.apache.crunch.fn.Aggregators.SUM_INTS;
+import static org.apache.crunch.fn.Aggregators.pairAggregator;
+import static org.apache.crunch.types.writable.Writables.ints;
+import static org.apache.crunch.types.writable.Writables.pairs;
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.test.Tests;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
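+/**
+ * Integration tests for {@link Aggregators}, parameterized over the pipelines
+ * provided by {@link Tests#pipelinesParams}.
+ */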
+@RunWith(Parameterized.class)
+public class AggregatorsIT {
+  private Pipeline pipeline;
+
+  @Parameters
+  public static Collection<Object[]> params() {
+    return Tests.pipelinesParams(AggregatorsIT.class);
+  }
+
+  public AggregatorsIT(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  @Test
+  public void testPairAggregator() {
+    PCollection<String> lines = pipeline.readTextFile(Tests.pathTo(this, "ints.txt"));
+
+    PTable<String, Pair<Integer, Integer>> table = lines.parallelDo(new SplitLine(),
+        tableOf(strings(), pairs(ints(), ints())));
+
+    PTable<String, Pair<Integer, Integer>> combinedTable = table.groupByKey().combineValues(
+        pairAggregator(SUM_INTS(), SUM_INTS()));
+
+    Map<String, Pair<Integer, Integer>> result = combinedTable.asMap().getValue();
+
+    assertThat(result.size(), is(2));
+    assertThat(result.get("a"), is(Pair.of(9,  12)));
+    assertThat(result.get("b"), is(Pair.of(11,  13)));
+  }
+
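+  /** Splits a tab-separated line of key, int, int into a (key, (int, int)) pair. */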
+  private static final class SplitLine extends MapFn<String, Pair<String, Pair<Integer, Integer>>> {
+    @Override
+    public Pair<String, Pair<Integer, Integer>> map(String input) {
+      String[] split = input.split("\t");
+      return Pair.of(split[0],
+          Pair.of(Integer.parseInt(split[1]), Integer.parseInt(split[2])));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
new file mode 100644
index 0000000..976a43e
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mem;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+public class MemPipelineFileWritingIT {
+  @Rule
+  public TemporaryPath baseTmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testMemPipelineFileWriter() throws Exception {
+    File tmpDir = baseTmpDir.getFile("mempipe");
+    Pipeline p = MemPipeline.getInstance();
+    PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
+    p.writeTextFile(lines, tmpDir.toString());
+    p.done();
+    assertTrue(tmpDir.exists());
+    File[] files = tmpDir.listFiles();
+    assertTrue(files != null && files.length > 0);
+    for (File f : files) {
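+      // Skip hidden files (names starting with '.'); only the visible output files hold the written lines.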
+      if (!f.getName().startsWith(".")) {
+        List<String> txt = Files.readLines(f, Charsets.UTF_8);
+        assertEquals(ImmutableList.of("hello", "world"), txt);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
new file mode 100644
index 0000000..f9f73b2
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.collect;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTableKeyValueIT;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(value = Parameterized.class)
+public class UnionCollectionIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  private static final Log LOG = LogFactory.getLog(UnionCollectionIT.class);
+
+  private PTypeFamily typeFamily;
+  private Pipeline pipeline;
+  private PCollection<String> union;
+
+  private ArrayList<String> EXPECTED = Lists.newArrayList("a", "a", "b", "c", "c", "d", "e");
+
+  private Class pipelineClass;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws IOException {
+    String inputFile1 = tmpDir.copyResourceFileName("set1.txt");
+    String inputFile2 = tmpDir.copyResourceFileName("set2.txt");
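+    // A null pipeline class selects the in-memory pipeline; otherwise an MRPipeline is used.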
+    if (pipelineClass == null) {
+      pipeline = MemPipeline.getInstance();
+    } else {
+      pipeline = new MRPipeline(pipelineClass, tmpDir.getDefaultConfiguration());
+    }
+    PCollection<String> firstCollection = pipeline.read(At.textFile(inputFile1, typeFamily.strings()));
+    PCollection<String> secondCollection = pipeline.read(At.textFile(inputFile2, typeFamily.strings()));
+
+    LOG.info("Test fixture: [" + pipeline.getClass().getSimpleName() + " : " + typeFamily.getClass().getSimpleName()
+        + "]  First: " + Lists.newArrayList(firstCollection.materialize().iterator()) + ", Second: "
+        + Lists.newArrayList(secondCollection.materialize().iterator()));
+
+    union = secondCollection.union(firstCollection);
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() throws IOException {
+    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance(), PTableKeyValueIT.class },
+        { WritableTypeFamily.getInstance(), null }, { AvroTypeFamily.getInstance(), PTableKeyValueIT.class },
+        { AvroTypeFamily.getInstance(), null } };
+    return Arrays.asList(data);
+  }
+
+  public UnionCollectionIT(PTypeFamily typeFamily, Class pipelineClass) {
+    this.typeFamily = typeFamily;
+    this.pipelineClass = pipelineClass;
+  }
+
+  @Test
+  public void unionMaterializeShouldNotThrowNPE() throws Exception {
+    checkMaterialized(union.materialize());
+    checkMaterialized(pipeline.materialize(union));
+  }
+
+  private void checkMaterialized(Iterable<String> materialized) {
+    List<String> materializedValues = Lists.newArrayList(materialized.iterator());
+    Collections.sort(materializedValues);
+    LOG.info("Materialized union: " + materializedValues);
+    assertEquals(EXPECTED, materializedValues);
+  }
+
+  @Test
+  public void unionWriteShouldNotThrowNPE() throws IOException {
+    String outputPath1 = tmpDir.getFileName("output1");
+    String outputPath2 = tmpDir.getFileName("output2");
+    String outputPath3 = tmpDir.getFileName("output3");
+
+    if (typeFamily == AvroTypeFamily.getInstance()) {
+      union.write(To.avroFile(outputPath1));
+      pipeline.write(union, To.avroFile(outputPath2));
+
+      pipeline.run();
+
+      checkFileContents(outputPath1);
+      checkFileContents(outputPath2);
+
+    } else {
+
+      union.write(To.textFile(outputPath1));
+      pipeline.write(union, To.textFile(outputPath2));
+      pipeline.writeTextFile(union, outputPath3);
+
+      pipeline.run();
+
+      checkFileContents(outputPath1);
+      checkFileContents(outputPath2);
+      checkFileContents(outputPath3);
+    }
+  }
+
+  private void checkFileContents(String filePath) throws IOException {
+
+    // Avro data written by an MRPipeline is read back as Avro; all other output is read as text.
+    List<String> fileContentValues = (typeFamily == AvroTypeFamily.getInstance() && pipeline instanceof MRPipeline)
+        ? Lists.newArrayList(pipeline.read(At.avroFile(filePath, Avros.strings())).materialize().iterator())
+        : Lists.newArrayList(pipeline.read(At.textFile(filePath, typeFamily.strings())).materialize().iterator());
+
+    Collections.sort(fileContentValues);
+
+    LOG.info("Saved Union: " + fileContentValues);
+    assertEquals(EXPECTED, fileContentValues);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
new file mode 100644
index 0000000..08d226d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/CompositePathIterableIT.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.crunch.io.text.TextFileReaderFactory;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CompositePathIterableIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testCreate_FilePresent() throws IOException {
+    String inputFilePath = tmpDir.copyResourceFileName("set1.txt");
+    Configuration conf = new Configuration();
+    LocalFileSystem local = FileSystem.getLocal(conf);
+
+    Iterable<String> iterable = CompositePathIterable.create(local, new Path(inputFilePath),
+        new TextFileReaderFactory<String>(Writables.strings()));
+
+    assertEquals(Lists.newArrayList("b", "c", "a", "e"), Lists.newArrayList(iterable));
+
+  }
+
+  @Test
+  public void testCreate_DirectoryPresentButNoFiles() throws IOException {
+    Path emptyInputDir = tmpDir.getRootPath();
+
+    Configuration conf = new Configuration();
+    LocalFileSystem local = FileSystem.getLocal(conf);
+
+    Iterable<String> iterable = CompositePathIterable.create(local, emptyInputDir,
+        new TextFileReaderFactory<String>(Writables.strings()));
+
+    assertTrue(Lists.newArrayList(iterable).isEmpty());
+  }
+
+  @Test(expected = IOException.class)
+  public void testCreate_DirectoryNotPresent() throws IOException {
+    File nonExistentDir = tmpDir.getFile("not-there");
+
+    // Sanity check
+    assertFalse(nonExistentDir.exists());
+
+    Configuration conf = new Configuration();
+    LocalFileSystem local = FileSystem.getLocal(conf);
+
+    CompositePathIterable.create(local, new Path(nonExistentDir.getAbsolutePath()), new TextFileReaderFactory<String>(
+        Writables.strings()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java b/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java
new file mode 100644
index 0000000..52b8ff5
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/NLineInputIT.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.NLineFileSource;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class NLineInputIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  @Test
+  public void testNLine() throws Exception {
+    String urlsInputPath = tmpDir.copyResourceFileName("urls.txt");
+    Configuration conf = new Configuration(tmpDir.getDefaultConfiguration());
+    conf.setInt("io.sort.mb", 10);
+    Pipeline pipeline = new MRPipeline(NLineInputIT.class, conf);
+    PCollection<String> urls = pipeline.read(new NLineFileSource<String>(urlsInputPath,
+        Writables.strings(), 2));
+    assertEquals(new Integer(2),
+        urls.parallelDo(new LineCountFn(), Avros.ints()).max().getValue());
+  }
+  
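+  /**
+   * Counts the lines processed by a single map task and emits the count in
+   * cleanup, so the max over all tasks reflects the lines-per-split configured
+   * on the NLineFileSource.
+   */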
+  private static class LineCountFn extends DoFn<String, Integer> {
+
+    private int lineCount = 0;
+    
+    @Override
+    public void initialize() {
+      this.lineCount = 0;
+    }
+    
+    @Override
+    public void process(String input, Emitter<Integer> emitter) {
+      lineCount++;
+    }
+    
+    @Override
+    public void cleanup(Emitter<Integer> emitter) {
+      emitter.emit(lineCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/io/TextFileTableIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/TextFileTableIT.java b/crunch-core/src/it/java/org/apache/crunch/io/TextFileTableIT.java
new file mode 100644
index 0000000..bddc0b5
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/TextFileTableIT.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import static org.apache.crunch.types.writable.Writables.*;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Set;
+
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.text.TextFileTableSource;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Integration test for reading a delimited text file as a {@code PTable} via
+ * {@link TextFileTableSource}.
+ */
+public class TextFileTableIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  @Test
+  public void testTextFileTable() throws Exception {
+    String urlsFile = tmpDir.copyResourceFileName("urls.txt");
+    Pipeline pipeline = new MRPipeline(TextFileTableIT.class, tmpDir.getDefaultConfiguration());
+    PTable<String, String> urls = pipeline.read(
+        new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings())));
+    Set<Pair<String, Long>> cnts = ImmutableSet.copyOf(urls.keys().count().materialize());
+    assertEquals(ImmutableSet.of(Pair.of("www.A.com", 4L), Pair.of("www.B.com", 2L),
+        Pair.of("www.C.com", 1L), Pair.of("www.D.com", 1L), Pair.of("www.E.com", 1L),
+        Pair.of("www.F.com", 2L)), cnts);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
new file mode 100644
index 0000000..671b920
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroFileSourceTargetIT.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("serial")
+public class AvroFileSourceTargetIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro");
+  }
+
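+  /** Writes the given records to the temporary Avro file that the tests read back through the pipeline. */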
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+
+  }
+
+  @Test
+  public void testSpecific() throws IOException {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+
+    List<Person> personList = Lists.newArrayList(genericCollection.materialize());
+
+    Person expectedPerson = new Person();
+    expectedPerson.name = "John Doe";
+    expectedPerson.age = 42;
+
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    siblingNames.add("Jimmy");
+    siblingNames.add("Jane");
+    expectedPerson.siblingnames = siblingNames;
+
+    assertEquals(Lists.newArrayList(expectedPerson), Lists.newArrayList(personList));
+  }
+
+  @Test
+  public void testGeneric() throws IOException {
+    String genericSchemaJson = Person.SCHEMA$.toString().replace("Person", "GenericPerson");
+    Schema genericPersonSchema = new Schema.Parser().parse(genericSchemaJson);
+    GenericRecord savedRecord = new GenericData.Record(genericPersonSchema);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), genericPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Record> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.generics(genericPersonSchema)));
+
+    List<Record> recordList = Lists.newArrayList(genericCollection.materialize());
+
+    assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
+  }
+
+  @Test
+  public void testReflect() throws IOException {
+    Schema pojoPersonSchema = ReflectData.get().getSchema(StringWrapper.class);
+    GenericRecord savedRecord = new GenericData.Record(pojoPersonSchema);
+    savedRecord.put("value", "stringvalue");
+    populateGenericFile(Lists.newArrayList(savedRecord), pojoPersonSchema);
+
+    Pipeline pipeline = new MRPipeline(AvroFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<StringWrapper> stringValueCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.reflects(StringWrapper.class)));
+
+    List<StringWrapper> recordList = Lists.newArrayList(stringValueCollection.materialize());
+
+    assertEquals(1, recordList.size());
+    StringWrapper stringWrapper = recordList.get(0);
+    assertEquals("stringvalue", stringWrapper.getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
new file mode 100644
index 0000000..29bf4f5
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPipelineIT.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class AvroPipelineIT implements Serializable {
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = tmpDir.getFile("test.avro");
+  }
+
+  private void populateGenericFile(List<GenericRecord> genericRecords, Schema schema) throws IOException {
+    FileOutputStream outputStream = new FileOutputStream(this.avroFile);
+    GenericDatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(schema);
+
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
+    dataFileWriter.create(schema, outputStream);
+
+    for (GenericRecord record : genericRecords) {
+      dataFileWriter.append(record);
+    }
+
+    dataFileWriter.close();
+    outputStream.close();
+
+  }
+
+  @Test
+  public void toTextShouldWriteAvroDataAsDatumText() throws Exception {
+    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(At.avroFile(avroFile.getAbsolutePath(),
+        Avros.records(Person.class)));
+    File outputFile = tmpDir.getFile("output");
+    Target textFile = To.textFile(outputFile.getAbsolutePath());
+    pipeline.write(genericCollection, textFile);
+    pipeline.run();
+    Person person = genericCollection.materialize().iterator().next();
+    String outputString = FileUtils.readFileToString(new File(outputFile, "part-m-00000"));
+    assertTrue(outputString.contains(person.toString()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
new file mode 100644
index 0000000..7a90517
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class AvroReflectIT implements Serializable {
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testReflection() throws IOException {
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<StringWrapper> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+        .parallelDo(new MapFn<String, StringWrapper>() {
+
+          @Override
+          public StringWrapper map(String input) {
+            StringWrapper stringWrapper = new StringWrapper();
+            stringWrapper.setValue(input);
+            return stringWrapper;
+          }
+        }, Avros.reflects(StringWrapper.class));
+
+    List<StringWrapper> stringWrappers = Lists.newArrayList(stringWrapperCollection.materialize());
+
+    pipeline.done();
+
+    assertEquals(Lists.newArrayList(new StringWrapper("b"), new StringWrapper("c"), new StringWrapper("a"),
+        new StringWrapper("e")), stringWrappers);
+
+  }
+
+  // Verify that running with a combination of reflect and specific schema
+  // doesn't crash
+  @Test
+  public void testCombinationOfReflectionAndSpecific() throws IOException {
+    Assume.assumeTrue(Avros.CAN_COMBINE_SPECIFIC_AND_REFLECT_SCHEMAS);
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Pair<StringWrapper, Person>> hybridPairCollection = pipeline.readTextFile(
+        tmpDir.copyResourceFileName("set1.txt")).parallelDo(new MapFn<String, Pair<StringWrapper, Person>>() {
+
+      @Override
+      public Pair<StringWrapper, Person> map(String input) {
+        Person person = new Person();
+        person.name = input;
+        person.age = 42;
+        person.siblingnames = Lists.<CharSequence> newArrayList(input);
+
+        return Pair.of(new StringWrapper(input), person);
+      }
+    }, Avros.pairs(Avros.reflects(StringWrapper.class), Avros.records(Person.class)));
+
+    PCollection<Pair<String, Long>> countCollection = Aggregate.count(hybridPairCollection).parallelDo(
+        new MapFn<Pair<Pair<StringWrapper, Person>, Long>, Pair<String, Long>>() {
+
+          @Override
+          public Pair<String, Long> map(Pair<Pair<StringWrapper, Person>, Long> input) {
+            return Pair.of(input.first().first().getValue(), input.second());
+          }
+        }, Avros.pairs(Avros.strings(), Avros.longs()));
+
+    List<Pair<String, Long>> materialized = Lists.newArrayList(countCollection.materialize());
+    List<Pair<String, Long>> expected = Lists.newArrayList(Pair.of("a", 1L), Pair.of("b", 1L), Pair.of("c", 1L),
+        Pair.of("e", 1L));
+    Collections.sort(materialized);
+
+    assertEquals(expected, materialized);
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
new file mode 100644
index 0000000..cbb7fde
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroWritableIT.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import static org.apache.crunch.types.avro.Avros.ints;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+import static org.apache.crunch.types.avro.Avros.writables;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.hadoop.io.DoubleWritable;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Verify handling of both a ByteBuffer and byte array as input from an Avro job (depending
+ * on the version of Avro being used).
+ */
+public class AvroWritableIT implements Serializable {
+
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  @Test
+  public void testAvroBasedWritablePipeline() throws Exception {
+    String customersInputPath = tmpDir.copyResourceFileName("customers.txt");
+    Pipeline pipeline = new MRPipeline(AvroWritableIT.class, tmpDir.getDefaultConfiguration());
+    pipeline.enableDebug();
+    PCollection<String> customerLines = pipeline.readTextFile(customersInputPath);
+    Map<Integer, DoubleWritable> outputMap = customerLines.parallelDo(
+        new MapFn<String, Pair<Integer, DoubleWritable>>() {
+          @Override
+          public Pair<Integer, DoubleWritable> map(String input) {
+            int len = input.length();
+            return Pair.of(len, new DoubleWritable(len));
+          }
+        }, tableOf(ints(), writables(DoubleWritable.class)))
+    .groupByKey()
+    .combineValues(new CombineFn<Integer, DoubleWritable>() {
+      @Override
+      public void process(Pair<Integer, Iterable<DoubleWritable>> input,
+          Emitter<Pair<Integer, DoubleWritable>> emitter) {
+        double sum = 0.0;
+        for (DoubleWritable dw : input.second()) {
+          sum += dw.get();
+        }
+        emitter.emit(Pair.of(input.first(), new DoubleWritable(sum)));
+      }
+    })
+    .materializeToMap();
+    
+    Map<Integer, DoubleWritable> expectedMap = Maps.newHashMap();
+    expectedMap.put(17, new DoubleWritable(17.0));
+    expectedMap.put(16, new DoubleWritable(16.0));
+    expectedMap.put(12, new DoubleWritable(24.0));
+   
+    assertEquals(expectedMap, outputMap);
+    
+    pipeline.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
new file mode 100644
index 0000000..56ee3ac
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.Text;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class AggregateIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testWritables() throws Exception {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
+    runMinMax(shakes, WritableTypeFamily.getInstance());
+    pipeline.done();
+  }
+
+  @Test
+  public void testAvro() throws Exception {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+    PCollection<String> shakes = pipeline.readTextFile(shakesInputPath);
+    runMinMax(shakes, AvroTypeFamily.getInstance());
+    pipeline.done();
+  }
+
+  @Test
+  public void testInMemoryAvro() throws Exception {
+    PCollection<String> someText = MemPipeline.collectionOf("first line", "second line", "third line");
+    runMinMax(someText, AvroTypeFamily.getInstance());
+  }
+
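+  // The max of the lengths should equal the negation of the min of the negated lengths.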
+  public static void runMinMax(PCollection<String> shakes, PTypeFamily family) throws Exception {
+    PCollection<Integer> lengths = shakes.parallelDo(new MapFn<String, Integer>() {
+      @Override
+      public Integer map(String input) {
+        return input.length();
+      }
+    }, family.ints());
+    PCollection<Integer> negLengths = lengths.parallelDo(new MapFn<Integer, Integer>() {
+      @Override
+      public Integer map(Integer input) {
+        return -input;
+      }
+    }, family.ints());
+    Integer maxLengths = Aggregate.max(lengths).getValue();
+    Integer minLengths = Aggregate.min(negLengths).getValue();
+    assertTrue(maxLengths != null);
+    assertTrue(minLengths != null);
+    assertEquals(maxLengths.intValue(), -minLengths.intValue());
+  }
+
+  private static class SplitFn extends MapFn<String, Pair<String, String>> {
+    @Override
+    public Pair<String, String> map(String input) {
+      String[] p = input.split("\\s+");
+      return Pair.of(p[0], p[1]);
+    }
+  }
+
+  @Test
+  public void testCollectUrls() throws Exception {
+    Pipeline p = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+    String urlsInputPath = tmpDir.copyResourceFileName("urls.txt");
+    PTable<String, Collection<String>> urls = Aggregate.collectValues(p.readTextFile(urlsInputPath).parallelDo(
+        new SplitFn(), tableOf(strings(), strings())));
+    for (Pair<String, Collection<String>> e : urls.materialize()) {
+      String key = e.first();
+      int expectedSize = 0;
+      if ("www.A.com".equals(key)) {
+        expectedSize = 4;
+      } else if ("www.B.com".equals(key) || "www.F.com".equals(key)) {
+        expectedSize = 2;
+      } else if ("www.C.com".equals(key) || "www.D.com".equals(key) || "www.E.com".equals(key)) {
+        expectedSize = 1;
+      }
+      assertEquals("Checking key = " + key, expectedSize, e.second().size());
+      p.done();
+    }
+  }
+
+  @Test
+  public void testTopN() throws Exception {
+    PTableType<String, Integer> ptype = Avros.tableOf(Avros.strings(), Avros.ints());
+    PTable<String, Integer> counts = MemPipeline.typedTableOf(ptype, "foo", 12, "bar", 17, "baz", 29);
+
+    PTable<String, Integer> top2 = Aggregate.top(counts, 2, true);
+    assertEquals(ImmutableList.of(Pair.of("baz", 29), Pair.of("bar", 17)), top2.materialize());
+
+    PTable<String, Integer> bottom2 = Aggregate.top(counts, 2, false);
+    assertEquals(ImmutableList.of(Pair.of("foo", 12), Pair.of("bar", 17)), bottom2.materialize());
+  }
+
+  @Test
+  public void testCollectValues_Writables() throws IOException {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+    Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
+        .parallelDo(new MapStringToTextPair(), Writables.tableOf(Writables.ints(), Writables.writables(Text.class)))
+        .collectValues().materializeToMap();
+
+    assertEquals(1, collectionMap.size());
+
+    assertTrue(collectionMap.get(1).containsAll(Lists.newArrayList(new Text("c"), new Text("d"), new Text("a"))));
+  }
+
+  @Test
+  public void testCollectValues_Avro() throws IOException {
+
+    MapStringToEmployeePair mapFn = new MapStringToEmployeePair();
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+    Map<Integer, Collection<Employee>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
+        .parallelDo(mapFn, Avros.tableOf(Avros.ints(), Avros.records(Employee.class))).collectValues()
+        .materializeToMap();
+
+    assertEquals(1, collectionMap.size());
+
+    Employee empC = mapFn.map("c").second();
+    Employee empD = mapFn.map("d").second();
+    Employee empA = mapFn.map("a").second();
+
+    assertTrue(collectionMap.get(1).containsAll(Lists.newArrayList(empC, empD, empA)));
+  }
+
+  private static class MapStringToTextPair extends MapFn<String, Pair<Integer, Text>> {
+    @Override
+    public Pair<Integer, Text> map(String input) {
+      return Pair.of(1, new Text(input));
+    }
+  }
+
+  private static class MapStringToEmployeePair extends MapFn<String, Pair<Integer, Employee>> {
+    @Override
+    public Pair<Integer, Employee> map(String input) {
+      Employee emp = new Employee();
+      emp.name = input;
+      emp.salary = 0;
+      emp.department = "";
+      return Pair.of(1, emp);
+    }
+  }
+
+  public static class PojoText {
+    private String value;
+
+    public PojoText() {
+      this("");
+    }
+
+    public PojoText(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public void setValue(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("PojoText<%s>", this.value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      PojoText other = (PojoText) obj;
+      if (value == null) {
+        if (other.value != null)
+          return false;
+      } else if (!value.equals(other.value))
+        return false;
+      return true;
+    }
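+
+    // equals() is overridden above, so provide a matching hashCode() to keep the contract.
+    @Override
+    public int hashCode() {
+      return value == null ? 0 : value.hashCode();
+    }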
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
new file mode 100644
index 0000000..a832a5d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/AvroTypeSortIT.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.crunch.types.avro.Avros.ints;
+import static org.apache.crunch.types.avro.Avros.records;
+import static org.apache.crunch.types.avro.Avros.strings;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Test sorting Avro types by selected inner field
+ */
+public class AvroTypeSortIT implements Serializable {
+
+  private static final long serialVersionUID = 1344118240353796561L;
+
+  private transient File avroFile;
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Before
+  public void setUp() throws IOException {
+    avroFile = File.createTempFile("avrotest", ".avro");
+  }
+
+  @After
+  public void tearDown() {
+    avroFile.delete();
+  }
+
+  @Test
+  public void testSortAvroTypesBySelectedFields() throws Exception {
+
+    MRPipeline pipeline = new MRPipeline(AvroTypeSortIT.class, tmpDir.getDefaultConfiguration());
+
+    Person ccc10 = createPerson("CCC", 10);
+    Person bbb20 = createPerson("BBB", 20);
+    Person aaa30 = createPerson("AAA", 30);
+
+    writeAvroFile(Lists.newArrayList(ccc10, bbb20, aaa30), avroFile);
+
+    PCollection<Person> unsorted = pipeline.read(At.avroFile(avroFile.getAbsolutePath(), records(Person.class)));
+
+    // Sort by Name
+    MapFn<Person, String> nameExtractor = new MapFn<Person, String>() {
+
+      @Override
+      public String map(Person input) {
+        return input.name.toString();
+      }
+    };
+
+    PCollection<Person> sortedByName = unsorted.by(nameExtractor, strings()).groupByKey().ungroup().values();
+
+    List<Person> sortedByNameList = Lists.newArrayList(sortedByName.materialize());
+
+    assertEquals(3, sortedByNameList.size());
+    assertEquals(aaa30, sortedByNameList.get(0));
+    assertEquals(bbb20, sortedByNameList.get(1));
+    assertEquals(ccc10, sortedByNameList.get(2));
+
+    // Sort by Age
+
+    MapFn<Person, Integer> ageExtractor = new MapFn<Person, Integer>() {
+
+      @Override
+      public Integer map(Person input) {
+        return input.age;
+      }
+    };
+
+    PCollection<Person> sortedByAge = unsorted.by(ageExtractor, ints()).groupByKey().ungroup().values();
+
+    List<Person> sortedByAgeList = Lists.newArrayList(sortedByAge.materialize());
+
+    assertEquals(3, sortedByAgeList.size());
+    assertEquals(ccc10, sortedByAgeList.get(0));
+    assertEquals(bbb20, sortedByAgeList.get(1));
+    assertEquals(aaa30, sortedByAgeList.get(2));
+
+    pipeline.done();
+  }
+
+  private void writeAvroFile(List<Person> people, File avroFile) throws IOException {
+
+    FileOutputStream outputStream = new FileOutputStream(avroFile);
+    SpecificDatumWriter<Person> writer = new SpecificDatumWriter<Person>(Person.class);
+
+    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(writer);
+    dataFileWriter.create(Person.SCHEMA$, outputStream);
+    for (Person person : people) {
+      dataFileWriter.append(person);
+    }
+    dataFileWriter.close();
+    outputStream.close();
+  }
+
+  private Person createPerson(String name, int age) throws IOException {
+
+    Person person = new Person();
+    person.age = age;
+    person.name = name;
+    List<CharSequence> siblingNames = Lists.newArrayList();
+    person.siblingnames = siblingNames;
+
+    return person;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
new file mode 100644
index 0000000..4b28da7
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.test.Tests;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+
+public class CogroupIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  private MRPipeline pipeline;
+  private PCollection<String> lines1;
+  private PCollection<String> lines2;
+
+
+  @Before
+  public void setUp() throws IOException {
+    pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
+    lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+    lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  @Test
+  public void testCogroupWritables() {
+    runCogroup(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroupAvro() {
+    runCogroup(AvroTypeFamily.getInstance());
+  }
+
+  public void runCogroup(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+
+    PTable<String, Pair<Collection<String>, Collection<String>>> cg = Cogroup.cogroup(kv1, kv2);
+
+    Map<String, Pair<Collection<String>, Collection<String>>> actual = cg.materializeToMap();
+
+    Map<String, Pair<Collection<String>, Collection<String>>> expected = ImmutableMap.of(
+        "a", Pair.of(coll("1-1", "1-4"), coll()),
+        "b", Pair.of(coll("1-2"), coll("2-1")),
+        "c", Pair.of(coll("1-3"), coll("2-2", "2-3")),
+        "d", Pair.of(coll(), coll("2-4"))
+    );
+
+    assertThat(actual, is(expected));
+  }
+
+
+  private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
+    @Override
+    public void process(String input, Emitter<Pair<String, String>> emitter) {
+      String[] fields = input.split(",");
+      emitter.emit(Pair.of(fields[0], fields[1]));
+    }
+  }
+
+  private static Collection<String> coll(String... values) {
+    return ImmutableList.copyOf(values);
+  }
+  
+}
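
For reference, a minimal sketch of the Cogroup.cogroup call that runCogroup() exercises. The class name and input paths are assumptions; the key/value split, the call to Cogroup.cogroup, and materializeToMap() are taken directly from the test.

import static org.apache.crunch.types.writable.Writables.strings;
import static org.apache.crunch.types.writable.Writables.tableOf;

import java.util.Collection;
import java.util.Map;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Cogroup;

public class CogroupExample {

  // Same "key,value" split that CogroupIT uses.
  static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
    @Override
    public void process(String input, Emitter<Pair<String, String>> emitter) {
      String[] fields = input.split(",");
      emitter.emit(Pair.of(fields[0], fields[1]));
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(CogroupExample.class);
    // Hypothetical inputs: lines of the form "key,value".
    PTable<String, String> left = pipeline.readTextFile("/tmp/left.txt")
        .parallelDo(new KeyValueSplit(), tableOf(strings(), strings()));
    PTable<String, String> right = pipeline.readTextFile("/tmp/right.txt")
        .parallelDo(new KeyValueSplit(), tableOf(strings(), strings()));

    // For each key, collect the values from both sides into a pair of collections.
    PTable<String, Pair<Collection<String>, Collection<String>>> grouped =
        Cogroup.cogroup(left, right);

    Map<String, Pair<Collection<String>, Collection<String>>> result = grouped.materializeToMap();
    System.out.println(result);
    pipeline.done();
  }
}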

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
new file mode 100644
index 0000000..242f621
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/SecondarySortIT.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+
+public class SecondarySortIT extends CrunchTestSupport implements Serializable {
+
+  @Test
+  public void testSecondarySort() throws Exception {
+    Pipeline p = new MRPipeline(SecondarySortIT.class, tempDir.getDefaultConfiguration());
+    String inputFile = tempDir.copyResourceFileName("secondary_sort_input.txt");
+    
+    PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile(inputFile))
+        .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() {
+          @Override
+          public Pair<String, Pair<Integer, Integer>> map(String input) {
+            String[] pieces = input.split(",");
+            return Pair.of(pieces[0],
+                Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
+          }
+        }, tableOf(strings(), pairs(ints(), ints())));
+    Iterable<String> lines = SecondarySort.sortAndApply(in, new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
+      @Override
+      public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
+        Joiner j = Joiner.on(',');
+        return j.join(input.first(), j.join(input.second()));
+      }
+    }, strings()).materialize();
+    assertEquals(ImmutableList.of("one,[-5,10],[1,1],[2,-3]", "three,[0,-1]", "two,[1,7],[2,6],[4,5]"),
+        ImmutableList.copyOf(lines));
+    p.done();
+  }
+}
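
A standalone sketch of the SecondarySort.sortAndApply call shown above. The class name and input path are assumptions; the parsing function, the sortAndApply arguments, and the Joiner-based rendering mirror the test.

import static org.apache.crunch.types.avro.Avros.ints;
import static org.apache.crunch.types.avro.Avros.pairs;
import static org.apache.crunch.types.avro.Avros.strings;
import static org.apache.crunch.types.avro.Avros.tableOf;

import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.lib.SecondarySort;

import com.google.common.base.Joiner;

public class SecondarySortExample {
  public static void main(String[] args) {
    Pipeline p = new MRPipeline(SecondarySortExample.class);

    // Hypothetical CSV input: "group,first,second" per line, like the test resource.
    PTable<String, Pair<Integer, Integer>> in = p.read(From.textFile("/tmp/events.csv"))
        .parallelDo(new MapFn<String, Pair<String, Pair<Integer, Integer>>>() {
          @Override
          public Pair<String, Pair<Integer, Integer>> map(String line) {
            String[] pieces = line.split(",");
            return Pair.of(pieces[0],
                Pair.of(Integer.valueOf(pieces[1].trim()), Integer.valueOf(pieces[2].trim())));
          }
        }, tableOf(strings(), pairs(ints(), ints())));

    // Within each group key, the value pairs arrive at this function already sorted;
    // here they are simply rendered back to a comma-joined string, as in the test.
    Iterable<String> lines = SecondarySort.sortAndApply(in,
        new MapFn<Pair<String, Iterable<Pair<Integer, Integer>>>, String>() {
          @Override
          public String map(Pair<String, Iterable<Pair<Integer, Integer>>> input) {
            Joiner j = Joiner.on(',');
            return j.join(input.first(), j.join(input.second()));
          }
        }, strings()).materialize();

    for (String line : lines) {
      System.out.println(line);
    }
    p.done();
  }
}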

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/SetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SetIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SetIT.java
new file mode 100644
index 0000000..d1300d2
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/SetIT.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(value = Parameterized.class)
+public class SetIT {
+
+  private PTypeFamily typeFamily;
+
+  private Pipeline pipeline;
+  private PCollection<String> set1;
+  private PCollection<String> set2;
+
+  public SetIT(PTypeFamily typeFamily) {
+    this.typeFamily = typeFamily;
+  }
+  
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { WritableTypeFamily.getInstance() }, { AvroTypeFamily.getInstance() } };
+    return Arrays.asList(data);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    String set1InputPath = tmpDir.copyResourceFileName("set1.txt");
+    String set2InputPath = tmpDir.copyResourceFileName("set2.txt");
+    pipeline = new MRPipeline(SetIT.class, tmpDir.getDefaultConfiguration());
+    set1 = pipeline.read(At.textFile(set1InputPath, typeFamily.strings()));
+    set2 = pipeline.read(At.textFile(set2InputPath, typeFamily.strings()));
+  }
+
+  @After
+  public void tearDown() {
+    pipeline.done();
+  }
+
+  @Test
+  public void testDifference() throws Exception {
+    PCollection<String> difference = Set.difference(set1, set2);
+    assertEquals(Lists.newArrayList("b", "e"), Lists.newArrayList(difference.materialize()));
+  }
+
+  @Test
+  public void testIntersection() throws Exception {
+    PCollection<String> intersection = Set.intersection(set1, set2);
+    assertEquals(Lists.newArrayList("a", "c"), Lists.newArrayList(intersection.materialize()));
+  }
+
+  @Test
+  public void testComm() throws Exception {
+    PCollection<Tuple3<String, String, String>> comm = Set.comm(set1, set2);
+    Iterator<Tuple3<String, String, String>> i = comm.materialize().iterator();
+    checkEquals(null, null, "a", i.next());
+    checkEquals("b", null, null, i.next());
+    checkEquals(null, null, "c", i.next());
+    checkEquals(null, "d", null, i.next());
+    checkEquals("e", null, null, i.next());
+    assertFalse(i.hasNext());
+  }
+
+  private void checkEquals(String s1, String s2, String s3, Tuple3<String, String, String> tuple) {
+    assertEquals("first string", s1, tuple.first());
+    assertEquals("second string", s2, tuple.second());
+    assertEquals("third string", s3, tuple.third());
+  }
+
+}
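
A minimal sketch of the three Set operations asserted on above. The class name and input paths are assumptions; difference(), intersection(), and comm() are the library calls from the test, and the column meaning of comm() (only in the first input, only in the second, present in both) follows the assertions in testComm().

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Tuple3;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.lib.Set;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.writable.WritableTypeFamily;

import com.google.common.collect.Lists;

public class SetOpsExample {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(SetOpsExample.class);
    PTypeFamily tf = WritableTypeFamily.getInstance();

    // Hypothetical inputs: one element per line.
    PCollection<String> left = pipeline.read(At.textFile("/tmp/set1.txt", tf.strings()));
    PCollection<String> right = pipeline.read(At.textFile("/tmp/set2.txt", tf.strings()));

    // Elements of left that are not in right.
    PCollection<String> onlyLeft = Set.difference(left, right);
    // Elements present in both collections.
    PCollection<String> inBoth = Set.intersection(left, right);
    // Like the Unix comm utility: (only in left, only in right, in both) per element.
    PCollection<Tuple3<String, String, String>> comm = Set.comm(left, right);

    System.out.println(Lists.newArrayList(onlyLeft.materialize()));
    System.out.println(Lists.newArrayList(inBoth.materialize()));
    System.out.println(Lists.newArrayList(comm.materialize()));
    pipeline.done();
  }
}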

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/SortByValueIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SortByValueIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SortByValueIT.java
new file mode 100644
index 0000000..e19c7d3
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/SortByValueIT.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Integration test for {@link Sort#sortPairs} over both the writable and Avro type families.
+ */
+public class SortByValueIT {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  private static class SplitFn extends MapFn<String, Pair<String, Long>> {
+    private String sep;
+    
+    public SplitFn(String sep) {
+      this.sep = sep;
+    }
+    
+    @Override
+    public Pair<String, Long> map(String input) {
+      String[] pieces = input.split(sep);
+      return Pair.of(pieces[0], Long.valueOf(pieces[1]));
+    }
+  }
+  
+  @Test
+  public void testSortByValueWritables() throws Exception {
+    run(new MRPipeline(SortByValueIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance());
+  }
+  
+  @Test
+  public void testSortByValueAvro() throws Exception {
+    run(new MRPipeline(SortByValueIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance());
+  }
+  
+  public void run(Pipeline pipeline, PTypeFamily ptf) throws Exception {
+    String sbv = tmpDir.copyResourceFileName("sort_by_value.txt");
+    PTable<String, Long> letterCounts = pipeline.read(From.textFile(sbv)).parallelDo(new SplitFn("\t"),
+        ptf.tableOf(ptf.strings(), ptf.longs()));
+    PCollection<Pair<String, Long>> sorted = Sort.sortPairs(
+        letterCounts,
+        new ColumnOrder(2, Order.DESCENDING),
+        new ColumnOrder(1, Order.ASCENDING));
+    assertEquals(
+        ImmutableList.of(Pair.of("C", 3L), Pair.of("A", 2L), Pair.of("D", 2L), Pair.of("B", 1L), Pair.of("E", 1L)),
+        ImmutableList.copyOf(sorted.materialize()));
+  }
+}
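
A standalone sketch of the Sort.sortPairs call used in run() above: a word-count table sorted by count descending, ties broken by word ascending. The class name and input path are assumptions; the ColumnOrder arguments (1-based column indexes) match the test.

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.ColumnOrder;
import org.apache.crunch.lib.Sort.Order;
import org.apache.crunch.types.writable.Writables;

public class SortPairsExample {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(SortPairsExample.class);

    // Hypothetical tab-separated input: "word<TAB>count" per line.
    PTable<String, Long> counts = pipeline.read(From.textFile("/tmp/counts.tsv"))
        .parallelDo(new MapFn<String, Pair<String, Long>>() {
          @Override
          public Pair<String, Long> map(String line) {
            String[] pieces = line.split("\t");
            return Pair.of(pieces[0], Long.valueOf(pieces[1]));
          }
        }, Writables.tableOf(Writables.strings(), Writables.longs()));

    // Column 2 (the count) descending, then column 1 (the word) ascending,
    // the same ordering the test asserts on.
    PCollection<Pair<String, Long>> sorted = Sort.sortPairs(counts,
        new ColumnOrder(2, Order.DESCENDING),
        new ColumnOrder(1, Order.ASCENDING));

    for (Pair<String, Long> pair : sorted.materialize()) {
      System.out.println(pair.first() + "\t" + pair.second());
    }
    pipeline.done();
  }
}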

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/SortIT.java
new file mode 100644
index 0000000..bad4864
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.apache.crunch.lib.Sort.ColumnOrder.by;
+import static org.apache.crunch.lib.Sort.Order.ASCENDING;
+import static org.apache.crunch.lib.Sort.Order.DESCENDING;
+import static org.apache.crunch.test.StringWrapper.wrap;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.Sort.ColumnOrder;
+import org.apache.crunch.lib.Sort.Order;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortIT implements Serializable {
+  @Rule
+  public transient TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testWritableSortAsc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), Order.ASCENDING,
+        "A\tand this text as well");
+  }
+
+  @Test
+  public void testWritableSortDesc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), Order.DESCENDING,
+        "B\tthis doc has some text");
+  }
+
+  @Test
+  public void testWritableSortAscDesc() throws Exception {
+    runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testWritableSortSecondDescFirstAsc() throws Exception {
+    runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testWritableSortTripleAscDescAsc() throws Exception {
+    runTriple(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testWritableSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testWritableSortTupleNAscDesc() throws Exception {
+    runTupleN(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(),
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+  }
+
+  @Test
+  public void testWritableSortTable() throws Exception {
+    runTable(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance(), "A");
+  }
+
+  @Test
+  public void testAvroSortAsc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), Order.ASCENDING, "A\tand this text as well");
+  }
+
+  @Test
+  public void testAvroSortDesc() throws Exception {
+    runSingle(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), Order.DESCENDING, "B\tthis doc has some text");
+  }
+
+  @Test
+  public void testAvroSortPairAscDesc() throws Exception {
+    runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testAvroSortPairSecondDescFirstAsc() throws Exception {
+    runPair(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(2, DESCENDING), by(1, ASCENDING), "A",
+        "this doc has this text");
+  }
+
+  @Test
+  public void testAvroSortTripleAscDescAsc() throws Exception {
+    runTriple(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), "A", "this", "doc");
+  }
+
+  @Test
+  public void testAvroSortQuadAscDescAscDesc() throws Exception {
+    runQuad(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), by(1, ASCENDING), by(2, DESCENDING),
+        by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
+  }
+
+  @Test
+  public void testAvroSortTupleNAscDesc() throws Exception {
+    runTupleN(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(),
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text" });
+  }
+
+  @Test
+  public void testAvroReflectSortPair() throws IOException {
+    Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration());
+    pipeline.enableDebug();
+    String rsrc = tmpDir.copyResourceFileName("set2.txt");
+    PCollection<Pair<String, StringWrapper>> in = pipeline.readTextFile(rsrc)
+        .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class)));
+    PCollection<Pair<String, StringWrapper>> sorted = Sort.sort(in, Order.ASCENDING);
+    
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+  }
+
+  @Test
+  public void testAvroReflectSortTable() throws IOException {
+    Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration());
+    PTable<String, StringWrapper> unsorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")).parallelDo(
+        new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class)));
+
+    PTable<String, StringWrapper> sorted = Sort.sort(unsorted);
+
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+  }
+
+  @Test
+  public void testAvroSortTable() throws Exception {
+    runTable(new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance(), "A");
+  }
+
+  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    // convert the input from the default Writables into the requested type family
+    PCollection<String> input2 = input.parallelDo(new DoFn<String, String>() {
+      @Override
+      public void process(String input, Emitter<String> emitter) {
+        emitter.emit(input);
+      }
+    }, typeFamily.strings());
+    PCollection<String> sorted = Sort.sort(input2, order);
+    Iterable<String> lines = sorted.materialize();
+
+    assertEquals(firstLine, lines.iterator().next());
+    pipeline.done(); // TODO: finally
+  }
+
+  private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
+      String firstField, String secondField) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PTable<String, String> kv = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(Pair.of(split[0], split[1]));
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+    PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
+    List<Pair<String, String>> lines = Lists.newArrayList(sorted.materialize());
+    Pair<String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    pipeline.done();
+  }
+
+  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
+      ColumnOrder third, String firstField, String secondField, String thirdField) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
+        new DoFn<String, Tuple3<String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple3<String, String, String>> emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
+            emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
+          }
+        }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+    PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv, first, second, third);
+    List<Tuple3<String, String, String>> lines = Lists.newArrayList(sorted.materialize());
+    Tuple3<String, String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    assertEquals(thirdField, l.third());
+    pipeline.done();
+  }
+
+  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first, ColumnOrder second,
+      ColumnOrder third, ColumnOrder fourth, String firstField, String secondField, String thirdField,
+      String fourthField) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
+        new DoFn<String, Tuple4<String, String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple4<String, String, String, String>> emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
+            emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
+          }
+        }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv, first, second, third, fourth);
+    Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
+    Tuple4<String, String, String, String> l = lines.iterator().next();
+    assertEquals(firstField, l.first());
+    assertEquals(secondField, l.second());
+    assertEquals(thirdField, l.third());
+    assertEquals(fourthField, l.fourth());
+    pipeline.done();
+  }
+
+  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders, String[] fields)
+      throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PType[] types = new PType[orders.length];
+    Arrays.fill(types, typeFamily.strings());
+    PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() {
+      @Override
+      public void process(String input, Emitter<TupleN> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(new TupleN(split));
+      }
+    }, typeFamily.tuples(types));
+    PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
+    Iterable<TupleN> lines = sorted.materialize();
+    TupleN l = lines.iterator().next();
+    int i = 0;
+    for (String field : fields) {
+      assertEquals(field, l.get(i++));
+    }
+    pipeline.done();
+  }
+
+  private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("docs.txt");
+
+    PCollection<String> input = pipeline.readTextFile(inputPath);
+    PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String, String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(Pair.of(split[0], split[1]));
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
+    PTable<String, String> sorted = Sort.sort(table);
+    Iterable<Pair<String, String>> lines = sorted.materialize();
+    Pair<String, String> l = lines.iterator().next();
+    assertEquals(firstKey, l.first());
+    pipeline.done();
+  }
+
+}
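
Finally, a minimal sketch of the simplest entry point exercised by runSingle() above: a total-order sort of a whole PCollection via Sort.sort with an explicit Order. The class name and input path are assumptions; the Sort.sort(collection, order) call itself is the one the helper uses.

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Sort;
import org.apache.crunch.lib.Sort.Order;

public class TotalOrderSortExample {
  public static void main(String[] args) {
    Pipeline pipeline = new MRPipeline(TotalOrderSortExample.class);
    // Hypothetical input: one record per line.
    PCollection<String> lines = pipeline.readTextFile("/tmp/docs.txt");

    // Total-order sorts of the full collection, ascending and descending.
    PCollection<String> ascending = Sort.sort(lines, Order.ASCENDING);
    PCollection<String> descending = Sort.sort(lines, Order.DESCENDING);

    System.out.println(ascending.materialize().iterator().next());   // smallest line
    System.out.println(descending.materialize().iterator().next());  // largest line
    pipeline.done();
  }
}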

