crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [1/2] git commit: CRUNCH-111: Reorganize CombineFn/Aggregator.
Date Wed, 14 Nov 2012 17:26:23 GMT
Updated Branches:
  refs/heads/master f6f965c4d -> 228657740


CRUNCH-111: Reorganize CombineFn/Aggregator.

Add PGroupedTable.combineValues(Aggregator).
Add Aggregators class with static factories.
Deprecate almost everything in CombineFn in favor of Aggregators.
Add unit and integration tests and testing infrastructure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/22865774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/22865774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/22865774

Branch: refs/heads/master
Commit: 228657740cf10d7b5f99137bc43b3a93286e10b8
Parents: f6f965c
Author: Matthias Friedrich <matt@mafr.de>
Authored: Thu Nov 1 18:18:25 2012 +0100
Committer: Matthias Friedrich <matt@mafr.de>
Committed: Mon Nov 12 20:07:45 2012 +0100

----------------------------------------------------------------------
 .../contrib/bloomfilter/BloomFilterFactory.java    |    5 +-
 .../apache/crunch/examples/AverageBytesByIP.java   |   12 +-
 .../org/apache/crunch/examples/TotalBytesByIP.java |    9 +-
 .../crunch/examples/WordAggregationHBase.java      |    6 +-
 .../it/java/org/apache/crunch/CollectionsIT.java   |    9 +-
 .../src/it/java/org/apache/crunch/WordCountIT.java |    3 +-
 .../java/org/apache/crunch/fn/AggregatorsIT.java   |   83 ++
 .../it/java/org/apache/crunch/lib/CogroupIT.java   |    3 +-
 .../src/it/java/org/apache/crunch/test/Tests.java  |   66 +
 .../apache/crunch/fn/AggregatorsITData/ints.txt    |    5 +
 .../main/java/org/apache/crunch/Aggregator.java    |   86 ++
 .../src/main/java/org/apache/crunch/CombineFn.java |  230 ++++
 .../main/java/org/apache/crunch/PGroupedTable.java |   13 +
 .../java/org/apache/crunch/fn/Aggregators.java     | 1055 +++++++++++++++
 .../crunch/impl/mem/collect/MemGroupedTable.java   |    7 +
 .../crunch/impl/mr/collect/PGroupedTableImpl.java  |    8 +
 .../main/java/org/apache/crunch/lib/Aggregate.java |    5 +-
 .../java/org/apache/crunch/fn/AggregatorsTest.java |  228 ++++
 pom.xml                                            |    1 +
 19 files changed, 1811 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
