incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-48: Add length limit options to the String concatenation combiner. Contributed by Gauthier Ambard.
Date Mon, 20 Aug 2012 07:40:16 GMT
Updated Branches:
  refs/heads/master 012e924e6 -> 457a067b6


CRUNCH-48: Add length limit options to the String concatenation combiner. Contributed by Gauthier
Ambard.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/457a067b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/457a067b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/457a067b

Branch: refs/heads/master
Commit: 457a067b688ab129dc3adc01f5aacc2fa7114dbc
Parents: 012e924
Author: jwills <jwills@apache.org>
Authored: Mon Aug 20 00:38:42 2012 -0700
Committer: jwills <jwills@apache.org>
Committed: Mon Aug 20 00:38:42 2012 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/CombineFn.java |   54 ++++++++++++++-
 .../test/java/org/apache/crunch/CombineFnTest.java |    7 ++-
 2 files changed, 57 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/457a067b/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
index 27183a9..88fbbaf 100644
--- a/crunch/src/main/java/org/apache/crunch/CombineFn.java
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -335,9 +335,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S,
Iterable<T>>, Pair<S,
     return aggregator(new LastNAggregator<V>(n));
   }
 
-
   /**
-   * Used to concatenate strings, with a separator between each strings.
+   * Used to concatenate strings, with a separator between each string. There
+   * is no limit on the length of the concatenated string.
    * 
    * @param separator
    *            the separator which will be appended between each string
@@ -348,7 +348,35 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S,
Iterable<T>>, Pair<S,
    * @return
    */
   public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator,
final boolean skipNull) {
-    return aggregator(new StringConcatAggregator(separator, skipNull));
+      return aggregator(new StringConcatAggregator(separator, skipNull));
+  }
+
+  /**
+   * Used to concatenate strings, with a separator between each string. You
+   * can limit the maximum length of the output string and of each input
+   * string; a limit applies only when it is > 0, and a value <= 0 means no limit.
+   * 
+   * Any input string that is too long (or that would make the output too
+   * long) is silently discarded.
+   * 
+   * @param separator
+   *            the separator which will be appended between each string
+   * @param skipNull
+   *            define if we should skip null values. Throw
+   *            NullPointerException if set to false and there is a null
+   *            value.
+   * @param maxOutputLength
+   *            the maximum length of the output string. If it is <= 0,
+   *            there is no limit. Otherwise the output string will have at
+   *            most maxOutputLength characters.
+   * @param maxInputLength
+   *            the maximum length of each input string. If it is <= 0,
+   *            there is no limit. Otherwise an input string is concatenated
+   *            only if it has at most maxInputLength characters.
+   * @return a CombineFn that concatenates the string values for each key
+   */
+  public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator,
final boolean skipNull, final long maxOutputLength, final long maxInputLength) {
+      return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength,
maxInputLength));
   }
 
   public static class SumLongs implements Aggregator<Long> {
@@ -870,6 +898,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S,
Iterable<T>>, Pair<S,
   public static class StringConcatAggregator implements Aggregator<String> {
     private final String separator;
     private final boolean skipNulls;
+    private final long maxOutputLength;
+    private final long maxInputLength;
+    private long currentLength;
     private final LinkedList<String> list = new LinkedList<String>();
 
     private transient Joiner joiner;
@@ -877,6 +908,16 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S,
Iterable<T>>, Pair<S,
     public StringConcatAggregator(final String separator, final boolean skipNulls) {
       this.separator = separator;
       this.skipNulls = skipNulls;
+      this.maxInputLength = 0;
+      this.maxOutputLength = 0;
+    }
+
+    public StringConcatAggregator(final String separator, final boolean skipNull, final long
maxOutputLength, final long maxInputLength) {
+      this.separator = separator;
+      this.skipNulls = skipNull;
+      this.maxOutputLength = maxOutputLength;
+      this.maxInputLength = maxInputLength;
+      this.currentLength = -separator.length();
     }
 
     @Override
@@ -889,6 +930,13 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S,
Iterable<T>>, Pair<S,
 
     @Override
     public void update(final String next) {
+      long length = (next == null) ? 0 : next.length() + separator.length();
+      if (maxOutputLength > 0 && currentLength + length > maxOutputLength ||
maxInputLength > 0 && next.length() > maxInputLength) {
+        return;
+      }
+      if (maxOutputLength > 0) {
+        currentLength += length;
+      }
       list.add(next);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/457a067b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
index 82bdf00..af67ec3 100644
--- a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
+++ b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java
@@ -191,13 +191,18 @@ public class CombineFnTest {
     String[] arrayNull = new String[] { null, "" };
     assertEquals(ImmutableList.of("foofoobarbar"), applyAggregator(
         new StringConcatAggregator("", true), ImmutableList.of("foo", "foobar", "bar")));
-
     assertEquals(ImmutableList.of("foo/foobar/bar"), applyAggregator(
         new StringConcatAggregator("/", false), ImmutableList.of("foo", "foobar", "bar")));
     assertEquals(ImmutableList.of("  "), applyAggregator(
         new StringConcatAggregator(" ", true), ImmutableList.of(" ", "")));
     assertEquals(ImmutableList.of(""), applyAggregator(
         new StringConcatAggregator(" ", true), Arrays.asList(arrayNull)));
+    assertEquals(ImmutableList.of("foo bar"), applyAggregator(
+        new StringConcatAggregator(" ", true, 20, 3), ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("foo foobar"), applyAggregator(
+        new StringConcatAggregator(" ", true, 10, 6), ImmutableList.of("foo", "foobar", "bar")));
+    assertEquals(ImmutableList.of("foo bar"), applyAggregator(
+        new StringConcatAggregator(" ", true, 9, 6), ImmutableList.of("foo", "foobar", "bar")));
   }
 
   @Test(expected = NullPointerException.class)


Mime
View raw message