Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 81B1510AB2 for ; Thu, 21 Nov 2013 20:56:20 +0000 (UTC) Received: (qmail 43156 invoked by uid 500); 21 Nov 2013 20:56:20 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 43125 invoked by uid 500); 21 Nov 2013 20:56:20 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 43118 invoked by uid 99); 21 Nov 2013 20:56:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Nov 2013 20:56:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B14FE89BD09; Thu, 21 Nov 2013 20:56:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-294: Cost-based planning with materialize as breakpoint. Date: Thu, 21 Nov 2013 20:56:19 +0000 (UTC) Updated Branches: refs/heads/master 1381165fb -> 12dea675b CRUNCH-294: Cost-based planning with materialize as breakpoint. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/12dea675 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/12dea675 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/12dea675 Branch: refs/heads/master Commit: 12dea675bf50ee767a86870dfcda744818ecb332 Parents: 1381165 Author: Josh Wills Authored: Wed Nov 20 13:19:02 2013 -0800 Committer: Josh Wills Committed: Wed Nov 20 16:22:58 2013 -0800 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/BreakpointIT.java | 129 +++++++++++++++++++ .../src/main/java/org/apache/crunch/DoFn.java | 2 +- .../org/apache/crunch/impl/mr/MRPipeline.java | 2 +- .../crunch/impl/mr/collect/PCollectionImpl.java | 22 +++- .../crunch/impl/mr/collect/UnionCollection.java | 9 ++ .../crunch/impl/mr/collect/UnionTable.java | 8 ++ .../org/apache/crunch/impl/mr/plan/Edge.java | 119 ++++++++++++----- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 20 ++- 8 files changed, 258 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java new file mode 100644 index 0000000..790f049 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.writable.Writables; +import org.junit.Rule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BreakpointIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testNoBreakpoint() throws Exception { + run(new MRPipeline(BreakpointIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("shakes.txt"), + tmpDir.getFileName("out1"), + tmpDir.getFileName("out2"), + false); + } + + @Test + public void testBreakpoint() throws Exception { + run(new MRPipeline(BreakpointIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("shakes.txt"), + tmpDir.getFileName("out1"), + tmpDir.getFileName("out2"), + true); + } + + public static void run(Pipeline pipeline, String input, String out1, String out2, boolean breakpoint) + throws Exception { + + // Read a line from a file to get a PCollection. + PCollection pCol1 = pipeline.read(From.textFile(input)); + + // Create a PTable from PCollection + PTable pTable1 = pCol1.parallelDo(new DoFn>() { + @Override + public void process(final String s, final Emitter> emitter) { + for (int i = 0; i < 10; i++) { + emitter.emit(new Pair(s, i)); + } + } + }, Writables.tableOf(Writables.strings(), Writables.ints())); + + // Do a groupByKey + PGroupedTable pGrpTable1 = pTable1.groupByKey(); + + // Select from PGroupedTable + PTable selectFromPTable1 = pGrpTable1.parallelDo( + new DoFn>, Pair>() { + @Override + public void process(final Pair> input, + final Emitter> emitter) { + emitter.emit(new Pair(input.first(), input.second().iterator().next())); + } + }, Writables.tableOf(Writables.strings(), Writables.ints())); + + // Process selectFromPTable1 once + final PTable pTable2 = selectFromPTable1.parallelDo(new DoFn, Pair>() { + @Override + public void process(final Pair input, final Emitter> emitter) { + final Integer newInt = input.second() + 5; + increment("job", "table2"); + emitter.emit(new Pair(newInt.toString(), input.first())); + } + }, Writables.tableOf(Writables.strings(), Writables.strings())); + + // Process selectFromPTable1 once more + PTable pTable3 = selectFromPTable1.parallelDo(new DoFn, Pair>() { + @Override + public void process(final Pair input, final Emitter> emitter) { + final Integer newInt = input.second() + 10; + increment("job", "table3"); + emitter.emit(new Pair(newInt.toString(), input.first())); + } + }, Writables.tableOf(Writables.strings(), Writables.strings())); + + // Union pTable2 and pTable3 and set a breakpoint + PTable pTable4 = pTable2.union(pTable3); + if (breakpoint) { + pTable4.materialize(); + } + + // Write keys + pTable4.keys().write(To.textFile(out1)); + + // Group values + final PGroupedTable pGrpTable3 = pTable4.groupByKey(); + + // Write values + pGrpTable3.ungroup().write(To.textFile(out2)); + + PipelineExecution pe = pipeline.runAsync(); + // Count the number of map processing steps in this pipeline + int mapsCount = 0; + for (String line : pe.getPlanDotFile().split("\n")) { + if (line.contains(" subgraph ") && line.contains("-map\" {")) { + mapsCount++; + } + } + assertEquals(breakpoint ? 1 : 2, mapsCount); + pe.waitUntilDone(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java index 6ae89a4..a052d09 100644 --- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java @@ -116,7 +116,7 @@ public abstract class DoFn implements Serializable { * resulting {@code PCollection} should override this method. */ public float scaleFactor() { - return 1.2f; + return 0.99f; } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 4fb2876..ff95b91 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -244,7 +244,7 @@ public class MRPipeline implements Pipeline { @Override public Iterable materialize(PCollection pcollection) { - + ((PCollectionImpl) pcollection).setBreakpoint(); PCollectionImpl pcollectionImpl = toPcollectionImpl(pcollection); ReadableSource readableSrc = getMaterializeSourceTarget(pcollectionImpl); http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index b82c883..191b11e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -59,7 +59,9 @@ public abstract class PCollectionImpl implements PCollection { private boolean materialized; protected SourceTarget materializedAt; protected final ParallelDoOptions doOptions; - + private long size = -1L; + private boolean breakpoint; + public PCollectionImpl(String name) { this(name, ParallelDoOptions.builder().build()); } @@ -158,6 +160,14 @@ public abstract class PCollectionImpl implements PCollection { return getPipeline().materialize(this); } + public void setBreakpoint() { + this.breakpoint = true; + } + + public boolean isBreakpoint() { + return breakpoint; + } + /** {@inheritDoc} */ @Override public PObject> asCollection() { @@ -170,6 +180,7 @@ public abstract class PCollectionImpl implements PCollection { public void materializeAt(SourceTarget sourceTarget) { this.materializedAt = sourceTarget; + this.size = materializedAt.getSize(getPipeline().getConfiguration()); } @Override @@ -299,13 +310,10 @@ public abstract class PCollectionImpl implements PCollection { @Override public long getSize() { - if (materializedAt != null) { - long sz = materializedAt.getSize(getPipeline().getConfiguration()); - if (sz > 0) { - return sz; - } + if (size < 0) { + this.size = getSizeInternal(); } - return getSizeInternal(); + return size; } protected abstract long getSizeInternal(); http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java index 4a69d96..e6c95bb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java @@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect; import java.util.List; import com.google.common.collect.Lists; +import org.apache.crunch.PCollection; import org.apache.crunch.ReadableData; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.plan.DoNode; @@ -60,6 +61,14 @@ public class UnionCollection extends PCollectionImpl { } @Override + public void setBreakpoint() { + super.setBreakpoint(); + for (PCollectionImpl parent : parents) { + parent.setBreakpoint(); + } + } + + @Override protected long getSizeInternal() { return size; } http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java index b6a26d5..b4144e4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java @@ -90,6 +90,14 @@ public class UnionTable extends PTableBase { } @Override + public void setBreakpoint() { + super.setBreakpoint(); + for (PCollectionImpl> parent : parents) { + parent.setBreakpoint(); + } + } + + @Override protected void acceptInternal(PCollectionImpl.Visitor visitor) { visitor.visitUnionCollection(new UnionCollection>(parents)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java index 6eb50eb..8f99a0b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java @@ -18,17 +18,19 @@ package org.apache.crunch.impl.mr.plan; import java.util.Collection; -import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import com.google.common.collect.Maps; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -62,43 +64,94 @@ class Edge { public Set getNodePaths() { return paths; } - - public PCollectionImpl getSplit() { - List>> iters = Lists.newArrayList(); - for (NodePath nodePath : paths) { - Iterator> iter = nodePath.iterator(); - iter.next(); // prime this past the initial NGroupedTableImpl - iters.add(iter); + + private static boolean readWriteOutput(PCollectionImpl pc, Map, Set> outputs) { + if (outputs.containsKey(pc)) { + for (Target t : outputs.get(pc)) { + if (t instanceof SourceTarget || t.asSourceTarget(pc.getPType()) != null) { + return true; + } + } } + return false; + } - // Find the lowest point w/the lowest cost to be the split point for - // all of the dependent paths. - boolean end = false; - int splitIndex = -1; - while (!end) { - splitIndex++; - PCollectionImpl current = null; - for (Iterator> iter : iters) { - if (iter.hasNext()) { - PCollectionImpl next = iter.next(); - if (next instanceof PGroupedTableImpl) { - end = true; - break; - } else if (current == null) { - current = next; - } else if (current != next) { - end = true; - break; + public Map getSplitPoints(Map, Set> outputs) { + List np = Lists.newArrayList(paths); + List> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size()); + Map, Set> pathCounts = Maps.newHashMap(); + Map splitPoints = Maps.newHashMap(); + for (int i = 0; i < np.size(); i++) { + long bestSize = Long.MAX_VALUE; + boolean breakpoint = false; + PCollectionImpl best = null; + for (PCollectionImpl pc : np.get(i)) { + if (!(pc instanceof PGroupedTableImpl)) { + if (pc.isBreakpoint()) { + if (!breakpoint || pc.getSize() < bestSize) { + best = pc; + bestSize = pc.getSize(); + breakpoint = true; + } + } else if (!breakpoint && pc.getSize() < bestSize) { + best = pc; + bestSize = pc.getSize(); } - } else { - end = true; - break; + Set cnts = pathCounts.get(pc); + if (cnts == null) { + cnts = Sets.newHashSet(); + pathCounts.put(pc, cnts); + } + cnts.add(i); + } + } + smallestOverallPerPath.add(best); + if (breakpoint) { + splitPoints.put(np.get(i), best); + } + } + + Set missing = Sets.newHashSet(); + for (int i = 0; i < np.size(); i++) { + if (!splitPoints.containsKey(np.get(i))) { + missing.add(i); + } + } + if (missing.isEmpty()) { + return splitPoints; + } else { + // Need to either choose the smallest collection from each missing path, + // or the smallest single collection that is on all paths as the split target. + Set> smallest = Sets.newHashSet(); + long smallestSize = 0; + for (Integer id : missing) { + PCollectionImpl s = smallestOverallPerPath.get(id); + if (!smallest.contains(s)) { + smallest.add(s); + smallestSize += s.getSize(); + } + } + + PCollectionImpl singleBest = null; + long singleSmallestSize = Long.MAX_VALUE; + for (Map.Entry, Set> e : pathCounts.entrySet()) { + if (Sets.difference(missing, e.getValue()).isEmpty() && e.getKey().getSize() < singleSmallestSize) { + singleBest = e.getKey(); + singleSmallestSize = singleBest.getSize(); + } + } + + if (smallestSize < singleSmallestSize) { + for (Integer id : missing) { + splitPoints.put(np.get(id), smallestOverallPerPath.get(id)); + } + } else { + for (Integer id : missing) { + splitPoints.put(np.get(id), singleBest); } } } - // TODO: Add costing calcs here. - - return Iterables.getFirst(paths, null).get(splitIndex); + return splitPoints; } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 1e0793c..ac61fec 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -224,21 +224,19 @@ public class MSCRPlanner { } else { // Execute an Edge split Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection()); - PCollectionImpl split = e.getSplit(); - InputCollection inputNode = handleSplitTarget(split); - Vertex splitTail = graph.addVertex(split, true); - Vertex splitHead = graph.addVertex(inputNode, false); - - // Divide up the node paths in the edge between the two GBK nodes so - // that each node is either owned by GBK1 -> newTail or newHead -> GBK2. - for (NodePath path : e.getNodePaths()) { + Map splitPoints = e.getSplitPoints(outputs); + for (Map.Entry s : splitPoints.entrySet()) { + NodePath path = s.getKey(); + PCollectionImpl split = s.getValue(); + InputCollection inputNode = handleSplitTarget(split); + Vertex splitTail = graph.addVertex(split, true); + Vertex splitHead = graph.addVertex(inputNode, false); NodePath headPath = path.splitAt(split, splitHead.getPCollection()); graph.getEdge(vertex, splitTail).addNodePath(headPath); graph.getEdge(splitHead, newGraphTail).addNodePath(path); + // Note the dependency between the vertices in the graph. + graph.markDependency(splitHead, splitTail); } - - // Note the dependency between the vertices in the graph. - graph.markDependency(splitHead, splitTail); } } }