crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-568: Don't use a null key in the Aggregators.aggregate implementation
Date Wed, 07 Oct 2015 21:50:56 GMT
Repository: crunch
Updated Branches:
  refs/heads/master f8c98a6c6 -> 57235348d


CRUNCH-568: Don't use a null key in the Aggregators.aggregate implementation


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/57235348
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/57235348
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/57235348

Branch: refs/heads/master
Commit: 57235348d6628d28c3a869f23aca15888aa377be
Parents: f8c98a6
Author: Josh Wills <jwills@apache.org>
Authored: Tue Oct 6 14:30:24 2015 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Wed Oct 7 10:36:38 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Aggregate.java    |  8 ++++----
 .../it/java/org/apache/crunch/SparkAggregatorIT.java  | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/57235348/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index 794caa0..dd4e1db 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -308,11 +308,11 @@ public class Aggregate {
   
   public static <S> PCollection<S> aggregate(PCollection<S> collect, Aggregator<S> aggregator) {
     PTypeFamily tf = collect.getTypeFamily();
-    return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Void, S>>() {
-      public Pair<Void, S> map(S input) {
-        return Pair.of(null, input);
+    return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Boolean, S>>() {
+      public Pair<Boolean, S> map(S input) {
+        return Pair.of(false, input);
       }
-    }, tf.tableOf(tf.nulls(), collect.getPType()))
+    }, tf.tableOf(tf.booleans(), collect.getPType()))
     .groupByKey(1)
     .combineValues(aggregator)
     .values();

http://git-wip-us.apache.org/repos/asf/crunch/blob/57235348/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java
index bc6ebea..f55ef8f 100644
--- a/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkAggregatorIT.java
@@ -19,6 +19,7 @@ package org.apache.crunch;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.spark.SparkPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.TemporaryPath;
@@ -26,6 +27,8 @@ import org.apache.crunch.types.avro.Avros;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Collection;
+
 import static org.junit.Assert.assertEquals;
 
 public class SparkAggregatorIT {
@@ -44,6 +47,17 @@ public class SparkAggregatorIT {
     pipeline.done();
   }
 
+  @Test
+  public void testAvroFirstN() throws Exception {
+    SparkPipeline pipeline = new SparkPipeline("local", "aggregator");
+    PCollection<String> set1 = pipeline.read(From.textFile(tempDir.copyResourceFileName("set1.txt"), Avros.strings()));
+    PCollection<String> set2 = pipeline.read(From.textFile(tempDir.copyResourceFileName("set2.txt"), Avros.strings()));
+    Aggregator<String> first5 = Aggregators.FIRST_N(5);
+    Collection<String> aggregate = set1.union(set2).aggregate(first5).asCollection().getValue();
+    pipeline.done();
+    assertEquals(5, aggregate.size());
+  }
+
   private static class CntFn extends MapFn<String, Integer> {
     @Override
     public Integer map(String input) {


Mime
View raw message