----------------------------------------------------------------------
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
index 825b445..9191a6c 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/bloomfilter/BloomFilterFactory.java
@@ -20,8 +20,7 @@ package org.apache.crunch.contrib.bloomfilter;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.crunch.CombineFn.Aggregator;
-import org.apache.crunch.CombineFn.AggregatorCombineFn;
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
@@ -75,7 +74,7 @@ public class BloomFilterFactory {
     PTypeFamily tf = collection.getTypeFamily();
     PTable<String, BloomFilter> table = collection.parallelDo(filterFn,
         tf.tableOf(tf.strings(), Writables.writables(BloomFilter.class)));
-    return table.groupByKey(1).combineValues(new AggregatorCombineFn<String, BloomFilter>(new BloomFilterAggregator()));
+    return table.groupByKey(1).combineValues(new BloomFilterAggregator());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
index 8abbb73..a9e8d1b 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/AverageBytesByIP.java
@@ -17,11 +17,14 @@
  */
 package org.apache.crunch.examples;
 
+import static org.apache.crunch.fn.Aggregators.SUM_LONGS;
+import static org.apache.crunch.fn.Aggregators.pairAggregator;
+
 import java.io.Serializable;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.crunch.CombineFn;
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.MapFn;
@@ -61,15 +64,14 @@ public class AverageBytesByIP extends Configured implements Tool, Serializable {
     // Reference a given text file as a collection of Strings.
     PCollection<String> lines = pipeline.readTextFile(args[0]);
 
-    // Combiner used for summing up response size and count
-    CombineFn<String, Pair<Long, Long>> stringPairOfLongsSumCombiner = CombineFn.pairAggregator(CombineFn.SUM_LONGS,
-        CombineFn.SUM_LONGS);
+    // Aggregator used for summing up response size and count
+    Aggregator<Pair<Long, Long>> agg = pairAggregator(SUM_LONGS(), SUM_LONGS());
 
     // Table of (ip, sum(response size), count)
     PTable<String, Pair<Long, Long>> remoteAddrResponseSize = lines
         .parallelDo(extractResponseSize,
             Writables.tableOf(Writables.strings(), Writables.pairs(Writables.longs(), Writables.longs()))).groupByKey()
-        .combineValues(stringPairOfLongsSumCombiner);
+        .combineValues(agg);
 
     // Calculate average response size by ip address
     PTable<String, Double> avgs = remoteAddrResponseSize.parallelDo(calulateAverage,

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
index 44776ea..ff4dc85 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/TotalBytesByIP.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.crunch.CombineFn;
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
@@ -29,6 +29,7 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -60,13 +61,13 @@ public class TotalBytesByIP extends Configured implements Tool, Serializable {
     // Reference a given text file as a collection of Strings.
     PCollection<String> lines = pipeline.readTextFile(args[0]);
 
-    // Combiner used for summing up response size
-    CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS();
+    // Aggregator used for summing up response size
+    Aggregator<Long> agg = Aggregators.SUM_LONGS();
 
     // Table of (ip, sum(response size))
     PTable<String, Long> ipAddrResponseSize = lines
         .parallelDo(extractIPResponseSize, Writables.tableOf(Writables.strings(), Writables.longs())).groupByKey()
-        .combineValues(longSumCombiner);
+        .combineValues(agg);
 
     pipeline.writeTextFile(ipAddrResponseSize, args[1]);
     // Execute the pipeline as a MapReduce.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
----------------------------------------------------------------------
diff --git a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
index 691721d..6214c4f 100644
--- a/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
+++ b/crunch-examples/src/main/java/org/apache/crunch/examples/WordAggregationHBase.java
@@ -25,13 +25,13 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.hbase.HBaseSourceTarget;
 import org.apache.crunch.io.hbase.HBaseTarget;
@@ -125,8 +125,8 @@ public class WordAggregationHBase extends Configured implements Tool, Serializab
     // We process the data from the source HTable then concatenate all data
     // with the same rowkey
     PTable<String, String> textExtracted = extractText(rawText);
-    CombineFn<String, String> stringConcatCombine = CombineFn.STRING_CONCAT(" ", true);
-    PTable<String, String> result = textExtracted.groupByKey().combineValues(stringConcatCombine);
+    PTable<String, String> result = textExtracted.groupByKey()
+        .combineValues(Aggregators.STRING_CONCAT(" ",  true));
 
     // We create the collection of puts from the concatenated datas
     PCollection<Put> resultPut = createPut(result);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
index 0d5803e..17d0cae 100644
--- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.crunch.fn.Aggregators.SimpleAggregator;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
@@ -38,7 +39,7 @@ import com.google.common.collect.Lists;
 @SuppressWarnings("serial")
 public class CollectionsIT {
 
-  public static class AggregateStringListFn extends CombineFn.SimpleAggregator<Collection<String>> {
+  private static class AggregateStringListFn extends SimpleAggregator<Collection<String>> {
     private final Collection<String> rtn = Lists.newArrayList();
 
     @Override
@@ -57,7 +58,7 @@ public class CollectionsIT {
     }
   }
 
-  public static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
+  private static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
 
     return lines.parallelDo(new DoFn<String, Pair<String, Collection<String>>>() {
       @Override
@@ -70,8 +71,8 @@ public class CollectionsIT {
           emitter.emit(Pair.of(word, characters));
         }
       }
-    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings()))).groupByKey()
-        .combineValues(CombineFn.<String, Collection<String>> aggregator(new AggregateStringListFn()));
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings())))
+        .groupByKey().combineValues(new AggregateStringListFn());
   }
 
   @Rule

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/WordCountIT.java b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
index 5124663..f46a1ee 100644
--- a/crunch/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch/src/it/java/org/apache/crunch/WordCountIT.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.To;
@@ -143,7 +144,7 @@ public class WordCountIT {
 
     if (runSecond) {
       String substrPath = tmpDir.getFileName("substr");
-      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(CombineFn.<String> SUM_LONGS());
+      PTable<String, Long> we = substr(wordCount).groupByKey().combineValues(Aggregators.SUM_LONGS());
       pipeline.writeTextFile(we, substrPath);
     }
     PipelineResult res = pipeline.done();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java b/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
new file mode 100644
index 0000000..c9584a1
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/fn/AggregatorsIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.apache.crunch.fn.Aggregators.SUM_INTS;
+import static org.apache.crunch.fn.Aggregators.pairAggregator;
+import static org.apache.crunch.types.writable.Writables.ints;
+import static org.apache.crunch.types.writable.Writables.pairs;
+import static org.apache.crunch.types.writable.Writables.strings;
+import static org.apache.crunch.types.writable.Writables.tableOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.test.Tests;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+
+@RunWith(Parameterized.class)
+public class AggregatorsIT {
+  private Pipeline pipeline;
+
+  @Parameters
+  public static Collection<Object[]> params() {
+    return Tests.pipelinesParams(AggregatorsIT.class);
+  }
+
+  public AggregatorsIT(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  @Test
+  public void testPairAggregator() {
+    PCollection<String> lines = pipeline.readTextFile(Tests.pathTo(this, "ints.txt"));
+
+    PTable<String, Pair<Integer, Integer>> table = lines.parallelDo(new SplitLine(),
+        tableOf(strings(), pairs(ints(), ints())));
+
+    PTable<String, Pair<Integer, Integer>> combinedTable = table.groupByKey().combineValues(
+        pairAggregator(SUM_INTS(), SUM_INTS()));
+
+    Map<String, Pair<Integer, Integer>> result = combinedTable.asMap().getValue();
+
+    assertThat(result.size(), is(2));
+    assertThat(result.get("a"), is(Pair.of(9,  12)));
+    assertThat(result.get("b"), is(Pair.of(11,  13)));
+  }
+
+  private static final class SplitLine extends MapFn<String, Pair<String, Pair<Integer, Integer>>> {
+    @Override
+    public Pair<String, Pair<Integer, Integer>> map(String input) {
+      String[] split = input.split("\t");
+      return Pair.of(split[0],
+          Pair.of(Integer.parseInt(split[1]), Integer.parseInt(split[2])));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
index b6f5029..2bdc9ef 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -32,6 +32,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.fn.MapKeysFn;
 import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.impl.mr.MRPipeline;
@@ -89,7 +90,7 @@ public class CogroupIT {
           return "";
         }
       }
-    }, ntt).groupByKey().combineValues(CombineFn.<String> SUM_LONGS());
+    }, ntt).groupByKey().combineValues(Aggregators.SUM_LONGS());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/it/java/org/apache/crunch/test/Tests.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/test/Tests.java b/crunch/src/it/java/org/apache/crunch/test/Tests.java
new file mode 100644
index 0000000..f2c7a86
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/test/Tests.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.test;
+
+import java.util.Collection;
+
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Resources;
+
+
+/**
+ * Utilities for integration tests.
+ */
+public final class Tests {
+
+  private Tests() {
+    // nothing
+  }
+
+  /**
+   * Get the path to an integration test resource file, as per naming convention.
+   *
+   * @param testCase The executing test case instance
+   * @param resourceName The file name of the resource
+   * @return The path to the resource (never null)
+   * @throws IllegalArgumentException Thrown if the resource doesn't exist
+   */
+  public static String pathTo(Object testCase, String resourceName) {
+    // Note: We append "Data" because otherwise Eclipse would complain about the
+    //       test case's class name clashing with the resource directory's name.
+    String path = testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName;
+    return Resources.getResource(path).getFile();
+  }
+
+  /**
+   * Return our two types of {@link Pipeline}s for a JUnit Parameterized test.
+   *
+   * @param testCase The executing test case's class
+   * @return The collection to return from a {@link Parameters} provider method
+   */
+  public static Collection<Object[]> pipelinesParams(Class<?> testCase) {
+    return ImmutableList.copyOf(
+        new Object[][] { { MemPipeline.getInstance() }, { new MRPipeline(testCase) }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/it/resources/org/apache/crunch/fn/AggregatorsITData/ints.txt
----------------------------------------------------------------------
diff --git a/crunch/src/it/resources/org/apache/crunch/fn/AggregatorsITData/ints.txt b/crunch/src/it/resources/org/apache/crunch/fn/AggregatorsITData/ints.txt
new file mode 100644
index 0000000..680cb09
--- /dev/null
+++ b/crunch/src/it/resources/org/apache/crunch/fn/AggregatorsITData/ints.txt
@@ -0,0 +1,5 @@
+a	1	2
+a	3	4
+b	2	3
+a	5	6
+b	9	10

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/main/java/org/apache/crunch/Aggregator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Aggregator.java b/crunch/src/main/java/org/apache/crunch/Aggregator.java
new file mode 100644
index 0000000..432452b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Aggregator.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * Aggregate a sequence of values into a possibly smaller sequence of the same type.
+ *
+ * <p>In most cases, an Aggregator will turn multiple values into a single value,
+ * like creating a sum, finding the minimum or maximum, etc. In some cases
+ * (e.g. finding the top K elements), an implementation may return more than
+ * one value. The {@link org.apache.crunch.fn.Aggregators} utility class contains
+ * factory methods for creating all kinds of pre-defined Aggregators that should
+ * cover the most common cases.</p>
+ *
+ * <p>Aggregator implementations should usually be <em>associative</em> and
+ * <em>commutative</em>, which makes their results deterministic. If your aggregation
+ * function isn't commutative, you can still use secondary sort to get deterministic results.</p>
+ *
+ * <p>The lifecycle of an {@link Aggregator} always begins with you instantiating
+ * it and passing it to Crunch. When running your {@link Pipeline}, Crunch serializes
+ * the instance and deserializes it wherever it is needed on the cluster. This is how
+ * Crunch uses a deserialized instance:</p>
+ *
+ * <ol>
+ *   <li>call {@link #initialize(Configuration)} once</li>
+ *   <li>call {@link #reset()}</li>
+ *   <li>call {@link #update(Object)} multiple times until all values of a sequence
+ *       have been aggregated</li>
+ *   <li>call {@link #results()} to retrieve the aggregated result</li>
+ *   <li>go back to step 2 until all sequences have been aggregated</li>
+ * </ol>
+ *
+ * @param <T> The type of the values to aggregate
+ */
+public interface Aggregator<T> extends Serializable {
+
+  /**
+   * Perform any setup of this instance that is required prior to processing
+   * inputs.
+   *
+   * @param conf Hadoop configuration
+   */
+  void initialize(Configuration conf);
+
+  /**
+   * Clears the internal state of this Aggregator and prepares it for the
+   * values associated with the next key.
+   *
+   * <p>Depending on what you aggregate, this typically means setting a variable
+   * to zero or clearing a list. Failing to do this will yield wrong results!
+   */
+  void reset();
+
+  /**
+   * Incorporate the given value into the aggregate state maintained by this
+   * instance.
+   *
+   * @param value The value to add to the aggregated state
+   */
+  void update(T value);
+
+  /**
+   * Returns the current aggregated state of this instance.
+   */
+  Iterable<T> results();
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index d45940b..c42e48f 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.SortedSet;
 
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.util.Tuples;
 import org.apache.hadoop.conf.Configuration;
 
@@ -41,6 +42,9 @@ import com.google.common.collect.Sets;
  */
 public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
 
+  /**
+   * @deprecated Use {@link org.apache.crunch.Aggregator}
+   */
   public static interface Aggregator<T> extends Serializable {
     /**
      * Perform any setup of this instance that is required prior to processing
@@ -68,6 +72,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
 
   /**
    * Base class for aggregators that do not require any initialization.
+   *
+   * @deprecated Use {@link org.apache.crunch.fn.Aggregators.SimpleAggregator}
    */
   public static abstract class SimpleAggregator<T> implements Aggregator<T> {
     @Override
@@ -78,6 +84,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   
   /**
    * Interface for constructing new aggregator instances.
+   *
+   * @deprecated Use {@link PGroupedTable#combineValues(org.apache.crunch.Aggregator)}, which doesn't require a factory.
    */
   public static interface AggregatorFactory<T> {
     Aggregator<T> create();
@@ -86,6 +94,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
   /**
    * A {@code CombineFn} that delegates all of the actual work to an
    * {@code Aggregator} instance.
+   *
+   * @deprecated Use the {@link Aggregators#toCombineFn(org.apache.crunch.Aggregator)} adapter
    */
   public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
 
@@ -147,6 +157,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#pairAggregator(org.apache.crunch.Aggregator, org.apache.crunch.Aggregator)}
+   */
   public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
 
     public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
@@ -164,6 +177,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#tripAggregator}
+   */
   public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
 
     public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
@@ -182,6 +198,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#quadAggregator}
+   */
   public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
 
     public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
@@ -200,6 +219,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#tupleAggregator(org.apache.crunch.Aggregator...)}
+   */
   public static class TupleNAggregator extends TupleAggregator<TupleN> {
 
     private final int size;
@@ -225,29 +247,47 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
 
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#toCombineFn(org.apache.crunch.Aggregator)}
+   */
   public static final <K, V> CombineFn<K, V> aggregator(Aggregator<V> aggregator) {
     return new AggregatorCombineFn<K, V>(aggregator);
   }
 
+  /**
+   * @deprecated Use {@link PGroupedTable#combineValues(org.apache.crunch.Aggregator)}, which doesn't require a factory.
+   */
   public static final <K, V> CombineFn<K, V> aggregatorFactory(AggregatorFactory<V> aggregator) {
     return new AggregatorCombineFn<K, V>(aggregator.create());
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#pairAggregator(org.apache.crunch.Aggregator, org.apache.crunch.Aggregator)}
+   */
   public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(AggregatorFactory<V1> a1,
       AggregatorFactory<V2> a2) {
     return aggregator(new PairAggregator<V1, V2>(a1.create(), a2.create()));
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#tripAggregator}
+   */
   public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(AggregatorFactory<A> a1,
       AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
     return aggregator(new TripAggregator<A, B, C>(a1.create(), a2.create(), a3.create()));
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#quadAggregator}
+   */
   public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(AggregatorFactory<A> a1,
       AggregatorFactory<B> a2, AggregatorFactory<C> a3, AggregatorFactory<D> a4) {
     return aggregator(new QuadAggregator<A, B, C, D>(a1.create(), a2.create(), a3.create(), a4.create()));
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#tupleAggregator(org.apache.crunch.Aggregator...)}
+   */
   public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
     Aggregator<?>[] aggs = new Aggregator[factories.length];
     for (int i = 0; i < aggs.length; i++) {
@@ -256,110 +296,191 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     return aggregator(new TupleNAggregator(aggs));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code SUM_LONGS} factory.
+   *
+   * @deprecated Use {@link Aggregators#SUM_LONGS()}
+   */
   public static final <K> CombineFn<K, Long> SUM_LONGS() {
     return aggregatorFactory(SUM_LONGS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code SUM_INTS} factory.
+   *
+   * @deprecated Use {@link Aggregators#SUM_INTS()}
+   */
   public static final <K> CombineFn<K, Integer> SUM_INTS() {
     return aggregatorFactory(SUM_INTS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code SUM_FLOATS} factory.
+   *
+   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
+   */
   public static final <K> CombineFn<K, Float> SUM_FLOATS() {
     return aggregatorFactory(SUM_FLOATS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code SUM_DOUBLES} factory.
+   *
+   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
+   */
   public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
     return aggregatorFactory(SUM_DOUBLES);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code SUM_BIGINTS} factory.
+   *
+   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
+   */
   public static final <K> CombineFn<K, BigInteger> SUM_BIGINTS() {
     return aggregatorFactory(SUM_BIGINTS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MAX_LONGS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MAX_LONGS()}
+   */
   public static final <K> CombineFn<K, Long> MAX_LONGS() {
     return aggregatorFactory(MAX_LONGS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MaxNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MAX_LONGS(int)}
+   */
   public static final <K> CombineFn<K, Long> MAX_LONGS(int n) {
     return aggregator(new MaxNAggregator<Long>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MAX_INTS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MAX_INTS()}
+   */
   public static final <K> CombineFn<K, Integer> MAX_INTS() {
     return aggregatorFactory(MAX_INTS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MaxNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MAX_INTS(int)}
+   */
   public static final <K> CombineFn<K, Integer> MAX_INTS(int n) {
     return aggregator(new MaxNAggregator<Integer>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MAX_FLOATS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
+   */
   public static final <K> CombineFn<K, Float> MAX_FLOATS() {
     return aggregatorFactory(MAX_FLOATS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MaxNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MAX_FLOATS(int)}
+   */
   public static final <K> CombineFn<K, Float> MAX_FLOATS(int n) {
     return aggregator(new MaxNAggregator<Float>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MAX_DOUBLES} factory.
+   *
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
+   */
   public static final <K> CombineFn<K, Double> MAX_DOUBLES() {
     return aggregatorFactory(MAX_DOUBLES);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MaxNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES(int)}
+   */
   public static final <K> CombineFn<K, Double> MAX_DOUBLES(int n) {
     return aggregator(new MaxNAggregator<Double>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MAX_BIGINTS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
+   */
   public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS() {
     return aggregatorFactory(MAX_BIGINTS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MaxNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS(int)}
+   */
   public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS(int n) {
     return aggregator(new MaxNAggregator<BigInteger>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MIN_LONGS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MIN_LONGS()}
+   */
   public static final <K> CombineFn<K, Long> MIN_LONGS() {
     return aggregatorFactory(MIN_LONGS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MinNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MIN_LONGS(int)}
+   */
   public static final <K> CombineFn<K, Long> MIN_LONGS(int n) {
     return aggregator(new MinNAggregator<Long>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MIN_INTS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MIN_INTS()}
+   */
   public static final <K> CombineFn<K, Integer> MIN_INTS() {
     return aggregatorFactory(MIN_INTS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MinNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MIN_INTS(int)}
+   */
   public static final <K> CombineFn<K, Integer> MIN_INTS(int n) {
     return aggregator(new MinNAggregator<Integer>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MIN_FLOATS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
+   */
   public static final <K> CombineFn<K, Float> MIN_FLOATS() {
     return aggregatorFactory(MIN_FLOATS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MinNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MIN_FLOATS(int)}
+   */
   public static final <K> CombineFn<K, Float> MIN_FLOATS(int n) {
     return aggregator(new MinNAggregator<Float>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MIN_DOUBLES} factory.
+   *
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
+   */
   public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
     return aggregatorFactory(MIN_DOUBLES);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MinNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES(int)}
+   */
   public static final <K> CombineFn<K, Double> MIN_DOUBLES(int n) {
     return aggregator(new MinNAggregator<Double>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses an aggregator created by the {@code MIN_BIGINTS} factory.
+   *
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
+   */
   public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS() {
     return aggregatorFactory(MIN_BIGINTS);
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code MinNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS(int)}
+   */
   public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS(int n) {
     return aggregator(new MinNAggregator<BigInteger>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code FirstNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#FIRST_N(int)}
+   */
   public static final <K, V> CombineFn<K, V> FIRST_N(int n) {
     return aggregator(new FirstNAggregator<V>(n));
   }
 
+  /**
+   * Returns a {@code CombineFn} that uses a {@code LastNAggregator} with arity {@code n}.
+   *
+   * @deprecated Use {@link Aggregators#LAST_N(int)}
+   */
   public static final <K, V> CombineFn<K, V> LAST_N(int n) {
     return aggregator(new LastNAggregator<V>(n));
   }
@@ -375,6 +496,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
    *            NullPointerException if set to false and there is a null
    *            value.
    * @return
+   *
+   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean)}
    */
   public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) {
       return aggregator(new StringConcatAggregator(separator, skipNull));
@@ -403,11 +526,16 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
    *            there is no limits. The number of characters of the int string
    *            will be < maxInputLength to be concatenated.
    * @return
+   *
+   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
    */
   public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
+      // Both length limits are forwarded unchanged; per the javadoc, a value <= 0 means "no limit".
       return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength));
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#SUM_LONGS()}
+   */
   public static class SumLongs extends SimpleAggregator<Long> {
     private long sum = 0;
 
@@ -427,12 +555,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link SumLongs} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.SUM_LONGS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#SUM_LONGS()}
+   */
   public static AggregatorFactory<Long> SUM_LONGS = new AggregatorFactory<Long>() {
     public Aggregator<Long> create() {
       return new SumLongs();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#SUM_INTS()}
+   */
   public static class SumInts extends SimpleAggregator<Integer> {
     private int sum = 0;
 
@@ -452,12 +586,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link SumInts} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.SUM_INTS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#SUM_INTS()}
+   */
   public static AggregatorFactory<Integer> SUM_INTS = new AggregatorFactory<Integer>() {
     public Aggregator<Integer> create() {
       return new SumInts();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
+   */
   public static class SumFloats extends SimpleAggregator<Float> {
     private float sum = 0;
 
@@ -477,12 +617,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link SumFloats} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.SUM_FLOATS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#SUM_FLOATS()}
+   */
   public static AggregatorFactory<Float> SUM_FLOATS = new AggregatorFactory<Float>() {
     public Aggregator<Float> create() {
       return new SumFloats();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
+   */
   public static class SumDoubles extends SimpleAggregator<Double> {
     private double sum = 0;
 
@@ -502,12 +648,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link SumDoubles} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.SUM_DOUBLES()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#SUM_DOUBLES()}
+   */
   public static AggregatorFactory<Double> SUM_DOUBLES = new AggregatorFactory<Double>() {
     public Aggregator<Double> create() {
       return new SumDoubles();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
+   */
   public static class SumBigInts extends SimpleAggregator<BigInteger> {
     private BigInteger sum = BigInteger.ZERO;
 
@@ -527,12 +679,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link SumBigInts} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.SUM_BIGINTS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#SUM_BIGINTS()}
+   */
   public static AggregatorFactory<BigInteger> SUM_BIGINTS = new AggregatorFactory<BigInteger>() {
     public Aggregator<BigInteger> create() {
       return new SumBigInts();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MAX_LONGS()}
+   */
   public static class MaxLongs extends SimpleAggregator<Long> {
     private Long max = null;
 
@@ -554,12 +712,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MaxLongs} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MAX_LONGS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MAX_LONGS()}
+   */
   public static AggregatorFactory<Long> MAX_LONGS = new AggregatorFactory<Long>() {
     public Aggregator<Long> create() {
       return new MaxLongs();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MAX_INTS()}
+   */
   public static class MaxInts extends SimpleAggregator<Integer> {
     private Integer max = null;
 
@@ -581,12 +745,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MaxInts} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MAX_INTS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MAX_INTS()}
+   */
   public static AggregatorFactory<Integer> MAX_INTS = new AggregatorFactory<Integer>() {
     public Aggregator<Integer> create() {
       return new MaxInts();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
+   */
   public static class MaxFloats extends SimpleAggregator<Float> {
     private Float max = null;
 
@@ -608,12 +778,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MaxFloats} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MAX_FLOATS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MAX_FLOATS()}
+   */
   public static AggregatorFactory<Float> MAX_FLOATS = new AggregatorFactory<Float>() {
     public Aggregator<Float> create() {
       return new MaxFloats();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
+   */
   public static class MaxDoubles extends SimpleAggregator<Double> {
     private Double max = null;
 
@@ -635,12 +811,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MaxDoubles} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MAX_DOUBLES()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MAX_DOUBLES()}
+   */
   public static AggregatorFactory<Double> MAX_DOUBLES = new AggregatorFactory<Double>() {
     public Aggregator<Double> create() {
       return new MaxDoubles();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
+   */
   public static class MaxBigInts extends SimpleAggregator<BigInteger> {
     private BigInteger max = null;
 
@@ -662,12 +844,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MaxBigInts} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MAX_BIGINTS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MAX_BIGINTS()}
+   */
   public static AggregatorFactory<BigInteger> MAX_BIGINTS = new AggregatorFactory<BigInteger>() {
     public Aggregator<BigInteger> create() {
       return new MaxBigInts();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MIN_LONGS()}
+   */
   public static class MinLongs extends SimpleAggregator<Long> {
     private Long min = null;
 
@@ -689,12 +877,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MinLongs} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MIN_LONGS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MIN_LONGS()}
+   */
   public static AggregatorFactory<Long> MIN_LONGS = new AggregatorFactory<Long>() {
     public Aggregator<Long> create() {
       return new MinLongs();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MIN_INTS()}
+   */
   public static class MinInts extends SimpleAggregator<Integer> {
     private Integer min = null;
 
@@ -716,12 +910,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MinInts} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MIN_INTS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MIN_INTS()}
+   */
   public static AggregatorFactory<Integer> MIN_INTS = new AggregatorFactory<Integer>() {
     public Aggregator<Integer> create() {
       return new MinInts();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
+   */
   public static class MinFloats extends SimpleAggregator<Float> {
     private Float min = null;
 
@@ -743,12 +943,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MinFloats} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MIN_FLOATS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MIN_FLOATS()}
+   */
   public static AggregatorFactory<Float> MIN_FLOATS = new AggregatorFactory<Float>() {
     public Aggregator<Float> create() {
       return new MinFloats();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
+   */
   public static class MinDoubles extends SimpleAggregator<Double> {
     private Double min = null;
 
@@ -770,12 +976,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MinDoubles} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MIN_DOUBLES()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MIN_DOUBLES()}
+   */
   public static AggregatorFactory<Double> MIN_DOUBLES = new AggregatorFactory<Double>() {
     public Aggregator<Double> create() {
       return new MinDoubles();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
+   */
   public static class MinBigInts extends SimpleAggregator<BigInteger> {
     private BigInteger min = null;
 
@@ -797,12 +1009,18 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * Factory that returns a new {@link MinBigInts} on every {@code create()} call.
+   *
+   * <p>Note: this field is not {@code final}; {@code CombineFn.MIN_BIGINTS()} reads it
+   * at call time, so reassigning it changes what that method returns.</p>
+   *
+   * @deprecated Use {@link Aggregators#MIN_BIGINTS()}
+   */
   public static AggregatorFactory<BigInteger> MIN_BIGINTS = new AggregatorFactory<BigInteger>() {
     public Aggregator<BigInteger> create() {
       return new MinBigInts();
     }
   };
 
+  /**
+   * @deprecated Use {@link Aggregators#MAX_N(int, Class)}
+   */
   public static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
     private final int arity;
     private transient SortedSet<V> elements;
@@ -836,6 +1054,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#MIN_N(int, Class)}
+   */
   public static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
     private final int arity;
     private transient SortedSet<V> elements;
@@ -869,6 +1090,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#FIRST_N(int)}
+   */
   public static class FirstNAggregator<V> extends SimpleAggregator<V> {
     private final int arity;
     private final List<V> elements;
@@ -896,6 +1120,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#LAST_N(int)}
+   */
   public static class LastNAggregator<V> extends SimpleAggregator<V> {
     private final int arity;
     private final LinkedList<V> elements;
@@ -924,6 +1151,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S,
     }
   }
 
+  /**
+   * @deprecated Use {@link Aggregators#STRING_CONCAT(String, boolean, long, long)}
+   */
   public static class StringConcatAggregator extends SimpleAggregator<String> {
     private final String separator;
     private final boolean skipNulls;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
index e727b70..d77ffdb 100644
--- a/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -17,11 +17,14 @@
  */
 package org.apache.crunch;
 
+import org.apache.crunch.Aggregator;
+
 /**
  * The Crunch representation of a grouped {@link PTable}.
  * 
  */
 public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
+
   /**
    * Combines the values of this grouping using the given {@code CombineFn}.
    * 
@@ -32,6 +35,16 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
   PTable<K, V> combineValues(CombineFn<K, V> combineFn);
 
   /**
+   * Combine the values in each group using the given {@link Aggregator}.
+   *
+   * <p>This is the {@link Aggregator}-based counterpart of
+   * {@link #combineValues(CombineFn)}.</p>
+   *
+   * @param aggregator The aggregator applied to the values of each group
+   * @return A {@link PTable} where each group key maps to an aggregated
+   *         value. Group keys may be repeated if an aggregator returns
+   *         more than one value.
+   */
+  PTable<K, V> combineValues(Aggregator<V> aggregator);
+
+  /**
    * Convert this grouping back into a multimap.
    * 
    * @return an ungrouped version of the data in this {@code PGroupedTable}.

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
new file mode 100644
index 0000000..9ee0de7
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -0,0 +1,1055 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
+import org.apache.crunch.util.Tuples;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+
+/**
+ * A collection of pre-defined {@link org.apache.crunch.Aggregator}s.
+ *
+ * <p>The factory methods of this class return {@link org.apache.crunch.Aggregator}
+ * instances that you can use to combine the values of a {@link PGroupedTable}.
+ * In most cases, they turn a multimap (multiple entries per key) into a map (one
+ * entry per key).</p>
+ *
+ * <p><strong>Note</strong>: When using composed aggregators, like those built by the
+ * {@link #pairAggregator(Aggregator, Aggregator) pairAggregator()}
+ * factory method, you typically don't want to pass the same child aggregator instance more than once,
+ * even if all child aggregators have the same type. Create a separate instance for each component instead:</p>
+ *
+ * <pre>
+ *   PTable&lt;K, Pair&lt;Long, Long&gt;&gt; result = groupedTable.combineValues(
+ *      pairAggregator(SUM_LONGS(), SUM_LONGS())
+ *   );
+ * </pre>
+ */
+public final class Aggregators {
+
+  /** Not meant to be instantiated; every member of this class is static. */
+  private Aggregators() {
+  }
+
+  /**
+   * Creates an aggregator that adds up {@code long} values.
+   *
+   * @return A fresh aggregator instance
+   */
+  public static Aggregator<Long> SUM_LONGS() {
+    final Aggregator<Long> longSum = new SumLongs();
+    return longSum;
+  }
+
+  /**
+   * Creates an aggregator that adds up {@code int} values.
+   *
+   * @return A fresh aggregator instance
+   */
+  public static Aggregator<Integer> SUM_INTS() {
+    final Aggregator<Integer> intSum = new SumInts();
+    return intSum;
+  }
+
+  /**
+   * Creates an aggregator that adds up {@code float} values.
+   *
+   * @return A fresh aggregator instance
+   */
+  public static Aggregator<Float> SUM_FLOATS() {
+    final Aggregator<Float> floatSum = new SumFloats();
+    return floatSum;
+  }
+
+  /**
+   * Creates an aggregator that adds up {@code double} values.
+   *
+   * @return A fresh aggregator instance
+   */
+  public static Aggregator<Double> SUM_DOUBLES() {
+    final Aggregator<Double> doubleSum = new SumDoubles();
+    return doubleSum;
+  }
+
+  /**
+   * Creates an aggregator that adds up {@link BigInteger} values.
+   *
+   * @return A fresh aggregator instance
+   */
+  public static Aggregator<BigInteger> SUM_BIGINTS() {
+    final Aggregator<BigInteger> bigIntSum = new SumBigInts();
+    return bigIntSum;
+  }
+
+  /**
+   * Return the maximum of all given {@code long} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MAX_LONGS() {
+    return new MaxLongs();
+  }
+
+  /**
+   * Return the {@code n} largest {@code long} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MAX_LONGS(int n) {
+    // Use the N-ary aggregator like the other MAX_*(int) factories; a MaxLongs
+    // instance would ignore n and only ever report the single maximum.
+    return new MaxNAggregator<Long>(n);
+  }
+
+  /**
+   * Return the maximum of all given {@code int} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MAX_INTS() {
+    return new MaxInts();
+  }
+
+  /**
+   * Return the {@code n} largest {@code int} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MAX_INTS(int n) {
+    return new MaxNAggregator<Integer>(n);
+  }
+
+  /**
+   * Return the maximum of all given {@code float} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MAX_FLOATS() {
+    return new MaxFloats();
+  }
+
+  /**
+   * Return the {@code n} largest {@code float} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MAX_FLOATS(int n) {
+    return new MaxNAggregator<Float>(n);
+  }
+
+  /**
+   * Return the maximum of all given {@code double} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MAX_DOUBLES() {
+    return new MaxDoubles();
+  }
+
+  /**
+   * Return the {@code n} largest {@code double} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MAX_DOUBLES(int n) {
+    return new MaxNAggregator<Double>(n);
+  }
+
+  /**
+   * Return the maximum of all given {@link BigInteger} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MAX_BIGINTS() {
+    return new MaxBigInts();
+  }
+
+  /**
+   * Return the {@code n} largest {@link BigInteger} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MAX_BIGINTS(int n) {
+    return new MaxNAggregator<BigInteger>(n);
+  }
+
+  /**
+   * Return the {@code n} largest values (or fewer if there are fewer
+   * values than {@code n}).
+   *
+   * @param n The number of values to return
+   * @param cls The type of the values to aggregate (must implement {@link Comparable}).
+   *            It is not used at runtime; it only lets the compiler infer {@code V}
+   *            at call sites such as {@code MAX_N(3, String.class)}.
+   * @return The newly constructed instance
+   */
+  public static <V extends Comparable<V>> Aggregator<V> MAX_N(int n, Class<V> cls) {
+    return new MaxNAggregator<V>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code long} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MIN_LONGS() {
+    return new MinLongs();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code long} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Long> MIN_LONGS(int n) {
+    return new MinNAggregator<Long>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code int} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MIN_INTS() {
+    return new MinInts();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code int} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Integer> MIN_INTS(int n) {
+    return new MinNAggregator<Integer>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code float} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MIN_FLOATS() {
+    return new MinFloats();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code float} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Float> MIN_FLOATS(int n) {
+    return new MinNAggregator<Float>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@code double} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MIN_DOUBLES() {
+    return new MinDoubles();
+  }
+
+  /**
+   * Return the {@code n} smallest {@code double} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<Double> MIN_DOUBLES(int n) {
+    return new MinNAggregator<Double>(n);
+  }
+
+  /**
+   * Return the minimum of all given {@link BigInteger} values.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MIN_BIGINTS() {
+    return new MinBigInts();
+  }
+
+  /**
+   * Return the {@code n} smallest {@link BigInteger} values (or fewer if there are fewer
+   * values than {@code n}).
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  public static Aggregator<BigInteger> MIN_BIGINTS(int n) {
+    return new MinNAggregator<BigInteger>(n);
+  }
+
+  /**
+   * Return the {@code n} smallest values (or fewer if there are fewer
+   * values than {@code n}).
+   *
+   * @param n The number of values to return
+   * @param cls The type of the values to aggregate (must implement {@link Comparable}).
+   *            It is not used at runtime; it only lets the compiler infer {@code V}
+   *            at call sites such as {@code MIN_N(3, String.class)}.
+   * @return The newly constructed instance
+   */
+  public static <V extends Comparable<V>> Aggregator<V> MIN_N(int n, Class<V> cls) {
+    return new MinNAggregator<V>(n);
+  }
+
+  /**
+   * Creates an aggregator that keeps the first {@code n} values it sees
+   * (all of them when a group has fewer than {@code n} values).
+   *
+   * @param n How many values to keep
+   * @return A fresh aggregator instance
+   */
+  public static <V> Aggregator<V> FIRST_N(int n) {
+    final Aggregator<V> leading = new FirstNAggregator<V>(n);
+    return leading;
+  }
+
+  /**
+   * Creates an aggregator that keeps the last {@code n} values it sees
+   * (all of them when a group has fewer than {@code n} values).
+   *
+   * @param n How many values to keep
+   * @return A fresh aggregator instance
+   */
+  public static <V> Aggregator<V> LAST_N(int n) {
+    final Aggregator<V> trailing = new LastNAggregator<V>(n);
+    return trailing;
+  }
+
+  /**
+   * Concatenate strings, with a separator between strings. There
+   * is no limit on the length of the concatenated string.
+   *
+   * <p><em>Note: String concatenation is not commutative, which means the
+   * result of the aggregation is not deterministic!</em></p>
+   *
+   * @param separator
+   *            the separator which will be appended between each string
+   * @param skipNull
+   *            define if we should skip null values. Throw
+   *            NullPointerException if set to false and there is a null
+   *            value.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull) {
+    return new StringConcatAggregator(separator, skipNull);
+  }
+
+  /**
+   * Concatenate strings, with a separator between strings. You can specify
+   * the maximum length of the output string and of the input strings, if
+   * they are &gt; 0. If a value is &lt;= 0, there is no limit.
+   *
+   * <p>Any input string that is too long (or that would make the output too
+   * long) is silently discarded.</p>
+   *
+   * <p><em>Note: String concatenation is not commutative, which means the
+   * result of the aggregation is not deterministic!</em></p>
+   *
+   * @param separator
+   *            the separator which will be appended between each string
+   * @param skipNull
+   *            define if we should skip null values. Throw
+   *            NullPointerException if set to false and there is a null
+   *            value.
+   * @param maxOutputLength
+   *            the maximum length of the output string. If it's set &lt;= 0,
+   *            there is no limit. The number of characters of the output
+   *            string will be &lt; maxOutputLength.
+   * @param maxInputLength
+   *            the maximum length of the input strings. If it's set &lt;= 0,
+   *            there is no limit. The number of characters of the input string
+   *            will be &lt; maxInputLength to be concatenated.
+   * @return The newly constructed instance
+   */
+  public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull,
+      long maxOutputLength, long maxInputLength) {
+    return new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength);
+  }
+
+  /**
+   * Apply separate aggregators to each component of a {@link Pair}.
+   *
+   * <p>Typically each component should get its own aggregator instance rather
+   * than sharing one instance between components.</p>
+   *
+   * @param a1 The aggregator for the first component
+   * @param a2 The aggregator for the second component
+   * @return The newly constructed instance
+   */
+  public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator(
+      Aggregator<V1> a1, Aggregator<V2> a2) {
+    return new PairAggregator<V1, V2>(a1, a2);
+  }
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple3}.
+   *
+   * <p>Typically each component should get its own aggregator instance rather
+   * than sharing one instance between components.</p>
+   *
+   * @param a1 The aggregator for the first component
+   * @param a2 The aggregator for the second component
+   * @param a3 The aggregator for the third component
+   * @return The newly constructed instance
+   */
+  public static <V1, V2, V3> Aggregator<Tuple3<V1, V2, V3>> tripAggregator(
+      Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3) {
+    return new TripAggregator<V1, V2, V3>(a1, a2, a3);
+  }
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple4}.
+   *
+   * <p>Typically each component should get its own aggregator instance rather
+   * than sharing one instance between components.</p>
+   *
+   * @param a1 The aggregator for the first component
+   * @param a2 The aggregator for the second component
+   * @param a3 The aggregator for the third component
+   * @param a4 The aggregator for the fourth component
+   * @return The newly constructed instance
+   */
+  public static <V1, V2, V3, V4> Aggregator<Tuple4<V1, V2, V3, V4>> quadAggregator(
+      Aggregator<V1> a1, Aggregator<V2> a2, Aggregator<V3> a3, Aggregator<V4> a4) {
+    return new QuadAggregator<V1, V2, V3, V4>(a1, a2, a3, a4);
+  }
+
+  /**
+   * Apply separate aggregators to each component of a {@link TupleN}.
+   *
+   * <p>Typically each component should get its own aggregator instance rather
+   * than sharing one instance between components.</p>
+   *
+   * @param aggregators One aggregator per tuple position, in positional order
+   * @return The newly constructed instance
+   */
+  public static Aggregator<TupleN> tupleAggregator(Aggregator<?>... aggregators) {
+    return new TupleNAggregator(aggregators);
+  }
+
+  /**
+   * Adapts an {@link Aggregator} to the {@link CombineFn} API, for code that
+   * still expects a {@code CombineFn}.
+   *
+   * @param aggregator The aggregator that does the actual work
+   * @return A {@link CombineFn} that delegates to {@code aggregator}
+   */
+  public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator) {
+    final CombineFn<K, V> adapter = new AggregatorCombineFn<K, V>(aggregator);
+    return adapter;
+  }
+
+  /**
+   * Convenience base class for aggregators whose {@code initialize} step has
+   * nothing to do; subclasses only supply reset, update and results.
+   */
+  public static abstract class SimpleAggregator<T> implements Aggregator<T> {
+    /** Intentionally empty: simple aggregators read nothing from the configuration. */
+    @Override
+    public void initialize(Configuration conf) {
+    }
+  }
+
+  /**
+   * A {@code CombineFn} that delegates all of the actual work to an
+   * {@code Aggregator} instance.
+   */
+  private static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
+    // TODO: Has to be fully qualified until CombineFn.Aggregator can be removed.
+    private final org.apache.crunch.Aggregator<V> aggregator;
+
+    public AggregatorCombineFn(org.apache.crunch.Aggregator<V> aggregator) {
+      this.aggregator = aggregator;
+    }
+
+    @Override
+    public void initialize() {
+      aggregator.initialize(getConfiguration());
+    }
+
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
+      aggregator.reset();
+      for (V v : input.second()) {
+        aggregator.update(v);
+      }
+      for (V v : aggregator.results()) {
+        emitter.emit(Pair.of(input.first(), v));
+      }
+    }
+  }
+
+  private static class SumLongs extends SimpleAggregator<Long> {
+    private long sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Long next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class SumInts extends SimpleAggregator<Integer> {
+    private int sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Integer next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class SumFloats extends SimpleAggregator<Float> {
+    private float sum = 0;
+
+    @Override
+    public void reset() {
+      sum = 0f;
+    }
+
+    @Override
+    public void update(Float next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  /** Sums {@code Double} values into a primitive {@code double} accumulator. */
+  private static class SumDoubles extends SimpleAggregator<Double> {
+    // Use double literals for a double accumulator (the float literal 0f was misleading).
+    private double sum = 0.0;
+
+    @Override
+    public void reset() {
+      sum = 0.0;
+    }
+
+    @Override
+    public void update(Double next) {
+      sum += next;
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  private static class SumBigInts extends SimpleAggregator<BigInteger> {
+    private BigInteger sum = BigInteger.ZERO;
+
+    @Override
+    public void reset() {
+      sum = BigInteger.ZERO;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      sum = sum.add(next);
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+
+  /** Tracks the largest {@code Long} seen since the last reset. */
+  private static class MaxLongs extends SimpleAggregator<Long> {
+    // null until the first update after a reset.
+    private Long largest = null;
+
+    @Override
+    public void reset() {
+      largest = null;
+    }
+
+    @Override
+    public void update(Long candidate) {
+      if (largest != null && !(largest < candidate)) {
+        return;
+      }
+      largest = candidate;
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(largest);
+    }
+  }
+
+  /** Tracks the largest {@code Integer} seen since the last reset. */
+  private static class MaxInts extends SimpleAggregator<Integer> {
+    // null until the first update after a reset.
+    private Integer largest = null;
+
+    @Override
+    public void reset() {
+      largest = null;
+    }
+
+    @Override
+    public void update(Integer candidate) {
+      if (largest != null && !(largest < candidate)) {
+        return;
+      }
+      largest = candidate;
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(largest);
+    }
+  }
+
+  /**
+   * Tracks the largest {@code Float} seen since the last reset. Candidates are
+   * compared with {@code <}, so once a maximum exists a NaN never replaces it.
+   */
+  private static class MaxFloats extends SimpleAggregator<Float> {
+    // null until the first update after a reset.
+    private Float largest = null;
+
+    @Override
+    public void reset() {
+      largest = null;
+    }
+
+    @Override
+    public void update(Float candidate) {
+      if (largest != null && !(largest < candidate)) {
+        return;
+      }
+      largest = candidate;
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(largest);
+    }
+  }
+
+  /**
+   * Tracks the largest {@code Double} seen since the last reset. Candidates are
+   * compared with {@code <}, so once a maximum exists a NaN never replaces it.
+   */
+  private static class MaxDoubles extends SimpleAggregator<Double> {
+    // null until the first update after a reset.
+    private Double largest = null;
+
+    @Override
+    public void reset() {
+      largest = null;
+    }
+
+    @Override
+    public void update(Double candidate) {
+      if (largest != null && !(largest < candidate)) {
+        return;
+      }
+      largest = candidate;
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(largest);
+    }
+  }
+
+  /** Tracks the largest {@code BigInteger} seen since the last reset. */
+  private static class MaxBigInts extends SimpleAggregator<BigInteger> {
+    // null until the first update after a reset.
+    private BigInteger largest = null;
+
+    @Override
+    public void reset() {
+      largest = null;
+    }
+
+    @Override
+    public void update(BigInteger candidate) {
+      if (largest != null && largest.compareTo(candidate) >= 0) {
+        return;
+      }
+      largest = candidate;
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(largest);
+    }
+  }
+
+  /** Tracks the smallest {@code Long} seen since the last reset. */
+  private static class MinLongs extends SimpleAggregator<Long> {
+    // null until the first update after a reset.
+    private Long smallest = null;
+
+    @Override
+    public void reset() {
+      smallest = null;
+    }
+
+    @Override
+    public void update(Long candidate) {
+      if (smallest != null && !(smallest > candidate)) {
+        return;
+      }
+      smallest = candidate;
+    }
+
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(smallest);
+    }
+  }
+
+  /** Tracks the smallest {@code Integer} seen since the last reset. */
+  private static class MinInts extends SimpleAggregator<Integer> {
+    // null until the first update after a reset.
+    private Integer smallest = null;
+
+    @Override
+    public void reset() {
+      smallest = null;
+    }
+
+    @Override
+    public void update(Integer candidate) {
+      if (smallest != null && !(smallest > candidate)) {
+        return;
+      }
+      smallest = candidate;
+    }
+
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(smallest);
+    }
+  }
+
+  /**
+   * Tracks the smallest {@code Float} seen since the last reset. Candidates are
+   * compared with {@code >}, so once a minimum exists a NaN never replaces it.
+   */
+  private static class MinFloats extends SimpleAggregator<Float> {
+    // null until the first update after a reset.
+    private Float smallest = null;
+
+    @Override
+    public void reset() {
+      smallest = null;
+    }
+
+    @Override
+    public void update(Float candidate) {
+      if (smallest != null && !(smallest > candidate)) {
+        return;
+      }
+      smallest = candidate;
+    }
+
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(smallest);
+    }
+  }
+
+  /**
+   * Tracks the smallest {@code Double} seen since the last reset. Candidates are
+   * compared with {@code >}, so once a minimum exists a NaN never replaces it.
+   */
+  private static class MinDoubles extends SimpleAggregator<Double> {
+    // null until the first update after a reset.
+    private Double smallest = null;
+
+    @Override
+    public void reset() {
+      smallest = null;
+    }
+
+    @Override
+    public void update(Double candidate) {
+      if (smallest != null && !(smallest > candidate)) {
+        return;
+      }
+      smallest = candidate;
+    }
+
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(smallest);
+    }
+  }
+
+  /** Tracks the smallest {@code BigInteger} seen since the last reset. */
+  private static class MinBigInts extends SimpleAggregator<BigInteger> {
+    // null until the first update after a reset.
+    private BigInteger smallest = null;
+
+    @Override
+    public void reset() {
+      smallest = null;
+    }
+
+    @Override
+    public void update(BigInteger candidate) {
+      if (smallest != null && smallest.compareTo(candidate) <= 0) {
+        return;
+      }
+      smallest = candidate;
+    }
+
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(smallest);
+    }
+  }
+
+  /**
+   * Keeps the {@code arity} largest distinct values seen since the last reset
+   * and returns them in ascending (natural) order. A non-positive arity yields
+   * an empty result.
+   */
+  private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    // Created in reset(); update() assumes reset() has been called first.
+    private transient SortedSet<V> elements;
+
+    public MaxNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      // Evict only when the value was actually new. Adding a duplicate of a
+      // retained element must not push a distinct element out of the set.
+      // Add-then-trim also handles arity <= 0 without calling first() on an empty set.
+      if (elements.add(value) && elements.size() > arity) {
+        elements.remove(elements.first());
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  /**
+   * Keeps the {@code arity} smallest distinct values seen since the last reset
+   * and returns them in ascending (natural) order. A non-positive arity yields
+   * an empty result.
+   */
+  private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    // Created in reset(); update() assumes reset() has been called first.
+    private transient SortedSet<V> elements;
+
+    public MinNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      // Evict only when the value was actually new. Adding a duplicate of a
+      // retained element must not push a distinct element out of the set.
+      // Add-then-trim also handles arity <= 0 without calling last() on an empty set.
+      if (elements.add(value) && elements.size() > arity) {
+        elements.remove(elements.last());
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  /** Keeps the first {@code arity} values seen since the last reset, in arrival order. */
+  private static class FirstNAggregator<V> extends SimpleAggregator<V> {
+    private final int arity;
+    private final List<V> elements = Lists.newArrayList();
+
+    public FirstNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      if (elements.size() >= arity) {
+        return; // already full; later values are ignored
+      }
+      elements.add(value);
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  private static class LastNAggregator<V> extends SimpleAggregator<V> {
+    private final int arity;
+    private final LinkedList<V> elements;
+
+    public LastNAggregator(int arity) {
+      this.arity = arity;
+      this.elements = Lists.newLinkedList();
+    }
+
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      elements.add(value);
+      if (elements.size() == arity + 1) {
+        elements.removeFirst();
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  /**
+   * Joins the values of a group with {@code separator} using a Guava
+   * {@code Joiner}, optionally skipping nulls. A limit &lt;= 0 means "no limit".
+   * An input longer than {@code maxInputLength} characters is skipped. So is
+   * any input that would push the joined output past {@code maxOutputLength}
+   * characters.
+   */
+  private static class StringConcatAggregator extends SimpleAggregator<String> {
+    private final String separator;
+    private final boolean skipNulls;
+    private final long maxOutputLength;
+    private final long maxInputLength;
+    // Length of the joined output so far. It starts at -separator.length() because
+    // every accepted element is charged len + separator, but the first element has
+    // no separator in front of it.
+    private long currentLength;
+    private final LinkedList<String> list = new LinkedList<String>();
+
+    // Created lazily in reset(); the field is transient, so it is null until then.
+    private transient Joiner joiner;
+
+    public StringConcatAggregator(final String separator, final boolean skipNulls) {
+      this.separator = separator;
+      this.skipNulls = skipNulls;
+      this.maxInputLength = 0;
+      this.maxOutputLength = 0;
+    }
+
+    public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
+      this.separator = separator;
+      this.skipNulls = skipNull;
+      this.maxOutputLength = maxOutputLength;
+      this.maxInputLength = maxInputLength;
+      this.currentLength = -separator.length();
+    }
+
+    @Override
+    public void reset() {
+      if (joiner == null) {
+        joiner = skipNulls ? Joiner.on(separator).skipNulls() : Joiner.on(separator);
+      }
+      currentLength = -separator.length();
+      list.clear();
+    }
+
+    @Override
+    public void update(final String next) {
+      // A null input counts as zero characters and contributes no separator, so
+      // neither length check dereferences it. Previously a null input with
+      // maxInputLength > 0 threw NullPointerException here.
+      int inputLength = (next == null) ? 0 : next.length();
+      long length = (next == null) ? 0 : inputLength + separator.length();
+      boolean exceedsOutput = maxOutputLength > 0 && currentLength + length > maxOutputLength;
+      boolean exceedsInput = maxInputLength > 0 && inputLength > maxInputLength;
+      if (exceedsOutput || exceedsInput) {
+        return;
+      }
+      if (maxOutputLength > 0) {
+        currentLength += length;
+      }
+      // NOTE(review): when skipNulls is false a null is kept in the list, and a Joiner
+      // without skipNulls()/useForNull() rejects nulls in join(). Confirm this is intended.
+      list.add(next);
+    }
+
+    @Override
+    public Iterable<String> results() {
+      return ImmutableList.of(joiner.join(list));
+    }
+  }
+
+
+  private static abstract class TupleAggregator<T> implements Aggregator<T> {
+    private final List<Aggregator<Object>> aggregators;
+
+    @SuppressWarnings("unchecked")
+    public TupleAggregator(Aggregator<?>... aggregators) {
+      this.aggregators = Lists.newArrayList();
+      for (Aggregator<?> a : aggregators) {
+        this.aggregators.add((Aggregator<Object>) a);
+      }
+    }
+
+    @Override
+    public void initialize(Configuration configuration) {
+      for (Aggregator<?> a : aggregators) {
+        a.initialize(configuration);
+      }
+    }
+
+    @Override
+    public void reset() {
+      for (Aggregator<?> a : aggregators) {
+        a.reset();
+      }
+    }
+
+    protected void updateTuple(Tuple t) {
+      for (int i = 0; i < aggregators.size(); i++) {
+        aggregators.get(i).update(t.get(i));
+      }
+    }
+
+    protected Iterable<Object> results(int index) {
+      return aggregators.get(index).results();
+    }
+  }
+
+  /** Aggregates each half of a {@link Pair} with its own component aggregator. */
+  private static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
+
+    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
+      super(a1, a2);
+    }
+
+    @Override
+    public void update(Pair<V1, V2> value) {
+      // A Pair is a Tuple, so the shared per-position update applies directly.
+      updateTuple(value);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Iterable<Pair<V1, V2>> results() {
+      Iterable<V1> firsts = (Iterable<V1>) results(0);
+      Iterable<V2> seconds = (Iterable<V2>) results(1);
+      return new Tuples.PairIterable<V1, V2>(firsts, seconds);
+    }
+  }
+
+  /** Aggregates each component of a {@link Tuple3} with its own component aggregator. */
+  private static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
+
+    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
+      super(a1, a2, a3);
+    }
+
+    @Override
+    public void update(Tuple3<A, B, C> value) {
+      updateTuple(value);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Iterable<Tuple3<A, B, C>> results() {
+      Iterable<A> firsts = (Iterable<A>) results(0);
+      Iterable<B> seconds = (Iterable<B>) results(1);
+      Iterable<C> thirds = (Iterable<C>) results(2);
+      return new Tuples.TripIterable<A, B, C>(firsts, seconds, thirds);
+    }
+  }
+
+  /** Aggregates each component of a {@link Tuple4} with its own component aggregator. */
+  private static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
+
+    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
+      super(a1, a2, a3, a4);
+    }
+
+    @Override
+    public void update(Tuple4<A, B, C, D> value) {
+      updateTuple(value);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Iterable<Tuple4<A, B, C, D>> results() {
+      Iterable<A> firsts = (Iterable<A>) results(0);
+      Iterable<B> seconds = (Iterable<B>) results(1);
+      Iterable<C> thirds = (Iterable<C>) results(2);
+      Iterable<D> fourths = (Iterable<D>) results(3);
+      return new Tuples.QuadIterable<A, B, C, D>(firsts, seconds, thirds, fourths);
+    }
+  }
+
+  private static class TupleNAggregator extends TupleAggregator<TupleN> {
+    private final int size;
+
+    public TupleNAggregator(Aggregator<?>... aggregators) {
+      super(aggregators);
+      size = aggregators.length;
+    }
+
+    @Override
+    public void update(TupleN value) {
+      updateTuple(value);
+    }
+
+    @Override
+    public Iterable<TupleN> results() {
+      Iterable<?>[] iterables = new Iterable[size];
+      for (int i = 0; i < size; i++) {
+        iterables[i] = results(i);
+      }
+      return new Tuples.TupleNIterable(iterables);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 0ee4c3f..1f39632 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PCollection;
@@ -30,6 +31,7 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
@@ -118,6 +120,11 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
   }
 
   @Override
+  public PTable<K, V> combineValues(Aggregator<V> agg) {
+    return combineValues(Aggregators.<K, V>toCombineFn(agg));
+  }
+
+  /**
+   * Returns the {@code parent} table held by this grouped view, unchanged.
+   */
+  @Override
   public PTable<K, V> ungroup() {
     return parent;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index fee381d..4eb6e9c 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.Aggregator;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
@@ -28,6 +29,7 @@ import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PType;
@@ -78,10 +80,16 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
     return ptype;
   }
 
+  /**
+   * Combines the grouped values with {@code combineFn}. It builds a
+   * {@code DoTableImpl} named "combine" over {@code getChainingCollection()},
+   * passing {@code parent.getPTableType()} as the table type.
+   */
+  @Override
   public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
     return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
   }
 
+  @Override
+  public PTable<K, V> combineValues(Aggregator<V> agg) {
+    return combineValues(Aggregators.<K, V>toCombineFn(agg));
+  }
+
   private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
     @Override
     public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/22865774/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
index f28cca4..453b920 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -31,6 +31,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.materialize.pobject.FirstElementPObject;
 import org.apache.crunch.types.PTableType;
@@ -56,7 +57,7 @@ public class Aggregate {
         return Pair.of(input, 1L);
       }
     }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey()
-        .combineValues(CombineFn.<S> SUM_LONGS());
+        .combineValues(Aggregators.SUM_LONGS());
   }
 
   /**
@@ -74,7 +75,7 @@ public class Aggregate {
             return Pair.of(1, 1L);
           }
         }, tf.tableOf(tf.ints(), tf.longs())).groupByKey()
-        .combineValues(CombineFn.<Integer> SUM_LONGS());
+        .combineValues(Aggregators.SUM_LONGS());
     PCollection<Long> count = countTable.values();
     return new FirstElementPObject<Long>(count);
   }


Mime
View raw message