Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 997B9F8B5 for ; Tue, 2 Apr 2013 19:53:57 +0000 (UTC) Received: (qmail 48421 invoked by uid 500); 2 Apr 2013 19:53:57 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 48372 invoked by uid 500); 2 Apr 2013 19:53:57 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 48362 invoked by uid 99); 2 Apr 2013 19:53:57 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Apr 2013 19:53:57 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D7B2A836364; Tue, 2 Apr 2013 19:53:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greid@apache.org To: commits@crunch.apache.org Message-Id: <3478783d1691446aa11c2b2f0980e5b8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-190 Deep-copy retained values in CoGroup Date: Tue, 2 Apr 2013 19:53:56 +0000 (UTC) Updated Branches: refs/heads/master 154c8fbd3 -> 64497fa4f CRUNCH-190 Deep-copy retained values in CoGroup Use the PType#getDetachedValue functionality to deep copy values where necessary before retaining them in the reduce portion of the CoGroup library. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/64497fa4 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/64497fa4 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/64497fa4 Branch: refs/heads/master Commit: 64497fa4f40827d740d341d5513bafff6ed20a9d Parents: 154c8fb Author: Gabriel Reid Authored: Tue Apr 2 21:48:11 2013 +0200 Committer: Gabriel Reid Committed: Tue Apr 2 21:48:11 2013 +0200 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/lib/CogroupIT.java | 47 +++++++++++++++ .../main/java/org/apache/crunch/lib/Cogroup.java | 23 ++++++- 2 files changed, 67 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/64497fa4/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java index 99950a4..af3329f 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -17,16 +17,19 @@ */ package org.apache.crunch.lib; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; +import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; @@ -36,16 +39,21 @@ import org.apache.crunch.fn.MapKeysFn; import org.apache.crunch.fn.MapValuesFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.From; +import org.apache.crunch.test.StringWrapper; +import org.apache.crunch.test.StringWrapper.StringToStringWrapperMapFn; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; +import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.Rule; import org.junit.Test; import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.io.Files; public class CogroupIT { @@ -123,4 +131,43 @@ public class CogroupIT { } assertTrue(passed); } + + static class ConstantMapFn extends MapFn { + + @Override + public StringWrapper map(StringWrapper input) { + return StringWrapper.wrap("key"); + } + + } + + @Test + public void testCogroup_CheckObjectResultOnRichObjects() throws IOException { + Pipeline pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()); + PTable tableA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")) + .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class)) + .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class)); + PTable tableB = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")) + .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class)) + .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class)); + + List set1Values = Lists.newArrayList(); + List set2Values = Lists.newArrayList(); + PTable, Collection>> cogroup = Cogroup.cogroup(tableA, tableB); + for (Pair, Collection>> entry : cogroup.materialize()) { + for (StringWrapper stringWrapper : entry.second().first()) { + set1Values.add(stringWrapper.getValue()); + } + for (StringWrapper stringWrapper : entry.second().second()) { + set2Values.add(stringWrapper.getValue()); + } + } + + Collections.sort(set1Values); + Collections.sort(set2Values); + + assertEquals(ImmutableList.of("a", "b", "c", "e"), set1Values); + assertEquals(ImmutableList.of("a", "c", "d"), set2Values); + + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/64497fa4/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java index df1e4a5..07d873c 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java @@ -51,7 +51,8 @@ public class Cogroup { PTable> both = cgLeft.union(cgRight); PType, Collection>> otype = ptf.pairs(ptf.collections(leftType), ptf.collections(rightType)); - return both.groupByKey().parallelDo("cogroup", new PostGroupFn(), ptf.tableOf(keyType, otype)); + return both.groupByKey().parallelDo("cogroup", + new PostGroupFn(leftType, rightType), ptf.tableOf(keyType, otype)); } private static class CogroupFn1 extends MapValuesFn> { @@ -70,6 +71,22 @@ public class Cogroup { private static class PostGroupFn extends DoFn>>, Pair, Collection>>> { + + private PType ptypeV; + private PType ptypeU; + + public PostGroupFn(PType ptypeV, PType ptypeU) { + this.ptypeV = ptypeV; + this.ptypeU = ptypeU; + } + + @Override + public void initialize() { + super.initialize(); + ptypeV.initialize(getConfiguration()); + ptypeU.initialize(getConfiguration()); + } + @Override public void process(Pair>> input, Emitter, Collection>>> emitter) { @@ -77,9 +94,9 @@ public class Cogroup { Collection cu = Lists.newArrayList(); for (Pair pair : input.second()) { if (pair.first() != null) { - cv.add(pair.first()); + cv.add(ptypeV.getDetachedValue(pair.first())); } else if (pair.second() != null) { - cu.add(pair.second()); + cu.add(ptypeU.getDetachedValue(pair.second())); } } emitter.emit(Pair.of(input.first(), Pair.of(cv, cu)));