crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-237: Improper job dependencies for certain types of long pipelines
Date Tue, 16 Jul 2013 00:45:59 GMT
Updated Branches:
  refs/heads/master 146f1e505 -> a3dd33f45


CRUNCH-237: Improper job dependencies for certain types of long pipelines


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a3dd33f4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a3dd33f4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a3dd33f4

Branch: refs/heads/master
Commit: a3dd33f453c0db6f9e6a45bde04e9a458546a292
Parents: 146f1e5
Author: Josh Wills <jwills@apache.org>
Authored: Mon Jul 15 11:48:50 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jul 15 17:28:50 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/LongPipelinePlannerIT.java    | 110 +++++++++++++++++++
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  37 +++----
 2 files changed, 124 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a3dd33f4/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java
new file mode 100644
index 0000000..d7a4b4d
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.apache.crunch.types.avro.Avros.*;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Verifies that complex plans execute dependent jobs in the correct sequence (CRUNCH-237).
+ */
+public class LongPipelinePlannerIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  @Test
+  public void testMR() throws Exception {
+    run(new MRPipeline(LongPipelinePlannerIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("shakes.txt"),
+        tmpDir.getFileName("output"));
+  }
+  
+  public static void run(Pipeline p, String input, String output) throws Exception {
+    PCollection<String> in = p.readTextFile(input);
+    PCollection<String> toLower = in.parallelDo("tolower", new DoFn<String, String>() {
+      @Override
+      public void process(String input, Emitter<String> emitter) {
+        emitter.emit(input.toLowerCase());
+      }
+    }, strings());
+
+    PTable<Integer, String> keyedLower = toLower.parallelDo("keyed", new MapFn<String, Pair<Integer, String>>() {
+      @Override
+      public Pair<Integer, String> map(String input) {
+        return Pair.of(input.length(), input);
+      }
+    }, tableOf(ints(), strings())).groupByKey().ungroup();
+
+    PCollection<String> iso = keyedLower.groupByKey().parallelDo("iso", new DoFn<Pair<Integer, Iterable<String>>, String>() {
+      @Override
+      public void process(Pair<Integer, Iterable<String>> input, Emitter<String> emitter) {
+        for (String s : input.second()) {
+          emitter.emit(s);
+        }
+      } 
+    }, strings());
+    // Materialize iso and hand its SourceTarget to split-map's ParallelDoOptions, so split-map depends on iso's output.
+    MaterializableIterable matIt = (MaterializableIterable)iso.materialize();
+    ParallelDoOptions.Builder builder = ParallelDoOptions.builder().sourceTargets((SourceTarget)matIt.getSource());
+    final String collectionPath = matIt.getPath().toString();
+
+    PTable<Integer, String> splitMap = keyedLower.parallelDo("split-map",
+        new MapFn<Pair<Integer, String>, Pair<Integer, String>>() {
+      @Override
+      public Pair<Integer, String> map(Pair<Integer, String> input) {
+        return input;
+      }
+    }, tableOf(ints(), strings()), builder.build());
+    // Keep one value per key, then shift every key down by one before regrouping.
+    PTable<Integer, String> splitReduce = splitMap.groupByKey().parallelDo("split-reduce",
+        new DoFn<Pair<Integer, Iterable<String>>, Pair<Integer, String>>() {
+      @Override
+      public void process(Pair<Integer, Iterable<String>> input,
+          Emitter<Pair<Integer, String>> emitter) {
+        emitter.emit(Pair.of(input.first(), input.second().iterator().next()));
+      }
+    }, tableOf(ints(), strings()));
+
+    PTable<Integer, String> splitReduceResetKeys = splitReduce.parallelDo("reset",
+        new MapFn<Pair<Integer, String>, Pair<Integer, String>>() {
+      @Override
+      public Pair<Integer, String> map(Pair<Integer, String> input) {
+        return Pair.of(input.first() - 1, input.second());
+      }
+    }, tableOf(ints(), strings()));
+    PTable<Integer, String> intersections = splitReduceResetKeys.groupByKey().ungroup();
+    // Upper-case the surviving values and write them to the output path.
+    PCollection<String> merged = intersections.values();
+    PCollection<String> upper = merged.parallelDo("toupper", new MapFn<String, String>() {
+      @Override
+      public String map(String input) {
+        return input.toUpperCase();
+      }
+    }, strings());
+
+    p.writeTextFile(upper, output);
+    p.done();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a3dd33f4/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 5f5edb4..06ede5a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -80,7 +80,6 @@ public class MSCRPlanner {
     }
     
     Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
-    Multimap<PCollectionImpl<?>, Vertex> protoDependency = HashMultimap.create();
     while (!targetDeps.isEmpty()) {
       Set<Target> allTargets = Sets.newHashSet();
       for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
@@ -113,45 +112,37 @@ public class MSCRPlanner {
       // depending on its profile.
       // For dependency handling, we only need to care about which
       // job prototype a particular GBK is assigned to.
+      Multimap<Vertex, JobPrototype> newAssignments = HashMultimap.create();
       for (List<Vertex> component : components) {
-        assignments.putAll(constructJobPrototypes(component));
+        newAssignments.putAll(constructJobPrototypes(component));
       }
 
       // Add in the job dependency information here.
-      for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
+      for (Map.Entry<Vertex, JobPrototype> e : newAssignments.entries()) {
         JobPrototype current = e.getValue();
         List<Vertex> parents = graph.getParents(e.getKey());
         for (Vertex parent : parents) {
-          for (JobPrototype parentJobProto : assignments.get(parent)) {
+          for (JobPrototype parentJobProto : newAssignments.get(parent)) {
             current.addDependency(parentJobProto);
           }
         }
       }
+
+      // Make all of the jobs in this stage dependent on existing job
+      // prototypes.
+      for (JobPrototype newPrototype : newAssignments.values()) {
+        for (JobPrototype oldPrototype : assignments.values()) {
+          newPrototype.addDependency(oldPrototype);
+        }
+      }
+      assignments.putAll(newAssignments);
       
-      // Add cross-stage dependencies.
+      // Remove completed outputs.
       for (PCollectionImpl<?> output : currentStage) {
-        Set<Target> targets = outputs.get(output);
-        Vertex vertex = graph.getVertexAt(output);
-        for (PCollectionImpl<?> later : laterStage) {
-          if (!Sets.intersection(targets, targetDeps.get(later)).isEmpty()) {
-            protoDependency.put(later, vertex);
-          }
-        }
         targetDeps.remove(output);
       }
     }
     
-    // Cross-job dependencies.
-    for (Entry<PCollectionImpl<?>, Vertex> pd : protoDependency.entries()) {
-      Vertex d = new Vertex(pd.getKey());
-      Vertex dj = pd.getValue();
-      for (JobPrototype parent : assignments.get(dj)) {
-        for (JobPrototype child : assignments.get(d)) {
-          child.addDependency(parent);
-        }
-      }
-    }
-    
     // Finally, construct the jobs from the prototypes and return.
     DotfileWriter dotfileWriter = new DotfileWriter();
     MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);


Mime
View raw message