Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C6A4510D22 for ; Tue, 16 Jul 2013 00:45:59 +0000 (UTC) Received: (qmail 48023 invoked by uid 500); 16 Jul 2013 00:45:59 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 48005 invoked by uid 500); 16 Jul 2013 00:45:59 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 47998 invoked by uid 99); 16 Jul 2013 00:45:59 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jul 2013 00:45:59 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 43B338A87C4; Tue, 16 Jul 2013 00:45:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-237: Improper job dependencies for certain types of long pipelines Date: Tue, 16 Jul 2013 00:45:59 +0000 (UTC) Updated Branches: refs/heads/master 146f1e505 -> a3dd33f45 CRUNCH-237: Improper job dependencies for certain types of long pipelines Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a3dd33f4 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a3dd33f4 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a3dd33f4 Branch: refs/heads/master Commit: a3dd33f453c0db6f9e6a45bde04e9a458546a292 Parents: 146f1e5 Author: Josh Wills Authored: Mon Jul 15 11:48:50 2013 -0700 Committer: Josh Wills Committed: Mon Jul 15 17:28:50 2013 
-0700 ---------------------------------------------------------------------- .../apache/crunch/LongPipelinePlannerIT.java | 110 +++++++++++++++++++ .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 37 +++---- 2 files changed, 124 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/a3dd33f4/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java new file mode 100644 index 0000000..d7a4b4d --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch; + +import static org.apache.crunch.types.avro.Avros.*; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.materialize.MaterializableIterable; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verifies that complex plans execute dependent jobs in the correct sequence. + */ +public class LongPipelinePlannerIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Test + public void testMR() throws Exception { + run(new MRPipeline(LongPipelinePlannerIT.class, tmpDir.getDefaultConfiguration()), + tmpDir.copyResourceFileName("shakes.txt"), + tmpDir.getFileName("output")); + } + + public static void run(Pipeline p, String input, String output) throws Exception { + PCollection<String> in = p.readTextFile(input); + PCollection<String> toLower = in.parallelDo("tolower", new DoFn<String, String>() { + @Override + public void process(String input, Emitter<String> emitter) { + emitter.emit(input.toLowerCase()); + } + }, strings()); + + PTable<Integer, String> keyedLower = toLower.parallelDo("keyed", new MapFn<String, Pair<Integer, String>>() { + @Override + public Pair<Integer, String> map(String input) { + return Pair.of(input.length(), input); + } + }, tableOf(ints(), strings())).groupByKey().ungroup(); + + PCollection<String> iso = keyedLower.groupByKey().parallelDo("iso", new DoFn<Pair<Integer, Iterable<String>>, String>() { + @Override + public void process(Pair<Integer, Iterable<String>> input, Emitter<String> emitter) { + for (String s : input.second()) { + emitter.emit(s); + } + } + }, strings()); + + MaterializableIterable matIt = (MaterializableIterable)iso.materialize(); + ParallelDoOptions.Builder builder = ParallelDoOptions.builder().sourceTargets((SourceTarget)matIt.getSource()); + final String collectionPath = matIt.getPath().toString(); + + PTable<Integer, String> splitMap = keyedLower.parallelDo("split-map", + new MapFn<Pair<Integer, String>, Pair<Integer, String>>() { + @Override + public Pair<Integer, String> map(Pair<Integer, String> input) { + return input; + } + }, tableOf(ints(), strings()), builder.build()); + + PTable<Integer, String> splitReduce = 
splitMap.groupByKey().parallelDo("split-reduce", + new DoFn<Pair<Integer, Iterable<String>>, Pair<Integer, String>>() { + @Override + public void process(Pair<Integer, Iterable<String>> input, + Emitter<Pair<Integer, String>> emitter) { + emitter.emit(Pair.of(input.first(), input.second().iterator().next())); + } + }, tableOf(ints(), strings())); + + PTable<Integer, String> splitReduceResetKeys = splitReduce.parallelDo("reset", + new MapFn<Pair<Integer, String>, Pair<Integer, String>>() { + @Override + public Pair<Integer, String> map(Pair<Integer, String> input) { + return Pair.of(input.first() - 1, input.second()); + } + }, tableOf(ints(), strings())); + PTable<Integer, String> intersections = splitReduceResetKeys.groupByKey().ungroup(); + + PCollection<String> merged = intersections.values(); + PCollection<String> upper = merged.parallelDo("toupper", new MapFn<String, String>() { + @Override + public String map(String input) { + return input.toUpperCase(); + } + }, strings()); + + p.writeTextFile(upper, output); + p.done(); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a3dd33f4/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 5f5edb4..06ede5a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -80,7 +80,6 @@ public class MSCRPlanner { } Multimap<Vertex, JobPrototype> assignments = HashMultimap.create(); - Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create(); while (!targetDeps.isEmpty()) { Set<Target> allTargets = Sets.newHashSet(); for (PCollectionImpl<?> pcollect : targetDeps.keySet()) { @@ -113,45 +112,37 @@ public class MSCRPlanner { // depending on its profile. // For dependency handling, we only need to care about which // job prototype a particular GBK is assigned to. 
+ Multimap<Vertex, JobPrototype> newAssignments = HashMultimap.create(); for (List<Vertex> component : components) { - assignments.putAll(constructJobPrototypes(component)); + newAssignments.putAll(constructJobPrototypes(component)); } // Add in the job dependency information here. - for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) { + for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) { JobPrototype current = e.getValue(); List<Vertex> parents = graph.getParents(e.getKey()); for (Vertex parent : parents) { - for (JobPrototype parentJobProto : assignments.get(parent)) { + for (JobPrototype parentJobProto : newAssignments.get(parent)) { current.addDependency(parentJobProto); } } } + + // Make all of the jobs in this stage dependent on existing job + // prototypes. + for (JobPrototype newPrototype : newAssignments.values()) { + for (JobPrototype oldPrototype : assignments.values()) { + newPrototype.addDependency(oldPrototype); + } + } + assignments.putAll(newAssignments); - // Add cross-stage dependencies. + // Remove completed outputs. for (PCollectionImpl<?> output : currentStage) { - Set<Target> targets = outputs.get(output); - Vertex vertex = graph.getVertexAt(output); - for (PCollectionImpl<?> later : laterStage) { - if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) { - protoDependency.put(later, vertex); - } - } targetDeps.remove(output); } } - // Cross-job dependencies. - for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) { - Vertex d = new Vertex(pd.getKey()); - Vertex dj = pd.getValue(); - for (JobPrototype parent : assignments.get(dj)) { - for (JobPrototype child : assignments.get(d)) { - child.addDependency(parent); - } - } - } - // Finally, construct the jobs from the prototypes and return. DotfileWriter dotfileWriter = new DotfileWriter(); MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);