incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-34: Refactor the MSCRPlanner.
Date Thu, 13 Sep 2012 16:16:22 GMT
Updated Branches:
  refs/heads/master cad1b053b -> 739a4703a


CRUNCH-34: Refactor the MSCRPlanner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/739a4703
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/739a4703
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/739a4703

Branch: refs/heads/master
Commit: 739a4703a00dca526c7d7880649fa6329c246856
Parents: cad1b05
Author: Josh Wills <jwills@apache.org>
Authored: Thu Sep 13 09:15:58 2012 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Sep 13 09:15:58 2012 -0700

----------------------------------------------------------------------
 .../crunch/impl/mr/collect/UnionCollectionIT.java  |    2 -
 .../org/apache/crunch/impl/mr/plan/DoNode.java     |   12 +-
 .../java/org/apache/crunch/impl/mr/plan/Edge.java  |  118 +++++
 .../java/org/apache/crunch/impl/mr/plan/Graph.java |  125 +++++
 .../apache/crunch/impl/mr/plan/GraphBuilder.java   |   92 ++++
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |  376 +++++---------
 .../org/apache/crunch/impl/mr/plan/NodePath.java   |   20 +-
 .../org/apache/crunch/impl/mr/plan/Vertex.java     |  108 ++++
 pom.xml                                            |    3 +
 9 files changed, 609 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
