crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-174: Add support for cogrouping 3, 4, or N inputs.
Date Thu, 18 Jul 2013 07:23:41 GMT
Updated Branches:
  refs/heads/master 643e41063 -> 181b476fe


CRUNCH-174: Add support for cogrouping 3, 4, or N inputs.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/181b476f
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/181b476f
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/181b476f

Branch: refs/heads/master
Commit: 181b476fe25c9ba5efac7d65edcbfc8b27ae1077
Parents: 643e410
Author: Josh Wills <jwills@apache.org>
Authored: Wed Jul 17 19:30:21 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Wed Jul 17 22:47:22 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/CogroupIT.java   |  82 +++++-
 .../src/main/java/org/apache/crunch/Tuple3.java |  37 +++
 .../src/main/java/org/apache/crunch/Tuple4.java |  43 +++
 .../java/org/apache/crunch/lib/Cogroup.java     | 259 +++++++++++++++----
 4 files changed, 374 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 0d8b2b8..16c4c69 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -29,6 +29,8 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
@@ -53,13 +55,16 @@ public class CogroupIT {
   private MRPipeline pipeline;
   private PCollection<String> lines1;
   private PCollection<String> lines2;
-
+  private PCollection<String> lines3;
+  private PCollection<String> lines4;
 
   @Before
   public void setUp() throws IOException {
     pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
     lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
     lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
+    lines3 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
+    lines4 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
   }
 
   @After
@@ -77,6 +82,26 @@ public class CogroupIT {
     runCogroup(AvroTypeFamily.getInstance());
   }
 
+  @Test
+  public void testCogroup3Writables() {
+    runCogroup3(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroup3Avro() {
+    runCogroup3(AvroTypeFamily.getInstance());
+  }
+  
+  @Test
+  public void testCogroup4Writables() {
+    runCogroup4(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroup4Avro() {
+    runCogroup4(AvroTypeFamily.getInstance());
+  }
+  
   public void runCogroup(PTypeFamily ptf) {
     PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
@@ -102,7 +127,62 @@ public class CogroupIT {
     assertThat(actual, is(expected));
   }
 
+  public void runCogroup3(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+    PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt);
+    
+    PTable<String, Tuple3.Collect<String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3);
+
+    Map<String, Tuple3.Collect<String, String, String>> result = cg.materializeToMap();
+    Map<String, Tuple3.Collect<String, String, String>> actual = Maps.newHashMap();
+    for (Map.Entry<String, Tuple3.Collect<String, String, String>> e : result.entrySet()) {
+      Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+      Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+      Collection<String> three = ImmutableSet.copyOf(e.getValue().third());
+      actual.put(e.getKey(), new Tuple3.Collect<String, String, String>(one, two, three));
+    }
+    Map<String, Tuple3.Collect<String, String, String>> expected = ImmutableMap.of(
+        "a", new Tuple3.Collect<String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4")),
+        "b", new Tuple3.Collect<String, String, String>(coll("1-2"), coll("2-1"), coll("1-2")),
+        "c", new Tuple3.Collect<String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3")),
+        "d", new Tuple3.Collect<String, String, String>(coll(), coll("2-4"), coll())
+    );
+
+    assertThat(actual, is(expected));
+  }
+  
+  public void runCogroup4(PTypeFamily ptf) {
+    PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
+
+    PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt);
+    PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt);
+    PTable<String, String> kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt);
+    PTable<String, String> kv4 = lines4.parallelDo("kv4", new KeyValueSplit(), tt);
+    
+    PTable<String, Tuple4.Collect<String, String, String, String>> cg = Cogroup.cogroup(kv1, kv2, kv3, kv4);
+
+    Map<String, Tuple4.Collect<String, String, String, String>> result = cg.materializeToMap();
+    Map<String, Tuple4.Collect<String, String, String, String>> actual = Maps.newHashMap();
+    for (Map.Entry<String, Tuple4.Collect<String, String, String, String>> e : result.entrySet()) {
+      Collection<String> one = ImmutableSet.copyOf(e.getValue().first());
+      Collection<String> two = ImmutableSet.copyOf(e.getValue().second());
+      Collection<String> three = ImmutableSet.copyOf(e.getValue().third());
+      Collection<String> four = ImmutableSet.copyOf(e.getValue().fourth());
+      actual.put(e.getKey(), new Tuple4.Collect<String, String, String, String>(one, two, three, four));
+    }
+    Map<String, Tuple4.Collect<String, String, String, String>> expected = ImmutableMap.of(
+        "a", new Tuple4.Collect<String, String, String, String>(coll("1-1", "1-4"), coll(), coll("1-1", "1-4"), coll()),
+        "b", new Tuple4.Collect<String, String, String, String>(coll("1-2"), coll("2-1"), coll("1-2"), coll("2-1")),
+        "c", new Tuple4.Collect<String, String, String, String>(coll("1-3"), coll("2-2", "2-3"), coll("1-3"), coll("2-2", "2-3")),
+        "d", new Tuple4.Collect<String, String, String, String>(coll(), coll("2-4"), coll(), coll("2-4"))
+    );
+
+    assertThat(actual, is(expected));
+  }
+  
   private static class KeyValueSplit extends DoFn<String, Pair<String, String>> {
     @Override
     public void process(String input, Emitter<Pair<String, String>> emitter) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
index 4372811..922ed07 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java
@@ -17,13 +17,50 @@
  */
 package org.apache.crunch;
 
+import java.util.Collection;
+
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
 
 /**
  * A convenience class for three-element {@link Tuple}s.
  */
 public class Tuple3<V1, V2, V3> implements Tuple {
 
+  public static class Collect<V1, V2, V3> extends Tuple3<Collection<V1>, Collection<V2>, Collection<V3>> {
+
+    public static <V1, V2, V3> PType<Tuple3.Collect<V1, V2, V3>> derived(PType<V1> first,
+        PType<V2> second, PType<V3> third) {
+      PTypeFamily tf = first.getFamily();
+      PType<Tuple3<Collection<V1>, Collection<V2>, Collection<V3>>> pt = 
+          tf.triples(
+              tf.collections(first),
+              tf.collections(second),
+              tf.collections(third));
+      Object clazz = Tuple3.Collect.class;
+      return tf.derived((Class<Tuple3.Collect<V1, V2, V3>>) clazz,
+          new MapFn<Tuple3<Collection<V1>, Collection<V2>, Collection<V3>>, Collect<V1, V2, V3>>() {
+        @Override
+        public Collect<V1, V2, V3> map(
+            Tuple3<Collection<V1>, Collection<V2>, Collection<V3>> in) {
+          return new Collect<V1, V2, V3>(in.first(), in.second(), in.third());
+        }
+      },
+      new MapFn<Collect<V1, V2, V3>, Tuple3<Collection<V1>, Collection<V2>, Collection<V3>>>() {
+        @Override
+        public Tuple3<Collection<V1>, Collection<V2>, Collection<V3>> map(
+            Collect<V1, V2, V3> in) {
+          return in;
+        }
+      }, pt);
+    }
+    
+    public Collect(Collection<V1> first, Collection<V2> second, Collection<V3> third) {
+      super(first, second, third);
+    }
+  }
+  
   private final V1 first;
   private final V2 second;
   private final V3 third;

http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
index f161371..94d23fd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java
@@ -17,13 +17,56 @@
  */
 package org.apache.crunch;
 
+import java.util.Collection;
+
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
 
 /**
  * A convenience class for four-element {@link Tuple}s.
  */
 public class Tuple4<V1, V2, V3, V4> implements Tuple {
 
+  public static class Collect<V1, V2, V3, V4> extends Tuple4<
+  Collection<V1>, 
+  Collection<V2>,
+  Collection<V3>,
+  Collection<V4>> {
+
+    public static <V1, V2, V3, V4> PType<Tuple4.Collect<V1, V2, V3, V4>> derived(PType<V1> first,
+        PType<V2> second, PType<V3> third, PType<V4> fourth) {
+      PTypeFamily tf = first.getFamily();
+      PType<Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>>> pt = 
+          tf.quads(
+              tf.collections(first),
+              tf.collections(second),
+              tf.collections(third),
+              tf.collections(fourth));
+      Object clazz = Tuple4.Collect.class;
+      return tf.derived((Class<Tuple4.Collect<V1, V2, V3, V4>>) clazz,
+          new MapFn<Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>>,
+          Collect<V1, V2, V3, V4>>() {
+        @Override
+        public Collect<V1, V2, V3, V4> map(
+            Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>> in) {
+          return new Collect<V1, V2, V3, V4>(in.first(), in.second(), in.third(), in.fourth());
+        }
+      },
+      new MapFn<Collect<V1, V2, V3, V4>, Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>>>() {
+        @Override
+        public Tuple4<Collection<V1>, Collection<V2>, Collection<V3>, Collection<V4>> map(
+            Collect<V1, V2, V3, V4> input) {
+          return input;
+        }
+      }, pt);
+    }
+
+    public Collect(Collection<V1> first, Collection<V2> second, Collection<V3> third, Collection<V4> fourth) {
+      super(first, second, third, fourth);
+    }
+  }
+  
   private final V1 first;
   private final V2 second;
   private final V3 third;

http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
index 3bf3e4d..7f5f70d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -23,8 +23,13 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.Tuple4;
+import org.apache.crunch.TupleN;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.TupleFactory;
 
 import com.google.common.collect.Lists;
 
@@ -38,88 +43,250 @@ public class Cogroup {
    * @return a {@code PTable} representing the co-grouped tables
    */
   public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(PTable<K, U> left, PTable<K, V> right) {
-    return cogroup(left, right, 0);
+    return cogroup(0, left, right);
   }
   
   /**
    * Co-groups the two {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of
    * reducers.)
    * 
+   * @param numReducers The number of reducers to use
    * @param left The left (smaller) PTable
    * @param right The right (larger) PTable
-   * @param numReducers The number of reducers to use
    * @return A new {@code PTable} representing the co-grouped tables
    */
   public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(
+      int numReducers,
       PTable<K, U> left,
-      PTable<K, V> right,
-      int numReducers) {
-    PTypeFamily ptf = left.getTypeFamily();
-    PType<U> leftType = left.getPTableType().getValueType();
-    PType<V> rightType = right.getPTableType().getValueType();
-    PType<Pair<U, V>> itype = ptf.pairs(leftType, rightType);
+      PTable<K, V> right) {
+    PTypeFamily tf = left.getTypeFamily();
+    return cogroup(
+        tf.pairs(tf.collections(left.getValueType()),
+                 tf.collections(right.getValueType())),
+        TupleFactory.PAIR,
+        numReducers,
+        left, right);
+  }
 
-    PTable<K, Pair<U, V>> cgLeft = left.mapValues("coGroupTag1", new CogroupFn1<U, V>(),
-        itype);
-    PTable<K, Pair<U, V>> cgRight = right.mapValues("coGroupTag2", new CogroupFn2<U, V>(),
+  /**
+   * Co-groups the three {@link PTable} arguments.
+   * 
+   * @param first The smallest PTable
+   * @param second The second-smallest PTable
+   * @param third The largest PTable
+   * @return a {@code PTable} representing the co-grouped tables
+   */
+  public static <K, V1, V2, V3> PTable<K, Tuple3.Collect<V1, V2, V3>> cogroup(
+      PTable<K, V1> first,
+      PTable<K, V2> second,
+      PTable<K, V3> third) {
+    return cogroup(0, first, second, third);
+  }
+  
+  /**
+   * Co-groups the three {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of
+   * reducers.)
+   * 
+   * @param numReducers The number of reducers to use
+   * @param first The smallest PTable
+   * @param second The second-smallest PTable
+   * @param third The largest PTable
+   * @return A new {@code PTable} representing the co-grouped tables
+   */
+  public static <K, V1, V2, V3> PTable<K, Tuple3.Collect<V1, V2, V3>> cogroup(
+      int numReducers,
+      PTable<K, V1> first,
+      PTable<K, V2> second,
+      PTable<K, V3> third) {
+    return cogroup(
+        Tuple3.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType()),
+        new TupleFactory<Tuple3.Collect<V1, V2, V3>>() {
+          @Override
+          public Tuple3.Collect<V1, V2, V3> makeTuple(Object... values) {
+            return new Tuple3.Collect<V1, V2, V3>(
+                (Collection<V1>) values[0],
+                (Collection<V2>) values[1],
+                (Collection<V3>) values[2]);
+          }
+        },
+        numReducers,
+        first, second, third);
+  }
+  
+  /**
+   * Co-groups the four {@link PTable} arguments.
+   * @param first The smallest PTable
+   * @param second The second-smallest PTable
+   * @param third The third-smallest PTable
+   * @param fourth The largest PTable
+   * @return a {@code PTable} representing the co-grouped tables
+   */
+  public static <K, V1, V2, V3, V4> PTable<K, Tuple4.Collect<V1, V2, V3, V4>> cogroup(
+      PTable<K, V1> first,
+      PTable<K, V2> second,
+      PTable<K, V3> third,
+      PTable<K, V4> fourth) {
+    return cogroup(0, first, second, third, fourth);
+  }
+  
+  /**
+   * Co-groups the four {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of
+   * reducers.)
+   * @param numReducers The number of reducers to use
+   * @param first The smallest PTable
+   * @param second The second-smallest PTable
+   * @param third The third-smallest PTable
+   * @param fourth The largest PTable
+   * @return A new {@code PTable} representing the co-grouped tables
+   */
+  public static <K, V1, V2, V3, V4> PTable<K, Tuple4.Collect<V1, V2, V3, V4>> cogroup(
+      int numReducers,
+      PTable<K, V1> first,
+      PTable<K, V2> second,
+      PTable<K, V3> third,
+      PTable<K, V4> fourth) {
+    return cogroup(
+        Tuple4.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType(),
+            fourth.getValueType()),
+        new TupleFactory<Tuple4.Collect<V1, V2, V3, V4>>() {
+          @Override
+          public Tuple4.Collect<V1, V2, V3, V4> makeTuple(Object... values) {
+            return new Tuple4.Collect<V1, V2, V3, V4>(
+                (Collection<V1>) values[0],
+                (Collection<V2>) values[1],
+                (Collection<V3>) values[2],
+                (Collection<V4>) values[3]);
+          }
+        },
+        numReducers,
+        first, second, third, fourth);
+  }
+  
+  /**
+   * Co-groups an arbitrary number of {@link PTable} arguments. The largest table should
+   * come last in the ordering.
+   * 
+   * @param first The first (smallest) PTable to co-group
+   * @param rest The other (larger) PTables to co-group
+   * @return a {@code PTable} representing the co-grouped tables
+   */
+  public static <K> PTable<K, TupleN> cogroup(PTable<K, ?> first, PTable<K, ?>... rest) {
+    return cogroup(0, first, rest);
+  }
+  
+  /**
+   * Co-groups an arbitrary number of {@link PTable} arguments with a user-specified degree of parallelism
+   * (a.k.a, number of reducers.) The largest table should come last in the ordering.
+   * 
+   * @param numReducers The number of reducers to use
+   * @param first The first (smallest) PTable to co-group
+   * @param rest The other (larger) PTables to co-group
+   * @return A new {@code PTable} representing the co-grouped tables
+   */
+  public static <K> PTable<K, TupleN> cogroup(
+      int numReducers,
+      PTable<K, ?> first,
+      PTable<K, ?>... rest) {
+    PTypeFamily tf = first.getTypeFamily();
+    PType[] components = new PType[1 + rest.length];
+    components[0] = tf.collections(first.getValueType());
+    for (int i = 0; i < rest.length; i++) {
+      components[i + 1] = tf.collections(rest[i].getValueType());
+    }
+    return cogroup(
+        tf.tuples(components),
+        TupleFactory.TUPLEN,
+        numReducers,
+        first, rest);
+  }
+  
+  private static <K, T extends Tuple> PTable<K, T> cogroup(
+      PType<T> outputType,
+      TupleFactory tupleFactory,
+      int numReducers,
+      PTable<K, ?> first, PTable<K, ?>... rest) {
+    PTypeFamily ptf = first.getTypeFamily();
+    PType[] ptypes = new PType[1 + rest.length];
+    ptypes[0] = first.getValueType();
+    for (int i = 0; i < rest.length; i++) {
+      ptypes[i + 1] = rest[i].getValueType();
+    }
+    PType<TupleN> itype = ptf.tuples(ptypes);
+    
+    PTable<K, TupleN> firstInter = first.mapValues("coGroupTag1",
+        new CogroupFn(0, 1 + rest.length),
         itype);
-
-    PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType),
-        ptf.collections(rightType));
-    PTable<K, Pair<U, V>> both = cgLeft.union(cgRight);
-    PGroupedTable<K, Pair<U, V>> grouped = null;
+    PTable<K, TupleN>[] inter = new PTable[rest.length];
+    for (int i = 0; i < rest.length; i++) {
+      inter[i] = rest[i].mapValues("coGroupTag" + (i + 2),
+          new CogroupFn(i + 1, 1 + rest.length),
+          itype);
+    }
+    
+    PTable<K, TupleN> union = firstInter.union(inter);
+    PGroupedTable<K, TupleN> grouped = null;
     if (numReducers > 0) {
-      grouped = both.groupByKey(numReducers);
+      grouped = union.groupByKey(numReducers);
     } else {
-      grouped = both.groupByKey();
+      grouped = union.groupByKey();
     }
-    return grouped.mapValues("cogroup", new PostGroupFn<U, V>(leftType, rightType), otype);
+    
+    return grouped.mapValues("cogroup", 
+        new PostGroupFn<T>(tupleFactory, ptypes),
+        outputType);
   }
-
-  private static class CogroupFn1<V, U> extends MapFn<V, Pair<V, U>> {
-    @Override
-    public Pair<V, U> map(V v) {
-      return Pair.of(v, null);
+  
+  private static class CogroupFn<T> extends MapFn<T, TupleN> {
+    private final int index;
+    private final int size;
+    
+    public CogroupFn(int index, int size) {
+      this.index = index;
+      this.size = size;
     }
-  }
 
-  private static class CogroupFn2<V, U> extends MapFn<U, Pair<V, U>> {
     @Override
-    public Pair<V, U> map(U u) {
-      return Pair.of(null, u);
+    public TupleN map(T input) {
+      Object[] v = new Object[size];
+      v[index] = input;
+      return TupleN.of(v);
     }
   }
 
-  private static class PostGroupFn<V, U> extends
-      MapFn<Iterable<Pair<V, U>>, Pair<Collection<V>, Collection<U>>> {
+  private static class PostGroupFn<T extends Tuple> extends
+      MapFn<Iterable<TupleN>, T> {
     
-    private PType<V> ptypeV;
-    private PType<U> ptypeU;
+    private final TupleFactory factory;
+    private final PType[] ptypes;
     
-    public PostGroupFn(PType<V> ptypeV, PType<U> ptypeU) {
-      this.ptypeV = ptypeV;
-      this.ptypeU = ptypeU;
+    public PostGroupFn(TupleFactory tf, PType... ptypes) {
+      this.factory = tf;
+      this.ptypes = ptypes;
     }
     
     @Override
     public void initialize() {
       super.initialize();
-      ptypeV.initialize(getConfiguration());
-      ptypeU.initialize(getConfiguration());
+      for (PType pt : ptypes) {
+        pt.initialize(getConfiguration());
+      }
     }
     
     @Override
-    public Pair<Collection<V>, Collection<U>> map(Iterable<Pair<V, U>> input) {
-      Collection<V> cv = Lists.newArrayList();
-      Collection<U> cu = Lists.newArrayList();
-      for (Pair<V, U> pair : input) {
-        if (pair.first() != null) {
-          cv.add(ptypeV.getDetachedValue(pair.first()));
-        } else if (pair.second() != null) {
-          cu.add(ptypeU.getDetachedValue(pair.second()));
+    public T map(Iterable<TupleN> input) {
+      Collection[] collections = new Collection[ptypes.length];
+      for (int i = 0; i < ptypes.length; i++) {
+        collections[i] = Lists.newArrayList();
+      }
+      for (TupleN t : input) {
+        for (int i = 0; i < ptypes.length; i++) {
+          if (t.get(i) != null) {
+            collections[i].add(ptypes[i].getDetachedValue(t.get(i)));
+            break;
+          }
         }
       }
-      return Pair.of(cv, cu);
+      return (T) factory.makeTuple(collections);
     }
   }
 


Mime
View raw message