Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C62FA1028B for ; Thu, 18 Jul 2013 07:23:48 +0000 (UTC) Received: (qmail 85100 invoked by uid 500); 18 Jul 2013 07:23:47 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 85059 invoked by uid 500); 18 Jul 2013 07:23:43 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 85052 invoked by uid 99); 18 Jul 2013 07:23:41 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Jul 2013 07:23:41 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B415F8AD139; Thu, 18 Jul 2013 07:23:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <8e016302245c4091b2ad0405de42aea7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-174: Add support for cogrouping 3, 4, or N inputs. Date: Thu, 18 Jul 2013 07:23:41 +0000 (UTC) Updated Branches: refs/heads/master 643e41063 -> 181b476fe CRUNCH-174: Add support for cogrouping 3, 4, or N inputs. 
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/181b476f Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/181b476f Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/181b476f Branch: refs/heads/master Commit: 181b476fe25c9ba5efac7d65edcbfc8b27ae1077 Parents: 643e410 Author: Josh Wills Authored: Wed Jul 17 19:30:21 2013 -0700 Committer: Josh Wills Committed: Wed Jul 17 22:47:22 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/lib/CogroupIT.java | 82 +++++- .../src/main/java/org/apache/crunch/Tuple3.java | 37 +++ .../src/main/java/org/apache/crunch/Tuple4.java | 43 +++ .../java/org/apache/crunch/lib/Cogroup.java | 259 +++++++++++++++---- 4 files changed, 374 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java index 0d8b2b8..16c4c69 100644 --- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -29,6 +29,8 @@ import org.apache.crunch.Emitter; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; @@ -53,13 +55,16 @@ public class CogroupIT { private MRPipeline pipeline; private PCollection lines1; private PCollection lines2; - + private PCollection lines3; + private PCollection lines4; @Before public void setUp() throws 
IOException { pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()); lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt"))); lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt"))); + lines3 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt"))); + lines4 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt"))); } @After @@ -77,6 +82,26 @@ public class CogroupIT { runCogroup(AvroTypeFamily.getInstance()); } + @Test + public void testCogroup3Writables() { + runCogroup3(WritableTypeFamily.getInstance()); + } + + @Test + public void testCogroup3Avro() { + runCogroup3(AvroTypeFamily.getInstance()); + } + + @Test + public void testCogroup4Writables() { + runCogroup4(WritableTypeFamily.getInstance()); + } + + @Test + public void testCogroup4Avro() { + runCogroup4(AvroTypeFamily.getInstance()); + } + public void runCogroup(PTypeFamily ptf) { PTableType tt = ptf.tableOf(ptf.strings(), ptf.strings()); @@ -102,7 +127,62 @@ public class CogroupIT { assertThat(actual, is(expected)); } + public void runCogroup3(PTypeFamily ptf) { + PTableType tt = ptf.tableOf(ptf.strings(), ptf.strings()); + PTable kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt); + PTable kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt); + PTable kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt); + + PTable> cg = Cogroup.cogroup(kv1, kv2, kv3); + + Map> result = cg.materializeToMap(); + Map> actual = Maps.newHashMap(); + for (Map.Entry> e : result.entrySet()) { + Collection one = ImmutableSet.copyOf(e.getValue().first()); + Collection two = ImmutableSet.copyOf(e.getValue().second()); + Collection three = ImmutableSet.copyOf(e.getValue().third()); + actual.put(e.getKey(), new Tuple3.Collect(one, two, three)); + } + Map> expected = ImmutableMap.of( + "a", new Tuple3.Collect(coll("1-1", "1-4"), coll(), coll("1-1", "1-4")), + "b", new 
Tuple3.Collect(coll("1-2"), coll("2-1"), coll("1-2")), + "c", new Tuple3.Collect(coll("1-3"), coll("2-2", "2-3"), coll("1-3")), + "d", new Tuple3.Collect(coll(), coll("2-4"), coll()) + ); + + assertThat(actual, is(expected)); + } + + public void runCogroup4(PTypeFamily ptf) { + PTableType tt = ptf.tableOf(ptf.strings(), ptf.strings()); + + PTable kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt); + PTable kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt); + PTable kv3 = lines3.parallelDo("kv3", new KeyValueSplit(), tt); + PTable kv4 = lines4.parallelDo("kv4", new KeyValueSplit(), tt); + + PTable> cg = Cogroup.cogroup(kv1, kv2, kv3, kv4); + + Map> result = cg.materializeToMap(); + Map> actual = Maps.newHashMap(); + for (Map.Entry> e : result.entrySet()) { + Collection one = ImmutableSet.copyOf(e.getValue().first()); + Collection two = ImmutableSet.copyOf(e.getValue().second()); + Collection three = ImmutableSet.copyOf(e.getValue().third()); + Collection four = ImmutableSet.copyOf(e.getValue().fourth()); + actual.put(e.getKey(), new Tuple4.Collect(one, two, three, four)); + } + Map> expected = ImmutableMap.of( + "a", new Tuple4.Collect(coll("1-1", "1-4"), coll(), coll("1-1", "1-4"), coll()), + "b", new Tuple4.Collect(coll("1-2"), coll("2-1"), coll("1-2"), coll("2-1")), + "c", new Tuple4.Collect(coll("1-3"), coll("2-2", "2-3"), coll("1-3"), coll("2-2", "2-3")), + "d", new Tuple4.Collect(coll(), coll("2-4"), coll(), coll("2-4")) + ); + + assertThat(actual, is(expected)); + } + private static class KeyValueSplit extends DoFn> { @Override public void process(String input, Emitter> emitter) { http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/Tuple3.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java index 4372811..922ed07 100644 --- 
a/crunch-core/src/main/java/org/apache/crunch/Tuple3.java +++ b/crunch-core/src/main/java/org/apache/crunch/Tuple3.java @@ -17,13 +17,50 @@ */ package org.apache.crunch; +import java.util.Collection; + import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; /** * A convenience class for three-element {@link Tuple}s. */ public class Tuple3 implements Tuple { + public static class Collect extends Tuple3, Collection, Collection> { + + public static PType> derived(PType first, + PType second, PType third) { + PTypeFamily tf = first.getFamily(); + PType, Collection, Collection>> pt = + tf.triples( + tf.collections(first), + tf.collections(second), + tf.collections(third)); + Object clazz = Tuple3.Collect.class; + return tf.derived((Class>) clazz, + new MapFn, Collection, Collection>, Collect>() { + @Override + public Collect map( + Tuple3, Collection, Collection> in) { + return new Collect(in.first(), in.second(), in.third()); + } + }, + new MapFn, Tuple3, Collection, Collection>>() { + @Override + public Tuple3, Collection, Collection> map( + Collect in) { + return in; + } + }, pt); + } + + public Collect(Collection first, Collection second, Collection third) { + super(first, second, third); + } + } + private final V1 first; private final V2 second; private final V3 third; http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/Tuple4.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java index f161371..94d23fd 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Tuple4.java +++ b/crunch-core/src/main/java/org/apache/crunch/Tuple4.java @@ -17,13 +17,56 @@ */ package org.apache.crunch; +import java.util.Collection; + import org.apache.commons.lang.builder.HashCodeBuilder; +import 
org.apache.crunch.types.PType; +import org.apache.crunch.types.PTypeFamily; /** * A convenience class for four-element {@link Tuple}s. */ public class Tuple4 implements Tuple { + public static class Collect extends Tuple4< + Collection, + Collection, + Collection, + Collection> { + + public static PType> derived(PType first, + PType second, PType third, PType fourth) { + PTypeFamily tf = first.getFamily(); + PType, Collection, Collection, Collection>> pt = + tf.quads( + tf.collections(first), + tf.collections(second), + tf.collections(third), + tf.collections(fourth)); + Object clazz = Tuple4.Collect.class; + return tf.derived((Class>) clazz, + new MapFn, Collection, Collection, Collection>, + Collect>() { + @Override + public Collect map( + Tuple4, Collection, Collection, Collection> in) { + return new Collect(in.first(), in.second(), in.third(), in.fourth()); + } + }, + new MapFn, Tuple4, Collection, Collection, Collection>>() { + @Override + public Tuple4, Collection, Collection, Collection> map( + Collect input) { + return input; + } + }, pt); + } + + public Collect(Collection first, Collection second, Collection third, Collection fourth) { + super(first, second, third, fourth); + } + } + private final V1 first; private final V2 second; private final V3 third; http://git-wip-us.apache.org/repos/asf/crunch/blob/181b476f/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java index 3bf3e4d..7f5f70d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java @@ -23,8 +23,13 @@ import org.apache.crunch.MapFn; import org.apache.crunch.PGroupedTable; import org.apache.crunch.PTable; import org.apache.crunch.Pair; +import org.apache.crunch.Tuple; +import org.apache.crunch.Tuple3; 
+import org.apache.crunch.Tuple4; +import org.apache.crunch.TupleN; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.TupleFactory; import com.google.common.collect.Lists; @@ -38,88 +43,250 @@ public class Cogroup { * @return a {@code PTable} representing the co-grouped tables */ public static PTable, Collection>> cogroup(PTable left, PTable right) { - return cogroup(left, right, 0); + return cogroup(0, left, right); } /** * Co-groups the two {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of * reducers.) * + * @param numReducers The number of reducers to use * @param left The left (smaller) PTable * @param right The right (larger) PTable - * @param numReducers The number of reducers to use * @return A new {@code PTable} representing the co-grouped tables */ public static PTable, Collection>> cogroup( + int numReducers, PTable left, - PTable right, - int numReducers) { - PTypeFamily ptf = left.getTypeFamily(); - PType leftType = left.getPTableType().getValueType(); - PType rightType = right.getPTableType().getValueType(); - PType> itype = ptf.pairs(leftType, rightType); + PTable right) { + PTypeFamily tf = left.getTypeFamily(); + return cogroup( + tf.pairs(tf.collections(left.getValueType()), + tf.collections(right.getValueType())), + TupleFactory.PAIR, + numReducers, + left, right); + } - PTable> cgLeft = left.mapValues("coGroupTag1", new CogroupFn1(), - itype); - PTable> cgRight = right.mapValues("coGroupTag2", new CogroupFn2(), + /** + * Co-groups the three {@link PTable} arguments. 
+ * + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The largest PTable + * @return a {@code PTable} representing the co-grouped tables + */ + public static PTable> cogroup( + PTable first, + PTable second, + PTable third) { + return cogroup(0, first, second, third); + } + + /** + * Co-groups the three {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of + * reducers.) + * + * @param numReducers The number of reducers to use + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The largest PTable + * @return A new {@code PTable} representing the co-grouped tables + */ + public static PTable> cogroup( + int numReducers, + PTable first, + PTable second, + PTable third) { + return cogroup( + Tuple3.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType()), + new TupleFactory>() { + @Override + public Tuple3.Collect makeTuple(Object... values) { + return new Tuple3.Collect( + (Collection) values[0], + (Collection) values[1], + (Collection) values[2]); + } + }, + numReducers, + first, second, third); + } + + /** + * Co-groups the four {@link PTable} arguments. + * + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The second-largest PTable; pass the largest as {@code fourth} + * @return a {@code PTable} representing the co-grouped tables + */ + public static PTable> cogroup( + PTable first, + PTable second, + PTable third, + PTable fourth) { + return cogroup(0, first, second, third, fourth); + } + + /** + * Co-groups the four {@link PTable} arguments with a user-specified degree of parallelism (a.k.a, number of + * reducers.) 
+ * + * @param numReducers The number of reducers to use + * @param first The smallest PTable + * @param second The second-smallest PTable + * @param third The second-largest PTable; pass the largest as {@code fourth} + * @return A new {@code PTable} representing the co-grouped tables + */ + public static PTable> cogroup( + int numReducers, + PTable first, + PTable second, + PTable third, + PTable fourth) { + return cogroup( + Tuple4.Collect.derived(first.getValueType(), second.getValueType(), third.getValueType(), + fourth.getValueType()), + new TupleFactory>() { + @Override + public Tuple4.Collect makeTuple(Object... values) { + return new Tuple4.Collect( + (Collection) values[0], + (Collection) values[1], + (Collection) values[2], + (Collection) values[3]); + } + }, + numReducers, + first, second, third, fourth); + } + + /** + * Co-groups an arbitrary number of {@link PTable} arguments. The largest table should + * come last in the ordering. + * + * @param first The first (smallest) PTable to co-group + * @param rest The other (larger) PTables to co-group + * @return a {@code PTable} representing the co-grouped tables + */ + public static PTable cogroup(PTable first, PTable... rest) { + return cogroup(0, first, rest); + } + + /** + * Co-groups an arbitrary number of {@link PTable} arguments with a user-specified degree of parallelism + * (a.k.a, number of reducers.) The largest table should come last in the ordering. + * + * @param numReducers The number of reducers to use + * @param first The first (smallest) PTable to co-group + * @param rest The other (larger) PTables to co-group + * @return A new {@code PTable} representing the co-grouped tables + */ + public static PTable cogroup( + int numReducers, + PTable first, + PTable... 
rest) { + PTypeFamily tf = first.getTypeFamily(); + PType[] components = new PType[1 + rest.length]; + components[0] = tf.collections(first.getValueType()); + for (int i = 0; i < rest.length; i++) { + components[i + 1] = tf.collections(rest[i].getValueType()); + } + return cogroup( + tf.tuples(components), + TupleFactory.TUPLEN, + numReducers, + first, rest); + } + + private static PTable cogroup( + PType outputType, + TupleFactory tupleFactory, + int numReducers, + PTable first, PTable... rest) { + PTypeFamily ptf = first.getTypeFamily(); + PType[] ptypes = new PType[1 + rest.length]; + ptypes[0] = first.getValueType(); + for (int i = 0; i < rest.length; i++) { + ptypes[i + 1] = rest[i].getValueType(); + } + PType itype = ptf.tuples(ptypes); + + PTable firstInter = first.mapValues("coGroupTag1", + new CogroupFn(0, 1 + rest.length), itype); - - PType, Collection>> otype = ptf.pairs(ptf.collections(leftType), - ptf.collections(rightType)); - PTable> both = cgLeft.union(cgRight); - PGroupedTable> grouped = null; + PTable[] inter = new PTable[rest.length]; + for (int i = 0; i < rest.length; i++) { + inter[i] = rest[i].mapValues("coGroupTag" + (i + 2), + new CogroupFn(i + 1, 1 + rest.length), + itype); + } + + PTable union = firstInter.union(inter); + PGroupedTable grouped = null; if (numReducers > 0) { - grouped = both.groupByKey(numReducers); + grouped = union.groupByKey(numReducers); } else { - grouped = both.groupByKey(); + grouped = union.groupByKey(); } - return grouped.mapValues("cogroup", new PostGroupFn(leftType, rightType), otype); + + return grouped.mapValues("cogroup", + new PostGroupFn(tupleFactory, ptypes), + outputType); } - - private static class CogroupFn1 extends MapFn> { - @Override - public Pair map(V v) { - return Pair.of(v, null); + + private static class CogroupFn extends MapFn { + private final int index; + private final int size; + + public CogroupFn(int index, int size) { + this.index = index; + 
this.size = size; } - } - private static class CogroupFn2 
extends MapFn> { @Override - public Pair map(U u) { - return Pair.of(null, u); + public TupleN map(T input) { + Object[] v = new Object[size]; + v[index] = input; + return TupleN.of(v); } } - private static class PostGroupFn extends - MapFn>, Pair, Collection>> { + private static class PostGroupFn extends + MapFn, T> { - private PType ptypeV; - private PType ptypeU; + private final TupleFactory factory; + private final PType[] ptypes; - public PostGroupFn(PType ptypeV, PType ptypeU) { - this.ptypeV = ptypeV; - this.ptypeU = ptypeU; + public PostGroupFn(TupleFactory tf, PType... ptypes) { + this.factory = tf; + this.ptypes = ptypes; } @Override public void initialize() { super.initialize(); - ptypeV.initialize(getConfiguration()); - ptypeU.initialize(getConfiguration()); + for (PType pt : ptypes) { + pt.initialize(getConfiguration()); + } } @Override - public Pair, Collection> map(Iterable> input) { - Collection cv = Lists.newArrayList(); - Collection cu = Lists.newArrayList(); - for (Pair pair : input) { - if (pair.first() != null) { - cv.add(ptypeV.getDetachedValue(pair.first())); - } else if (pair.second() != null) { - cu.add(ptypeU.getDetachedValue(pair.second())); + public T map(Iterable input) { + Collection[] collections = new Collection[ptypes.length]; + for (int i = 0; i < ptypes.length; i++) { + collections[i] = Lists.newArrayList(); + } + for (TupleN t : input) { + for (int i = 0; i < ptypes.length; i++) { + if (t.get(i) != null) { + collections[i].add(ptypes[i].getDetachedValue(t.get(i))); + break; + } } } - return Pair.of(cv, cu); + return (T) factory.makeTuple(collections); } }