crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-393 Handle object reuse in Aggregate.top
Date Tue, 13 May 2014 21:44:46 GMT
Repository: crunch
Updated Branches:
  refs/heads/master a4d1f8461 -> d186658cb


CRUNCH-393 Handle object reuse in Aggregate.top


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d186658c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d186658c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d186658c

Branch: refs/heads/master
Commit: d186658cb7d925979f0f4f745ed33f75eeb0b3ef
Parents: a4d1f84
Author: Gabriel Reid <greid@apache.org>
Authored: Tue May 13 23:40:15 2014 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Tue May 13 23:44:08 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/AggregateIT.java | 21 +++++++++++++
 .../java/org/apache/crunch/lib/Aggregate.java   | 32 ++++++++++++++++----
 2 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d186658c/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
index 56ee3ac..1408c73 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.crunch.MapFn;
@@ -33,7 +34,9 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
 import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PTableType;
@@ -139,6 +142,24 @@ public class AggregateIT {
   }
 
   @Test
+  public void testTopN_MRPipeline() throws IOException {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+    PTable<StringWrapper, String> entries = pipeline
+        .read(From.textFile(tmpDir.copyResourceFileName("set1.txt"), Avros.strings()))
+        .by(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
+    PTable<StringWrapper, String> topEntries = Aggregate.top(entries, 3, true);
+    List<Pair<StringWrapper, String>> expectedTop3 = Lists.newArrayList(
+        Pair.of(StringWrapper.wrap("e"), "e"),
+        Pair.of(StringWrapper.wrap("c"), "c"),
+        Pair.of(StringWrapper.wrap("b"), "b"));
+
+    assertEquals(
+        expectedTop3,
+        Lists.newArrayList(topEntries.materialize()));
+
+  }
+
+  @Test
   public void testCollectValues_Writables() throws IOException {
     Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
     Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))

http://git-wip-us.apache.org/repos/asf/crunch/blob/d186658c/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index df9310f..5ef437c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -114,19 +114,22 @@ public class Aggregate {
 
     private final int limit;
     private final boolean maximize;
+    private final PType<Pair<K, V>> pairType;
     private transient PriorityQueue<Pair<K, V>> values;
 
-    public TopKFn(int limit, boolean ascending) {
+    public TopKFn(int limit, boolean ascending, PType<Pair<K, V>> pairType) {
       this.limit = limit;
       this.maximize = ascending;
+      this.pairType = pairType;
     }
 
     public void initialize() {
       this.values = new PriorityQueue<Pair<K, V>>(limit, new PairValueComparator<K, V>(maximize));
+      pairType.initialize(getConfiguration());
     }
 
     public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
-      values.add(input);
+      values.add(pairType.getDetachedValue(input));
       if (values.size() > limit) {
         values.poll();
       }
@@ -143,10 +146,17 @@ public class Aggregate {
 
     private final int limit;
     private final boolean maximize;
+    private PType<Pair<K, V>> pairType;
 
-    public TopKCombineFn(int limit, boolean maximize) {
+    public TopKCombineFn(int limit, boolean maximize, PType<Pair<K, V>> pairType) {
       this.limit = limit;
       this.maximize = maximize;
+      this.pairType = pairType;
+    }
+
+    @Override
+    public void initialize() {
+      pairType.initialize(getConfiguration());
     }
 
     @Override
@@ -155,7 +165,7 @@ public class Aggregate {
       Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize);
       PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(limit, cmp);
       for (Pair<K, V> pair : input.second()) {
-        queue.add(pair);
+        queue.add(pairType.getDetachedValue(pair));
         if (queue.size() > limit) {
           queue.poll();
         }
@@ -169,13 +179,23 @@ public class Aggregate {
     }
   }
 
+  /**
+   * Selects the top N pairs from the given table, with sorting being performed on the values (i.e. the second
+   * value in the pair) of the table.
+   *
+   * @param ptable table containing the pairs from which the top N is to be selected
+   * @param limit number of top elements to select
+   * @param maximize if true, the maximum N values from the table will be selected, otherwise the minimal
+   *                 N values will be selected
+   * @return table containing the top N values from the incoming table
+   */
   public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) {
     PTypeFamily ptf = ptable.getTypeFamily();
     PTableType<K, V> base = ptable.getPTableType();
     PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
     PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
-    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter)
-        .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize))
+    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize, pairType), inter)
+        .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize, pairType))
         .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
           public void process(Pair<Integer, Pair<K, V>> input, Emitter<Pair<K, V>> emitter) {
             emitter.emit(input.second());


Mime
View raw message