index 6e70ff6..f9f73b2 100644
--- a/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
+++ b/crunch/src/it/java/org/apache/crunch/impl/mr/collect/UnionCollectionIT.java
@@ -105,11 +105,9 @@ public class UnionCollectionIT {
   }
 
   private void checkMaterialized(Iterable<String> materialized) {
-
     List<String> materializedValues = Lists.newArrayList(materialized.iterator());
     Collections.sort(materializedValues);
     LOG.info("Materialized union: " + materializedValues);
-
     assertEquals(EXPECTED, materializedValues);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index f63700e..236496b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -104,8 +104,16 @@ public class DoNode {
   }
 
   public DoNode addChild(DoNode node) {
-    if (!children.contains(node)) {
-      this.children.add(node);
+    // TODO: This is sort of terrible, refactor the code to make this make more sense.
+    boolean exists = false;
+    for (DoNode child : children) {
+      if (node == child) {
+        exists = true;
+        break;
+      }
+    }
+    if (!exists) {
+      children.add(node);
     }
     return this;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
new file mode 100644
index 0000000..5aceb8b
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+public class Edge {
+  private final Vertex head;
+  private final Vertex tail;
+  private final Set<NodePath> paths;
+  
+  public Edge(Vertex head, Vertex tail) {
+    this.head = head;
+    this.tail = tail;
+    this.paths = Sets.newHashSet();
+  }
+  
+  public Vertex getHead() {
+    return head;
+  }
+  
+  public Vertex getTail() {
+    return tail;
+  }
+
+  public void addNodePath(NodePath path) {
+    this.paths.add(path);
+  }
+  
+  public void addAllNodePaths(Collection<NodePath> paths) {
+    this.paths.addAll(paths);
+  }
+  
+  public Set<NodePath> getNodePaths() {
+    return paths;
+  }
+  
+  public PCollectionImpl getSplit() {
+    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
+    for (NodePath nodePath : paths) {
+      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
+      iter.next(); // prime this past the initial NGroupedTableImpl
+      iters.add(iter);
+    }
+
+    // Find the lowest point w/the lowest cost to be the split point for
+    // all of the dependent paths.
+    boolean end = false;
+    int splitIndex = -1;
+    while (!end) {
+      splitIndex++;
+      PCollectionImpl<?> current = null;
+      for (Iterator<PCollectionImpl<?>> iter : iters) {
+        if (iter.hasNext()) {
+          PCollectionImpl<?> next = iter.next();
+          if (next instanceof PGroupedTableImpl) {
+            end = true;
+            break;
+          } else if (current == null) {
+            current = next;
+          } else if (current != next) {
+            end = true;
+            break;
+          }
+        } else {
+          end = true;
+          break;
+        }
+      }
+    }
+    // TODO: Add costing calcs here.
+    
+    return Iterables.getFirst(paths, null).get(splitIndex);
+  }
+  
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof Edge)) {
+      return false;
+    }
+    Edge e = (Edge) other;
+    return head.equals(e.head) && tail.equals(e.tail);
+  }
+  
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(head).append(tail).toHashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
new file mode 100644
index 0000000..93ba2bf
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java
@@ -0,0 +1,125 @@
+/**
+ * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+public class Graph implements Iterable<Vertex> {
+
+  private final Map<PCollectionImpl, Vertex> vertices;
+  private final Map<Pair<Vertex, Vertex>, Edge> edges;  
+  private final Map<Vertex, List<Vertex>> dependencies;
+  
+  public Graph() {
+    this.vertices = Maps.newHashMap();
+    this.edges = Maps.newHashMap();
+    this.dependencies = Maps.newHashMap();
+  }
+  
+  public Vertex getVertexAt(PCollectionImpl impl) {
+    return vertices.get(impl);
+  }
+
+  public Vertex addVertex(PCollectionImpl impl) {
+    if (vertices.containsKey(impl)) {
+      return vertices.get(impl);
+    }
+    Vertex v = new Vertex(impl);
+    vertices.put(impl, v);
+    return v;
+  }
+  
+  public Edge getEdge(Vertex head, Vertex tail) {
+    Pair<Vertex, Vertex> p = Pair.of(head, tail);
+    if (edges.containsKey(p)) {
+      return edges.get(p);
+    }
+    
+    Edge e = new Edge(head, tail);
+    edges.put(p, e);
+    tail.addIncoming(e);
+    head.addOutgoing(e);
+    return e;
+  }
+  
+  @Override
+  public Iterator<Vertex> iterator() {
+    return Sets.newHashSet(vertices.values()).iterator();
+  }
+
+  public Set<Edge> getAllEdges() {
+    return Sets.newHashSet(edges.values());
+  }
+  
+  public void markDependency(Vertex child, Vertex parent) {
+    List<Vertex> parents = dependencies.get(child);
+    if (parents == null) {
+      parents = Lists.newArrayList();
+      dependencies.put(child, parents);
+    }
+    parents.add(parent);
+  }
+  
+  public List<Vertex> getParents(Vertex child) {
+    if (dependencies.containsKey(child)) {
+      return dependencies.get(child);
+    }
+    return ImmutableList.of();
+  }
+  
+  public List<List<Vertex>> connectedComponents() {
+    List<List<Vertex>> components = Lists.newArrayList();
+    Set<Vertex> unassigned = Sets.newHashSet(vertices.values());
+    while (!unassigned.isEmpty()) {
+      Vertex base = unassigned.iterator().next();
+      List<Vertex> component = Lists.newArrayList();
+      component.add(base);
+      unassigned.remove(base);
+      Set<Vertex> working = Sets.newHashSet(base.getAllNeighbors());
+      while (!working.isEmpty()) {
+        Vertex n = working.iterator().next();
+        working.remove(n);
+        if (unassigned.contains(n)) {
+          component.add(n);
+          unassigned.remove(n);
+          for (Vertex n2 : n.getAllNeighbors()) {
+            if (unassigned.contains(n2)) {
+              working.add(n2);
+            }
+          }
+        }
+      }
+      components.add(component);
+    }
+    
+    return components;
+  }  
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
new file mode 100644
index 0000000..7705896
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
+import org.apache.crunch.impl.mr.collect.DoTableImpl;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+import org.apache.crunch.impl.mr.collect.UnionCollection;
+
+/**
+ *
+ */
+public class GraphBuilder implements PCollectionImpl.Visitor {
+
+  private Graph graph = new Graph();
+  private Vertex workingVertex;
+  private NodePath workingPath;
+  
+  public Graph getGraph() {
+    return graph;
+  }
+  
+  public void visitOutput(PCollectionImpl<?> output) {
+    workingVertex = graph.addVertex(output);
+    workingPath = new NodePath();
+    output.accept(this);
+  }
+  
+  @Override
+  public void visitInputCollection(InputCollection<?> collection) {
+    Vertex v = graph.addVertex(collection);
+    graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
+  }
+
+  @Override
+  public void visitUnionCollection(UnionCollection<?> collection) {
+    Vertex baseVertex = workingVertex;
+    NodePath basePath = workingPath;
+    for (PCollectionImpl<?> parent : collection.getParents()) {
+      workingPath = new NodePath(basePath);
+      workingVertex = baseVertex;
+      processParent(parent);
+    }
+  }
+
+  @Override
+  public void visitDoFnCollection(DoCollectionImpl<?> collection) {
+    workingPath.push(collection);
+    processParent(collection.getOnlyParent());
+  }
+
+  @Override
+  public void visitDoTable(DoTableImpl<?, ?> collection) {
+    workingPath.push(collection);
+    processParent(collection.getOnlyParent());
+  }
+
+  @Override
+  public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
+    Vertex v = graph.addVertex(collection);
+    graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection));
+    workingVertex = v;
+    workingPath = new NodePath(collection);
+    processParent(collection.getOnlyParent());
+  }
+  
+  private void processParent(PCollectionImpl<?> parent) {
+    Vertex v = graph.getVertexAt(parent);
+    if (v == null) {
+      parent.accept(this);
+    } else {
+      graph.getEdge(v, workingVertex).addNodePath(workingPath.close(parent));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 975d5a0..f959f14 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -73,168 +73,162 @@ public class MSCRPlanner {
   }
 
   public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException
{
-    // Constructs all of the node paths, which either start w/an input
-    // or a GBK and terminate in an output collection of any type.
-    NodeVisitor visitor = new NodeVisitor();
+    // Walk the current plan tree and build a graph in which the vertices are
+    // sources, targets, and GBK operations.
+    GraphBuilder graphBuilder = new GraphBuilder();
     for (PCollectionImpl<?> output : outputs.keySet()) {
-      visitor.visitOutput(output);
+      graphBuilder.visitOutput(output);
     }
-
-    // Pull out the node paths.
-    Map<PCollectionImpl<?>, Set<NodePath>> nodePaths = visitor.getNodePaths();
-
-    // Keeps track of the dependencies from collections -> jobs and then
-    // between different jobs.
-    Map<PCollectionImpl<?>, JobPrototype> assignments = Maps.newHashMap();
-    Map<PCollectionImpl<?>, Set<JobPrototype>> jobDependencies = new HashMap<PCollectionImpl<?>,
Set<JobPrototype>>();
-
-    // Find the set of GBKs that DO NOT depend on any other GBK.
-    Set<PGroupedTableImpl<?, ?>> workingGroupings = null;
-    while (!(workingGroupings = getWorkingGroupings(nodePaths)).isEmpty()) {
-
-      for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
-        Set<NodePath> mapInputPaths = nodePaths.get(grouping);
-        JobPrototype proto = JobPrototype.createMapReduceJob(grouping, mapInputPaths, pipeline.createTempPath());
-        assignments.put(grouping, proto);
-        if (jobDependencies.containsKey(grouping)) {
-          for (JobPrototype dependency : jobDependencies.get(grouping)) {
-            proto.addDependency(dependency);
-          }
-        }
-      }
-
-      Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = getDependencyPaths(workingGroupings,
nodePaths);
-      for (Map.Entry<PGroupedTableImpl<?, ?>, Set<NodePath>> entry : dependencyPaths.entrySet())
{
-        PGroupedTableImpl<?, ?> grouping = entry.getKey();
-        Set<NodePath> currentNodePaths = entry.getValue();
-
-        JobPrototype proto = assignments.get(grouping);
-        Set<NodePath> gbkPaths = Sets.newHashSet();
-        for (NodePath nodePath : currentNodePaths) {
-          PCollectionImpl<?> tail = nodePath.tail();
-          if (tail instanceof PGroupedTableImpl) {
-            gbkPaths.add(nodePath);
-            if (!jobDependencies.containsKey(tail)) {
-              jobDependencies.put(tail, Sets.<JobPrototype> newHashSet());
-            }
-            jobDependencies.get(tail).add(proto);
-          }
-        }
-
-        if (!gbkPaths.isEmpty()) {
-          handleGroupingDependencies(gbkPaths, currentNodePaths);
-        }
-
-        // At this point, all of the dependencies for the working groups will be
-        // file outputs, and so we can add them all to the JobPrototype-- we now
-        // have
-        // a complete job.
-        HashMultimap<Target, NodePath> reduceOutputs = HashMultimap.create();
-        for (NodePath nodePath : currentNodePaths) {
-          assignments.put(nodePath.tail(), proto);
-          for (Target target : outputs.get(nodePath.tail())) {
-            reduceOutputs.put(target, nodePath);
-          }
-        }
-        proto.addReducePaths(reduceOutputs);
-
-        // We've processed this GBK-- remove it from the set of nodePaths we
-        // need to process in the next step.
-        nodePaths.remove(grouping);
-      }
+    Graph baseGraph = graphBuilder.getGraph();
+    
+    // Create a new graph that splits up up dependent GBK nodes.
+    Graph graph = prepareFinalGraph(baseGraph);
+    
+    // Break the graph up into connected components.
+    List<List<Vertex>> components = graph.connectedComponents();
+    
+    // For each component, we will create one or more job prototypes,
+    // depending on its profile.
+    // For dependency handling, we only need to care about which
+    // job prototype a particular GBK is assigned to.
+    Map<Vertex, JobPrototype> assignments = Maps.newHashMap();
+    for (List<Vertex> component : components) {
+      assignments.putAll(constructJobPrototypes(component));
     }
-
-    // Process any map-only jobs that are remaining.
-    if (!nodePaths.isEmpty()) {
-      for (Map.Entry<PCollectionImpl<?>, Set<NodePath>> entry : nodePaths.entrySet())
{
-        PCollectionImpl<?> collect = entry.getKey();
-        if (!assignments.containsKey(collect)) {
-          HashMultimap<Target, NodePath> mapOutputs = HashMultimap.create();
-          for (NodePath nodePath : entry.getValue()) {
-            for (Target target : outputs.get(nodePath.tail())) {
-              mapOutputs.put(target, nodePath);
-            }
-          }
-          JobPrototype proto = JobPrototype.createMapOnlyJob(mapOutputs, pipeline.createTempPath());
-
-          if (jobDependencies.containsKey(collect)) {
-            for (JobPrototype dependency : jobDependencies.get(collect)) {
-              proto.addDependency(dependency);
-            }
-          }
-          assignments.put(collect, proto);
-        }
+    
+    // Add in the job dependency information here.
+    for (Map.Entry<Vertex, JobPrototype> e : assignments.entrySet()) {
+      JobPrototype current = e.getValue();
+      List<Vertex> parents = graph.getParents(e.getKey());
+      for (Vertex parent : parents) {
+        current.addDependency(assignments.get(parent));
       }
     }
-
+    
+    // Finally, construct the jobs from the prototypes and return.
     MRExecutor exec = new MRExecutor(jarClass);
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
     }
     return exec;
   }
-
-  private Map<PGroupedTableImpl<?, ?>, Set<NodePath>> getDependencyPaths(Set<PGroupedTableImpl<?,
?>> workingGroupings,
-      Map<PCollectionImpl<?>, Set<NodePath>> nodePaths) {
-    Map<PGroupedTableImpl<?, ?>, Set<NodePath>> dependencyPaths = Maps.newHashMap();
-    for (PGroupedTableImpl<?, ?> grouping : workingGroupings) {
-      dependencyPaths.put(grouping, Sets.<NodePath> newHashSet());
+  
+  private Graph prepareFinalGraph(Graph baseGraph) {
+    Graph graph = new Graph();
+    
+    for (Vertex baseVertex : baseGraph) {
+      // Add all of the vertices in the base graph, but no edges (yet).
+      graph.addVertex(baseVertex.getPCollection());
     }
-
-    // Find the targets that depend on one of the elements of the current
-    // working group.
-    for (PCollectionImpl<?> target : nodePaths.keySet()) {
-      if (!workingGroupings.contains(target)) {
-        for (NodePath nodePath : nodePaths.get(target)) {
-          if (workingGroupings.contains(nodePath.head())) {
-            dependencyPaths.get(nodePath.head()).add(nodePath);
+    
+    for (Edge e : baseGraph.getAllEdges()) {
+      // Add back all of the edges where neither vertex is a GBK.
+      if (!e.getHead().isGBK() && !e.getTail().isGBK()) {
+        Vertex head = graph.getVertexAt(e.getHead().getPCollection());
+        Vertex tail = graph.getVertexAt(e.getTail().getPCollection());
+        graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths());
+      }
+    }
+    
+    for (Vertex baseVertex : baseGraph) {
+      if (baseVertex.isGBK()) {
+        Vertex vertex = graph.getVertexAt(baseVertex.getPCollection());
+        for (Edge e : baseVertex.getIncomingEdges()) {
+          if (!e.getHead().isGBK()) {
+            Vertex newHead = graph.getVertexAt(e.getHead().getPCollection());
+            graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths());
+          }
+        }
+        for (Edge e : baseVertex.getOutgoingEdges()) {
+          if (!e.getTail().isGBK()) {
+            Vertex newTail = graph.getVertexAt(e.getTail().getPCollection());
+            graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths());
+          } else {
+            
+            // Execute an Edge split
+            Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
+            PCollectionImpl split = e.getSplit();
+            InputCollection<?> inputNode = handleSplitTarget(split);
+            Vertex splitTail = graph.addVertex(split);
+            Vertex splitHead = graph.addVertex(inputNode);
+            
+            // Divide up the node paths in the edge between the two GBK nodes so
+            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
+            for (NodePath path : e.getNodePaths()) {
+              NodePath headPath = path.splitAt(split, splitHead.getPCollection());
+              graph.getEdge(vertex, splitTail).addNodePath(headPath);
+              graph.getEdge(splitHead, newGraphTail).addNodePath(path);
+            }
+            
+            // Note the dependency between the vertices in the graph.
+            graph.markDependency(splitHead, splitTail);
           }
         }
       }
     }
-    return dependencyPaths;
+    
+    return graph;
   }
-
-  private int getSplitIndex(Set<NodePath> currentNodePaths) {
-    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
-    for (NodePath nodePath : currentNodePaths) {
-      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
-      iter.next(); // prime this past the initial NGroupedTableImpl
-      iters.add(iter);
+  
+  private Map<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component)
{
+    Map<Vertex, JobPrototype> assignment = Maps.newHashMap();
+    List<Vertex> gbks = Lists.newArrayList();
+    for (Vertex v : component) {
+      if (v.isGBK()) {
+        gbks.add(v);
+      }
     }
 
-    // Find the lowest point w/the lowest cost to be the split point for
-    // all of the dependent paths.
-    boolean end = false;
-    int splitIndex = -1;
-    while (!end) {
-      splitIndex++;
-      PCollectionImpl<?> current = null;
-      for (Iterator<PCollectionImpl<?>> iter : iters) {
-        if (iter.hasNext()) {
-          PCollectionImpl<?> next = iter.next();
-          if (next instanceof PGroupedTableImpl) {
-            end = true;
-            break;
-          } else if (current == null) {
-            current = next;
-          } else if (current != next) {
-            end = true;
-            break;
+    if (gbks.isEmpty()) {
+      HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+      for (Vertex v : component) {
+        if (v.isInput()) {
+          for (Edge e : v.getOutgoingEdges()) {
+            for (NodePath nodePath : e.getNodePaths()) {
+              PCollectionImpl target = nodePath.tail();
+              for (Target t : outputs.get(target)) {
+                outputPaths.put(t, nodePath);
+              }
+            }
           }
-        } else {
-          end = true;
-          break;
         }
       }
+      if (outputPaths.isEmpty()) {
+        throw new IllegalStateException("No outputs?");
+      }
+      JobPrototype prototype = JobPrototype.createMapOnlyJob(
+          outputPaths, pipeline.createTempPath()); 
+      for (Vertex v : component) {
+        assignment.put(v, prototype);
+      }
+    } else {
+      for (Vertex g : gbks) {
+        Set<NodePath> inputs = Sets.newHashSet();
+        for (Edge e : g.getIncomingEdges()) {
+          inputs.addAll(e.getNodePaths());
+        }
+        JobPrototype prototype = JobPrototype.createMapReduceJob(
+            (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
+        assignment.put(g, prototype);
+        for (Edge e : g.getIncomingEdges()) {
+          assignment.put(e.getHead(), prototype);
+        }
+        HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
+        for (Edge e : g.getOutgoingEdges()) {
+          Vertex output = e.getTail();
+          for (Target t : outputs.get(output.getPCollection())) {
+            outputPaths.putAll(t, e.getNodePaths());
+          }
+          assignment.put(output, prototype);
+        }
+        prototype.addReducePaths(outputPaths);
+      }
     }
-    // TODO: Add costing calcs here.
-    return splitIndex;
+    
+    return assignment;
   }
-
-  private void handleGroupingDependencies(Set<NodePath> gbkPaths, Set<NodePath>
currentNodePaths) throws IOException {
-    int splitIndex = getSplitIndex(currentNodePaths);
-    PCollectionImpl<?> splitTarget = currentNodePaths.iterator().next().get(splitIndex);
+  
+  private InputCollection<?> handleSplitTarget(PCollectionImpl<?> splitTarget)
{
     if (!outputs.containsKey(splitTarget)) {
       outputs.put(splitTarget, Sets.<Target> newHashSet());
     }
@@ -261,108 +255,6 @@ public class MSCRPlanner {
     outputs.get(splitTarget).add(srcTarget);
     splitTarget.materializeAt(srcTarget);
 
-    PCollectionImpl<?> inputNode = (PCollectionImpl<?>) pipeline.read(srcTarget);
-    Set<NodePath> nextNodePaths = Sets.newHashSet();
-    for (NodePath nodePath : currentNodePaths) {
-      if (gbkPaths.contains(nodePath)) {
-        nextNodePaths.add(nodePath.splitAt(splitIndex, inputNode));
-      } else {
-        nextNodePaths.add(nodePath);
-      }
-    }
-    currentNodePaths.clear();
-    currentNodePaths.addAll(nextNodePaths);
-  }
-
-  private Set<PGroupedTableImpl<?, ?>> getWorkingGroupings(Map<PCollectionImpl<?>,
Set<NodePath>> nodePaths) {
-    Set<PGroupedTableImpl<?, ?>> gbks = Sets.newHashSet();
-    for (PCollectionImpl<?> target : nodePaths.keySet()) {
-      if (target instanceof PGroupedTableImpl) {
-        boolean hasGBKDependency = false;
-        for (NodePath nodePath : nodePaths.get(target)) {
-          if (nodePath.head() instanceof PGroupedTableImpl) {
-            hasGBKDependency = true;
-            break;
-          }
-        }
-        if (!hasGBKDependency) {
-          gbks.add((PGroupedTableImpl<?, ?>) target);
-        }
-      }
-    }
-    return gbks;
-  }
-
-  private static class NodeVisitor implements PCollectionImpl.Visitor {
-
-    private final Map<PCollectionImpl<?>, Set<NodePath>> nodePaths;
-    private final Map<PCollectionImpl<?>, Source<?>> inputs;
-    private PCollectionImpl<?> workingNode;
-    private NodePath workingPath;
-
-    public NodeVisitor() {
-      this.nodePaths = new HashMap<PCollectionImpl<?>, Set<NodePath>>();
-      this.inputs = new HashMap<PCollectionImpl<?>, Source<?>>();
-    }
-
-    public Map<PCollectionImpl<?>, Set<NodePath>> getNodePaths() {
-      return nodePaths;
-    }
-
-    public void visitOutput(PCollectionImpl<?> output) {
-      nodePaths.put(output, Sets.<NodePath> newHashSet());
-      workingNode = output;
-      workingPath = new NodePath();
-      output.accept(this);
-    }
-
-    @Override
-    public void visitInputCollection(InputCollection<?> collection) {
-      workingPath.close(collection);
-      inputs.put(collection, collection.getSource());
-      nodePaths.get(workingNode).add(workingPath);
-    }
-
-    @Override
-    public void visitUnionCollection(UnionCollection<?> collection) {
-      PCollectionImpl<?> baseNode = workingNode;
-      NodePath basePath = workingPath;
-      for (PCollectionImpl<?> parent : collection.getParents()) {
-        workingPath = new NodePath(basePath);
-        workingNode = baseNode;
-        processParent(parent);
-      }
-    }
-
-    @Override
-    public void visitDoFnCollection(DoCollectionImpl<?> collection) {
-      workingPath.push(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    @Override
-    public void visitDoTable(DoTableImpl<?, ?> collection) {
-      workingPath.push(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    @Override
-    public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) {
-      workingPath.close(collection);
-      nodePaths.get(workingNode).add(workingPath);
-      workingNode = collection;
-      nodePaths.put(workingNode, Sets.<NodePath> newHashSet());
-      workingPath = new NodePath(collection);
-      processParent(collection.getOnlyParent());
-    }
-
-    private void processParent(PCollectionImpl<?> parent) {
-      if (!nodePaths.containsKey(parent)) {
-        parent.accept(this);
-      } else {
-        workingPath.close(parent);
-        nodePaths.get(workingNode).add(workingPath);
-      }
-    }
-  }
+    return (InputCollection<?>) pipeline.read(srcTarget);
+  }  
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
index c7a67bd..a090d93 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/NodePath.java
@@ -44,8 +44,9 @@ class NodePath implements Iterable<PCollectionImpl<?>> {
     this.path.push((PCollectionImpl<?>) stage);
   }
 
-  public void close(PCollectionImpl<?> head) {
+  public NodePath close(PCollectionImpl<?> head) {
     this.path.push(head);
+    return this;
   }
 
   public Iterator<PCollectionImpl<?>> iterator() {
@@ -103,4 +104,21 @@ class NodePath implements Iterable<PCollectionImpl<?>> {
     path = nextPath;
     return top;
   }
+  
+  public NodePath splitAt(PCollectionImpl split, PCollectionImpl<?> newHead) {
+    NodePath top = new NodePath();
+    int splitIndex = 0;
+    for (PCollectionImpl p : path) {
+      top.path.add(p);
+      if (p == split) {
+        break;
+      }
+      splitIndex++;
+    }
+    LinkedList<PCollectionImpl<?>> nextPath = Lists.newLinkedList();
+    nextPath.add(newHead);
+    nextPath.addAll(path.subList(splitIndex + 1, path.size()));
+    path = nextPath;
+    return top;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
new file mode 100644
index 0000000..db49e83
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ *
+ */
+public class Vertex {
+  private final PCollectionImpl impl;
+  private Set<Edge> incoming;
+  private Set<Edge> outgoing;
+  
+  public Vertex(PCollectionImpl impl) {
+    this.impl = impl;
+    this.incoming = Sets.newHashSet();
+    this.outgoing = Sets.newHashSet();
+  }
+  
+  public PCollectionImpl getPCollection() {
+    return impl;
+  }
+  
+  public boolean isInput() {
+    return impl instanceof InputCollection;
+  }
+  
+  public boolean isGBK() {
+    return impl instanceof PGroupedTableImpl;
+  }
+  
+  public Source getSource() {
+    if (isInput()) {
+      return ((InputCollection) impl).getSource();
+    }
+    return null;
+  }
+  
+  public void addIncoming(Edge edge) {
+    this.incoming.add(edge);
+  }
+  
+  public void addOutgoing(Edge edge) {
+    this.outgoing.add(edge);
+  }
+  
+  public List<Vertex> getAllNeighbors() {
+    List<Vertex> n = Lists.newArrayList();
+    for (Edge e : incoming) {
+      n.add(e.getHead());
+    }
+    for (Edge e : outgoing) {
+      n.add(e.getTail());
+    }
+    return n;
+  }
+  
+  public Set<Edge> getAllEdges() {
+    return Sets.union(incoming, outgoing);
+  }
+  
+  public Set<Edge> getIncomingEdges() {
+    return incoming;
+  }
+  
+  public Set<Edge> getOutgoingEdges() {
+    return outgoing;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof Vertex)) {
+      return false;
+    }
+    Vertex other = (Vertex) obj;
+    return impl.equals(other.impl);
+  }
+  
+  @Override
+  public int hashCode() {
+    return 17 + 37 * impl.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/739a4703/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 792bef0..edc1e0e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -635,6 +635,9 @@ under the License.
           <version>2.12</version>
           <configuration>
             <argLine>-Xmx512m</argLine>
+            <includes>
+              <include>**/*IT.java</include>
+            </includes>
           </configuration>
         </plugin>
         <plugin>


Mime
View raw message