incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/2] git commit: CRUNCH-67: Fix planning for jobs that have map-side outputs
Date Wed, 19 Sep 2012 21:37:30 GMT
CRUNCH-67: Fix planning for jobs that have map-side outputs


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/28e51b6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/28e51b6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/28e51b6a

Branch: refs/heads/master
Commit: 28e51b6a4505ff406c0d9472303c28cd2e2d6aaa
Parents: 7cc16e3
Author: Josh Wills <jwills@apache.org>
Authored: Tue Sep 18 20:32:38 2012 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Sep 18 20:32:38 2012 -0700

----------------------------------------------------------------------
 .../src/it/java/org/apache/crunch/CleanTextIT.java |   79 +++++++++++++++
 .../java/org/apache/crunch/impl/mr/plan/Graph.java |   15 ++-
 .../apache/crunch/impl/mr/plan/GraphBuilder.java   |    6 +-
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |   71 +++++++++++--
 .../org/apache/crunch/impl/mr/plan/Vertex.java     |   10 ++
 5 files changed, 161 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java b/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
new file mode 100644
index 0000000..86a1a8d
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/CleanTextIT.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+/**
+ * CRUNCH-67: a collection written as a map-side output that also feeds a downstream count() must still be written.
+ */
+public class CleanTextIT {
+
+  private static final int LINES_IN_SHAKES = 3667;
+  
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  static DoFn<String, String> CLEANER = new DoFn<String, String>() {
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      emitter.emit(input.toLowerCase());
+    }
+  };
+  
+  static DoFn<String, String> SPLIT = new DoFn<String, String>() {
+    @Override
+    public void process(String input, Emitter<String> emitter) {
+      for (String word : input.split("\\s+")) {
+        if (!word.isEmpty()) {
+          emitter.emit(word);
+        }
+      }
+    }
+  };
+  
+  @Test
+  public void testMapSideOutputs() throws Exception {
+    Pipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration());
+    String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
+    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
+    
+    PCollection<String> cleanShakes = shakespeare.parallelDo(CLEANER, Avros.strings());
+    File cso = tmpDir.getFile("cleanShakes");
+    cleanShakes.write(To.textFile(cso.getAbsolutePath()));
+    
+    File wc = tmpDir.getFile("wordCounts");
+    cleanShakes.parallelDo(SPLIT, Avros.strings()).count().write(To.textFile(wc.getAbsolutePath()));
+    pipeline.done();
+    
+    File cleanFile = new File(cso, "part-m-00000");
+    List<String> lines = Files.readLines(cleanFile, Charset.defaultCharset());
+    assertEquals(LINES_IN_SHAKES, lines.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
index 93ba2bf..d634c7e 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
@@ -14,9 +14,7 @@
  */
 package org.apache.crunch.impl.mr.plan;

-import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -47,13 +45,20 @@ public class Graph implements Iterable<Vertex> {
   public Vertex getVertexAt(PCollectionImpl impl) {
     return vertices.get(impl);
   }
-
-  public Vertex addVertex(PCollectionImpl impl) {
+  /** Returns the vertex for impl (creating it if absent); marks it as an output when output is true, never unmarks. */
+  public Vertex addVertex(PCollectionImpl impl, boolean output) {
     if (vertices.containsKey(impl)) {
-      return vertices.get(impl);
+      Vertex v = vertices.get(impl);
+      if (output) {
+        v.setOutput();
+      }
+      return v;
     }
     Vertex v = new Vertex(impl);
     vertices.put(impl, v);
+    if (output) {
+      v.setOutput();
+    }
     return v;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
index 7705896..7fb942f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
@@ -38,14 +38,14 @@ public class GraphBuilder implements PCollectionImpl.Visitor {
   }
   
   public void visitOutput(PCollectionImpl<?> output) {
-    workingVertex = graph.addVertex(output);
+    workingVertex = graph.addVertex(output, true);  // flag this vertex as an output
     workingPath = new NodePath();
     output.accept(this);
   }
   
   @Override
   public void visitInputCollection(InputCollection<?> collection) {
-    Vertex v = graph.addVertex(collection);
+    Vertex v = graph.addVertex(collection, false);  // does not flag as output; an existing flag is kept
     graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
   }

@@ -74,7 +74,7 @@ public class GraphBuilder implements PCollectionImpl.Visitor {

   @Override
   public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
-    Vertex v = graph.addVertex(collection);
+    Vertex v = graph.addVertex(collection, false);  // does not flag as output; an existing flag is kept
     graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
     workingVertex = v;
     workingPath = new NodePath(collection);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index f959f14..bca0bea 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;

 public class MSCRPlanner {
@@ -91,17 +92,19 @@ public class MSCRPlanner {
     // depending on its profile.
     // For dependency handling, we only need to care about which
     // job prototype a particular GBK is assigned to.
-    Map<Vertex, JobPrototype> assignments = Maps.newHashMap();
+    Multimap<Vertex, JobPrototype> assignments = HashMultimap.create();
     for (List<Vertex> component : components) {
       assignments.putAll(constructJobPrototypes(component));
     }
     
     // Add in the job dependency information here.
-    for (Map.Entry<Vertex, JobPrototype> e : assignments.entrySet()) {
+    for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
       JobPrototype current = e.getValue();
       List<Vertex> parents = graph.getParents(e.getKey());
       for (Vertex parent : parents) {
-        current.addDependency(assignments.get(parent));
+        for (JobPrototype parentJobProto : assignments.get(parent)) {
+          current.addDependency(parentJobProto);
+        }
       }
     }
     
@@ -118,12 +121,14 @@ public class MSCRPlanner {
     
     for (Vertex baseVertex : baseGraph) {
       // Add all of the vertices in the base graph, but no edges (yet).
-      graph.addVertex(baseVertex.getPCollection());
+      graph.addVertex(baseVertex.getPCollection(), baseVertex.isOutput());
     }
     
     for (Edge e : baseGraph.getAllEdges()) {
-      // Add back all of the edges where neither vertex is a GBK.
-      if (!e.getHead().isGBK() && !e.getTail().isGBK()) {
+      // Add back every edge except those between two GBKs and those that run
+      // from an output vertex into a GBK; the per-GBK loop below splits both.
+      if (!(e.getHead().isGBK() && e.getTail().isGBK()) &&
+          !(e.getHead().isOutput() && e.getTail().isGBK())) {
         Vertex head = graph.getVertexAt(e.getHead().getPCollection());
         Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
         graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
@@ -134,7 +139,24 @@ public class MSCRPlanner {
       if (baseVertex.isGBK()) {
         Vertex vertex = graph.getVertexAt(baseVertex.getPCollection());
         for (Edge e : baseVertex.getIncomingEdges()) {
-          if (!e.getHead().isGBK()) {
+          if (e.getHead().isOutput()) {
+            // Execute an edge split.
+            Vertex splitTail = e.getHead();
+            PCollectionImpl<?> split = splitTail.getPCollection();
+            InputCollection<?> inputNode = handleSplitTarget(split);
+            Vertex splitHead = graph.addVertex(inputNode, false);
+            
+            // Split each node path at the output: headPath is attached to vertex -> splitTail,
+            // and path (presumably trimmed by splitAt; TODO confirm) to splitHead -> vertex.
+            for (NodePath path : e.getNodePaths()) {
+              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+              graph.getEdge(vertex, splitTail).addNodePath(headPath);
+              graph.getEdge(splitHead, vertex).addNodePath(path);
+            }
+            
+            // Note the dependency between the vertices in the graph.
+            graph.markDependency(splitHead, splitTail);
+          } else if (!e.getHead().isGBK()) {
             Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
             graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
           }
@@ -144,13 +166,12 @@ public class MSCRPlanner {
             Vertex newTail = graph.getVertexAt(e.getTail().getPCollection());
             graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths());
           } else {
-            
             // Execute an Edge split
             Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
             PCollectionImpl split = e.getSplit();
             InputCollection<?> inputNode = handleSplitTarget(split);
-            Vertex splitTail = graph.addVertex(split);
-            Vertex splitHead = graph.addVertex(inputNode);
+            Vertex splitTail = graph.addVertex(split, true);
+            Vertex splitHead = graph.addVertex(inputNode, false);
             
             // Divide up the node paths in the edge between the two GBK nodes so
             // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
@@ -170,8 +191,8 @@ public class MSCRPlanner {
     return graph;
   }
   
-  private Map<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
-    Map<Vertex, JobPrototype> assignment = Maps.newHashMap();
+  private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) {
+    Multimap<Vertex, JobPrototype> assignment = HashMultimap.create();
     List<Vertex> gbks = Lists.newArrayList();
     for (Vertex v : component) {
       if (v.isGBK()) {
@@ -223,6 +244,32 @@ public class MSCRPlanner {
         }
         prototype.addReducePaths(outputPaths);
       }
+      
+      // Check for any unassigned output vertices: these are map-side outputs
+      // that we will need to run in a map-only job.
+      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+      Set<Vertex> orphans = Sets.newHashSet();
+      for (Vertex v : component) {
+        if (!assignment.containsKey(v) && v.isOutput()) {
+          orphans.add(v);
+          for (Edge e : v.getIncomingEdges()) {
+            orphans.add(e.getHead());
+            for (NodePath nodePath : e.getNodePaths()) {
+              PCollectionImpl target = nodePath.tail();
+              for (Target t : outputs.get(target)) {
+                outputPaths.put(t, nodePath);
+              }
+            }
+          }
+        }
+      }
+      if (!outputPaths.isEmpty()) {
+        JobPrototype prototype = JobPrototype.createMapOnlyJob(
+            outputPaths, pipeline.createTempPath());
+        for (Vertex orphan : orphans) {
+          assignment.put(orphan, prototype);
+        }
+      }
     }
     
     return assignment;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
index db49e83..50efe6a 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -33,6 +33,8 @@ import com.google.common.collect.Sets;
  */
 public class Vertex {
   private final PCollectionImpl impl;
+  // Output flag: set by setOutput(), e.g. via Graph.addVertex(impl, true).
+  private boolean output;
   private Set<Edge> incoming;
   private Set<Edge> outgoing;
   
@@ -54,6 +56,14 @@ public class Vertex {
     return impl instanceof PGroupedTableImpl;
   }
   
+  public void setOutput() {
+    this.output = true;
+  }
+  /** Returns true if setOutput() has been called on this vertex. */
+  public boolean isOutput() {
+    return output;
+  }
+  
   public Source getSource() {
     if (isInput()) {
       return ((InputCollection) impl).getSource();


Mime
View raw message