crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [2/3] git commit: CRUNCH-118: Add aggregator/lib patterns for aggregating the unique elements of a collection
Date Sun, 25 Nov 2012 09:19:48 GMT
CRUNCH-118: Add aggregator/lib patterns for aggregating the unique elements of a collection


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a71871d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a71871d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a71871d3

Branch: refs/heads/master
Commit: a71871d36de81d5af3f6d1bd604792e7a4313f5e
Parents: 711eaed
Author: Josh Wills <jwills@apache.org>
Authored: Wed Nov 21 09:31:59 2012 -0800
Committer: Matthias Friedrich <matt@mafr.de>
Committed: Sat Nov 24 10:05:17 2012 +0100

----------------------------------------------------------------------
 .../java/org/apache/crunch/fn/Aggregators.java     |   50 +++++++-
 .../main/java/org/apache/crunch/lib/Distinct.java  |  100 +++++++++++++++
 .../java/org/apache/crunch/fn/AggregatorsTest.java |    9 +-
 .../java/org/apache/crunch/lib/DistinctTest.java   |   37 ++++++
 4 files changed, 192 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
index 9ee0de7..5364d62 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -20,6 +20,7 @@ package org.apache.crunch.fn;
 import java.math.BigInteger;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.SortedSet;
 
 import org.apache.crunch.Aggregator;
@@ -326,7 +327,7 @@ public final class Aggregators {
   public static <V> Aggregator<V> LAST_N(int n) {
     return new LastNAggregator<V>(n);
   }
-
+  
   /**
    * Concatenate strings, with a separator between strings. There
    * is no limits of length for the concatenated string.
@@ -340,7 +341,7 @@ public final class Aggregators {
    *            define if we should skip null values. Throw
    *            NullPointerException if set to false and there is a null
    *            value.
-   * @return The newly constructed instance instance
+   * @return The newly constructed instance
    */
   public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull) {
     return new StringConcatAggregator(separator, skipNull);
@@ -371,7 +372,7 @@ public final class Aggregators {
    *            the maximum length of the input strings. If it's set <= 0,
    *            there is no limit. The number of characters of the input string
    *            will be &lt; maxInputLength to be concatenated.
-   * @return The newly constructed instance instance
+   * @return The newly constructed instance
    */
   public static Aggregator<String> STRING_CONCAT(String separator, boolean skipNull,
       long maxOutputLength, long maxInputLength) {
@@ -379,6 +380,17 @@ public final class Aggregators {
   }
 
   /**
+   * Collect the unique elements of the input, as defined by the {@code equals} method for
+   * the input objects. No guarantees are made about the order in which the final elements
+   * will be returned.
+   * 
+   * @return The newly constructed instance
+   */
+  public static <V> Aggregator<V> UNIQUE_ELEMENTS() {
+    return new SetAggregator<V>();
+  }
+  
+  /**
    * Apply separate aggregators to each component of a {@link Pair}.
    */
   public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator(
@@ -1052,4 +1064,36 @@ public final class Aggregators {
     }
   }
 
+  private static class SetAggregator<V> extends SimpleAggregator<V> {
+    private final Set<V> elements;
+    private final int sizeLimit;
+    
+    public SetAggregator() {
+      this(-1);
+    }
+    
+    public SetAggregator(int sizeLimit) {
+      this.elements = Sets.newHashSet();
+      this.sizeLimit = sizeLimit;
+    }
+    
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+
+    @Override
+    public void update(V value) {
+      elements.add(value);
+      if (sizeLimit > 0 && elements.size() > sizeLimit) {
+        elements.iterator().remove();
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
new file mode 100644
index 0000000..fcf7b7e
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Set;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * Functions for computing the distinct elements of a {@code PCollection}.
+ */
+public class Distinct {
+
+  private static final int DEFAULT_FLUSH_EVERY = 50000;
+  
+  /**
+   * Construct a new {@code PCollection} that contains the unique elements of a
+   * given input {@code PCollection}.
+   * 
+   * @param input The input {@code PCollection}
+   * @return A new {@code PCollection} that contains the unique elements of the input
+   */
+  public static <S> PCollection<S> distinct(PCollection<S> input) {
+    return distinct(input, DEFAULT_FLUSH_EVERY);
+  }
+  
+  /**
+   * A {@code distinct} operation that gives the client more control over how frequently
+   * elements are flushed to disk in order to allow control over performance or
+   * memory consumption.
+   * 
+   * @param input The input {@code PCollection}
+   * @param flushEvery Flush the elements to disk whenever we encounter this many unique values
+   * @return A new {@code PCollection} that contains the unique elements of the input
+   */
+  public static <S> PCollection<S> distinct(PCollection<S> input, int flushEvery) {
+    Preconditions.checkArgument(flushEvery > 0);
+    PType<S> pt = input.getPType();
+    PTypeFamily ptf = pt.getFamily();
+    return input
+        .parallelDo("pre-distinct", new PreDistinctFn<S>(flushEvery), ptf.tableOf(pt, ptf.nulls()))
+        .groupByKey()
+        .parallelDo("post-distinct", new PostDistinctFn<S>(), pt);
+  }
+  
+  private static class PreDistinctFn<S> extends DoFn<S, Pair<S, Void>> {
+    private final Set<S> values = Sets.newHashSet();
+    private final int flushEvery;
+    
+    public PreDistinctFn(int flushEvery) {
+      this.flushEvery = flushEvery;
+    }
+    
+    @Override
+    public void process(S input, Emitter<Pair<S, Void>> emitter) {
+      values.add(input);
+      if (values.size() > flushEvery) {
+        cleanup(emitter);
+      }
+    }
+    
+    @Override
+    public void cleanup(Emitter<Pair<S, Void>> emitter) {
+      for (S in : values) {
+        emitter.emit(Pair.<S, Void>of(in, null));
+      }
+      values.clear();
+    }
+  }
+  
+  private static class PostDistinctFn<S> extends DoFn<Pair<S, java.lang.Iterable<java.lang.Void>>, S> {
+    @Override
+    public void process(Pair<S, Iterable<Void>> input, Emitter<S> emitter) {
+      emitter.emit(input.first());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
index 4ebb872..bd63653 100644
--- a/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
+++ b/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
@@ -55,6 +55,7 @@ import org.junit.Test;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 
@@ -122,7 +123,13 @@ public class AggregatorsTest {
   public void testLastN() {
     assertThat(apply(Aggregators.<Integer>LAST_N(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(29, 1009)));
   }
-
+  
+  @Test
+  public void testUniqueElements() {
+    assertThat(ImmutableSet.copyOf(apply(Aggregators.<Integer>UNIQUE_ELEMENTS(), 17, 29, 29, 16, 17)),
+        is(ImmutableSet.of(17, 29, 16)));
+  }
+  
   @Test
   public void testPairs() {
     List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a71871d3/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java b/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java
new file mode 100644
index 0000000..4c9d816
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/lib/DistinctTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class DistinctTest {
+  @Test
+  public void testDistinct() {
+    PCollection<Integer> input = MemPipeline.typedCollectionOf(Avros.ints(),
+        17, 29, 17, 29, 17, 29, 36, 45, 17, 45, 36, 29);
+    Iterable<Integer> unique = Distinct.distinct(input).materialize();
+    assertEquals(ImmutableSet.of(17, 29, 36, 45), ImmutableSet.copyOf(unique));
+  }
+}


Mime
View raw message