Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BC382E425 for ; Fri, 8 Feb 2013 16:15:47 +0000 (UTC) Received: (qmail 93888 invoked by uid 500); 8 Feb 2013 16:15:47 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 93855 invoked by uid 500); 8 Feb 2013 16:15:47 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 93837 invoked by uid 99); 8 Feb 2013 16:15:47 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Feb 2013 16:15:47 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 195483A5F5; Fri, 8 Feb 2013 16:15:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greid@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-163 Correct handling of same input in unions Message-Id: <20130208161547.195483A5F5@tyr.zones.apache.org> Date: Fri, 8 Feb 2013 16:15:47 +0000 (UTC) Updated Branches: refs/heads/master 3f9d29b33 -> 170ba8eea CRUNCH-163 Correct handling of same input in unions Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/170ba8ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/170ba8ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/170ba8ee Branch: refs/heads/master Commit: 170ba8eea530b965782fcbfefd1c5f3f236d1897 Parents: 3f9d29b Author: Gabriel Reid Authored: Thu Feb 7 23:25:20 2013 +0100 Committer: Gabriel Reid Committed: Fri Feb 8 09:55:54 2013 +0100 ---------------------------------------------------------------------- .../org/apache/crunch/UnionFromSameSourceIT.java | 132 +++++++++++++++ .../crunch/impl/mr/collect/PCollectionImpl.java | 4 +- .../java/org/apache/crunch/impl/mr/plan/Edge.java | 9 +- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 8 +- .../org/apache/crunch/impl/mr/plan/Vertex.java | 4 +- 5 files changed, 151 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java b/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java new file mode 100644 index 0000000..501a944 --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/UnionFromSameSourceIT.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PTableType; +import org.apache.crunch.types.PType; +import org.apache.crunch.types.writable.Writables; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * Collection of tests re-using the same PCollection in various unions. + */ +public class UnionFromSameSourceIT { + + private static final int NUM_ELEMENTS = 4; + + @Rule + public transient TemporaryPath tmpDir = TemporaryPaths.create(); + + private Pipeline pipeline; + private PType elementType = Writables.strings(); + private PTableType tableType = Writables.tableOf(Writables.strings(), + Writables.strings()); + + @Before + public void setUp() { + pipeline = new MRPipeline(UnionFromSameSourceIT.class, tmpDir.getDefaultConfiguration()); + } + + @Test + public void testUnion_SingleRead() throws IOException { + PCollection strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")); + PCollection union = strings.union(strings.parallelDo(IdentityFn. getInstance(), + strings.getPType())); + + assertEquals(NUM_ELEMENTS * 2, getCount(union)); + } + + @Test + public void testUnion_TwoReads() throws IOException { + PCollection stringsA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")); + PCollection stringsB = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")); + + PCollection union = stringsA.union(stringsB); + + assertEquals(NUM_ELEMENTS * 2, getCount(union)); + } + + @Test + public void testDoubleUnion_EndingWithGBK() throws IOException { + runDoubleUnionPipeline(true); + } + + @Test + public void testDoubleUnion_EndingWithoutGBK() throws IOException { + runDoubleUnionPipeline(false); + } + + private void runDoubleUnionPipeline(boolean endWithGBK) throws IOException { + PCollection strings = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")); + PTable tableA = strings.parallelDo("to table A", new ToTableFn(), tableType); + PTable tableB = strings.parallelDo("to table B", new ToTableFn(), tableType); + + PGroupedTable groupedTable = tableA.union(tableB).groupByKey(); + PCollection ungrouped = groupedTable.parallelDo("ungroup before union", + new FromGroupedTableFn(), elementType).union( + strings.parallelDo("fake id", IdentityFn. getInstance(), elementType)); + + PTable table = ungrouped.parallelDo("union back to table", new ToTableFn(), + tableType); + + if (endWithGBK) { + table = table.groupByKey().ungroup(); + } + + assertEquals(3 * NUM_ELEMENTS, getCount(table)); + } + + private int getCount(PCollection pcollection) { + int cnt = 0; + for (Object v : pcollection.materialize()) { + cnt++; + } + return cnt; + } + + private static class ToTableFn extends MapFn> { + + @Override + public Pair map(String input) { + return Pair.of(input, input); + } + + } + + private static class FromGroupedTableFn extends DoFn>, String> { + + @Override + public void process(Pair> input, Emitter emitter) { + for (String value : input.second()) { + emitter.emit(value); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index 296043f..f48308a 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -36,9 +36,9 @@ import org.apache.crunch.Pipeline; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.fn.ExtractKeyFn; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.plan.DoNode; -import org.apache.crunch.io.ReadableSource; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.materialize.pobject.CollectionPObject; import org.apache.crunch.types.PTableType; @@ -81,7 +81,7 @@ public abstract class PCollectionImpl implements PCollection { List> internal = Lists.newArrayList(); internal.add(this); for (PCollection collection : collections) { - internal.add((PCollectionImpl) collection); + internal.add((PCollectionImpl) collection.parallelDo(IdentityFn.getInstance(), collection.getPType())); } return new UnionCollection(internal); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java index cf6fc37..1e59df0 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Set; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; @@ -108,11 +110,16 @@ class Edge { return false; } Edge e = (Edge) other; - return head.equals(e.head) && tail.equals(e.tail); + return head.equals(e.head) && tail.equals(e.tail) && paths.equals(e.paths); } @Override public int hashCode() { return new HashCodeBuilder().append(head).append(tail).toHashCode(); } + + @Override + public String toString() { + return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 3718ec2..472505b 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -266,16 +266,19 @@ public class MSCRPlanner { assignment.put(v, prototype); } } else { + Set usedEdges = Sets.newHashSet(); for (Vertex g : gbks) { Set inputs = Sets.newHashSet(); for (Edge e : g.getIncomingEdges()) { inputs.addAll(e.getNodePaths()); + usedEdges.add(e); } JobPrototype prototype = JobPrototype.createMapReduceJob( (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath()); assignment.put(g, prototype); for (Edge e : g.getIncomingEdges()) { assignment.put(e.getHead(), prototype); + usedEdges.add(e); } HashMultimap outputPaths = HashMultimap.create(); for (Edge e : g.getOutgoingEdges()) { @@ -284,6 +287,7 @@ public class MSCRPlanner { outputPaths.putAll(t, e.getNodePaths()); } assignment.put(output, prototype); + usedEdges.add(e); } prototype.addReducePaths(outputPaths); } @@ -299,7 +303,7 @@ public class MSCRPlanner { boolean vertexHasUnassignedIncomingEdges = false; if (v.isOutput()) { for (Edge e : v.getIncomingEdges()) { - if (!assignment.containsKey(e.getHead())) { + if (!usedEdges.contains(e)) { vertexHasUnassignedIncomingEdges = true; } } @@ -308,7 +312,7 @@ public class MSCRPlanner { if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) { orphans.add(v); for (Edge e : v.getIncomingEdges()) { - if (vertexHasUnassignedIncomingEdges && assignment.containsKey(e.getHead())) { + if (vertexHasUnassignedIncomingEdges && usedEdges.contains(e)) { // We've already dealt with this incoming edge continue; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/170ba8ee/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java index 99fc8ba..f4aa668 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; import org.apache.commons.lang.builder.ReflectionToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; import org.apache.crunch.Source; import org.apache.crunch.impl.mr.collect.InputCollection; import org.apache.crunch.impl.mr.collect.PCollectionImpl; @@ -119,6 +120,7 @@ class Vertex { @Override public String toString() { - return ReflectionToStringBuilder.toStringExclude(this, Lists.newArrayList("outgoing", "incoming")); + return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).setExcludeFieldNames( + new String[] { "outgoing", "incoming" }).toString(); } }