crunch-commits mailing list archives

From jwi...@apache.org
Subject git commit: CRUNCH-294: Cost-based planning with materialize as breakpoint.
Date Thu, 21 Nov 2013 20:56:19 GMT
Updated Branches:
  refs/heads/master 1381165fb -> 12dea675b


CRUNCH-294: Cost-based planning with materialize as breakpoint.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/12dea675
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/12dea675
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/12dea675

Branch: refs/heads/master
Commit: 12dea675bf50ee767a86870dfcda744818ecb332
Parents: 1381165
Author: Josh Wills <jwills@apache.org>
Authored: Wed Nov 20 13:19:02 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Wed Nov 20 16:22:58 2013 -0800

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/BreakpointIT.java | 129 +++++++++++++++++++
 .../src/main/java/org/apache/crunch/DoFn.java   |   2 +-
 .../org/apache/crunch/impl/mr/MRPipeline.java   |   2 +-
 .../crunch/impl/mr/collect/PCollectionImpl.java |  22 +++-
 .../crunch/impl/mr/collect/UnionCollection.java |   9 ++
 .../crunch/impl/mr/collect/UnionTable.java      |   8 ++
 .../org/apache/crunch/impl/mr/plan/Edge.java    | 119 ++++++++++++-----
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  20 ++-
 8 files changed, 258 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
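
In practical terms, this change means that a PCollection passed to materialize() is flagged as a
breakpoint, and the planner prefers it as the split point between dependent jobs instead of
re-running the upstream map work for each downstream output. A minimal usage sketch, loosely
following the new BreakpointIT test (the class name, paths, and the toUpperCase transform are
illustrative assumptions, not part of this commit):

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.io.To;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;

    public class BreakpointSketch {
      public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(BreakpointSketch.class, new Configuration());
        PCollection<String> lines = pipeline.read(From.textFile("/tmp/input.txt"));

        PTable<String, String> table = lines.parallelDo(
            new DoFn<String, Pair<String, String>>() {
              @Override
              public void process(String line, Emitter<Pair<String, String>> emitter) {
                emitter.emit(Pair.of(line, line.toUpperCase()));
              }
            }, Writables.tableOf(Writables.strings(), Writables.strings()));

        // Breakpoint: downstream consumers read the materialized output rather than
        // repeating the upstream DoFn in a second map phase.
        table.materialize();

        table.keys().write(To.textFile("/tmp/out1"));
        table.groupByKey().ungroup().write(To.textFile("/tmp/out2"));
        pipeline.done();
      }
    }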


http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
new file mode 100644
index 0000000..790f049
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/BreakpointIT.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BreakpointIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test
+  public void testNoBreakpoint() throws Exception {
+    run(new MRPipeline(BreakpointIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("shakes.txt"),
+        tmpDir.getFileName("out1"),
+        tmpDir.getFileName("out2"),
+        false);
+  }
+
+  @Test
+  public void testBreakpoint() throws Exception {
+    run(new MRPipeline(BreakpointIT.class, tmpDir.getDefaultConfiguration()),
+        tmpDir.copyResourceFileName("shakes.txt"),
+        tmpDir.getFileName("out1"),
+        tmpDir.getFileName("out2"),
+        true);
+  }
+
+  public static void run(Pipeline pipeline, String input, String out1, String out2, boolean breakpoint)
+      throws Exception {
+
+    // Read a line from a file to get a PCollection.
+    PCollection<String> pCol1 = pipeline.read(From.textFile(input));
+
+    // Create a PTable from PCollection
+    PTable<String, Integer> pTable1 = pCol1.parallelDo(new DoFn<String, Pair<String, Integer>>() {
+      @Override
+      public void process(final String s, final Emitter<Pair<String, Integer>> emitter) {
+        for (int i = 0; i < 10; i++) {
+          emitter.emit(new Pair<String, Integer>(s, i));
+        }
+      }
+    }, Writables.tableOf(Writables.strings(), Writables.ints()));
+
+    // Do a groupByKey
+    PGroupedTable<String, Integer> pGrpTable1 = pTable1.groupByKey();
+
+    // Select from PGroupedTable
+    PTable<String, Integer> selectFromPTable1 = pGrpTable1.parallelDo(
+        new DoFn<Pair<String, Iterable<Integer>>, Pair<String, Integer>>() {
+          @Override
+          public void process(final Pair<String, Iterable<Integer>> input,
+                              final Emitter<Pair<String, Integer>> emitter) {
+            emitter.emit(new Pair<String, Integer>(input.first(), input.second().iterator().next()));
+          }
+        }, Writables.tableOf(Writables.strings(), Writables.ints()));
+
+    // Process selectFromPTable1 once
+    final PTable<String, String> pTable2 = selectFromPTable1.parallelDo(new DoFn<Pair<String, Integer>, Pair<String, String>>() {
+      @Override
+      public void process(final Pair<String, Integer> input, final Emitter<Pair<String, String>> emitter) {
+        final Integer newInt = input.second() + 5;
+        increment("job", "table2");
+        emitter.emit(new Pair<String, String>(newInt.toString(), input.first()));
+      }
+    }, Writables.tableOf(Writables.strings(), Writables.strings()));
+
+    // Process selectFromPTable1 once more
+    PTable<String, String> pTable3 = selectFromPTable1.parallelDo(new DoFn<Pair<String, Integer>, Pair<String, String>>() {
+      @Override
+      public void process(final Pair<String, Integer> input, final Emitter<Pair<String, String>> emitter) {
+        final Integer newInt = input.second() + 10;
+        increment("job", "table3");
+        emitter.emit(new Pair<String, String>(newInt.toString(), input.first()));
+      }
+    }, Writables.tableOf(Writables.strings(), Writables.strings()));
+
+    // Union pTable2 and pTable3 and set a breakpoint
+    PTable<String, String> pTable4 = pTable2.union(pTable3);
+    if (breakpoint) {
+      pTable4.materialize();
+    }
+
+    // Write keys
+    pTable4.keys().write(To.textFile(out1));
+
+    // Group values
+    final PGroupedTable<String, String> pGrpTable3 = pTable4.groupByKey();
+
+    // Write values
+    pGrpTable3.ungroup().write(To.textFile(out2));
+
+    PipelineExecution pe = pipeline.runAsync();
+    // Count the number of map processing steps in this pipeline
+    int mapsCount = 0;
+    for (String line : pe.getPlanDotFile().split("\n")) {
+      if (line.contains(" subgraph ") && line.contains("-map\" {")) {
+        mapsCount++;
+      }
+    }
+    assertEquals(breakpoint ? 1 : 2, mapsCount);
+    pe.waitUntilDone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
index 6ae89a4..a052d09 100644
--- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
@@ -116,7 +116,7 @@ public abstract class DoFn<S, T> implements Serializable {
    * resulting {@code PCollection} should override this method.
    */
   public float scaleFactor() {
-    return 1.2f;
+    return 0.99f;
   }
 
   /**
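
This hunk drops the default scaleFactor from 1.2f to 0.99f, so by default the planner's size
estimates treat a parallelDo output as slightly smaller than its input. As the Javadoc above
notes, a DoFn that significantly changes the size of its output should override the method; a
brief sketch of such an override (hypothetical class, not part of this commit):

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;

    // Keeps roughly one record in ten and advertises that to the planner, so its
    // size estimates (and therefore its split-point choices) stay realistic.
    public class SampleOneInTenFn extends DoFn<String, String> {
      @Override
      public void process(String input, Emitter<String> emitter) {
        if (input.length() % 10 == 0) {  // crude 1-in-10 filter, for illustration only
          emitter.emit(input);
        }
      }

      @Override
      public float scaleFactor() {
        return 0.1f;  // expected output size is ~10% of the input size
      }
    }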

http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 4fb2876..ff95b91 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -244,7 +244,7 @@ public class MRPipeline implements Pipeline {
 
   @Override
   public <T> Iterable<T> materialize(PCollection<T> pcollection) {
-
+    ((PCollectionImpl) pcollection).setBreakpoint();
     PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
     ReadableSource<T> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index b82c883..191b11e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -59,7 +59,9 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   private boolean materialized;
   protected SourceTarget<S> materializedAt;
   protected final ParallelDoOptions doOptions;
-  
+  private long size = -1L;
+  private boolean breakpoint;
+
   public PCollectionImpl(String name) {
     this(name, ParallelDoOptions.builder().build());
   }
@@ -158,6 +160,14 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return getPipeline().materialize(this);
   }
 
+  public void setBreakpoint() {
+    this.breakpoint = true;
+  }
+
+  public boolean isBreakpoint() {
+    return breakpoint;
+  }
+
   /** {@inheritDoc} */
   @Override
   public PObject<Collection<S>> asCollection() {
@@ -170,6 +180,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   public void materializeAt(SourceTarget<S> sourceTarget) {
     this.materializedAt = sourceTarget;
+    this.size = materializedAt.getSize(getPipeline().getConfiguration());
   }
 
   @Override
@@ -299,13 +310,10 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   @Override
   public long getSize() {
-    if (materializedAt != null) {
-      long sz = materializedAt.getSize(getPipeline().getConfiguration());
-      if (sz > 0) {
-        return sz;
-      }
+    if (size < 0) {
+      this.size = getSizeInternal();
     }
-    return getSizeInternal();
+    return size;
   }
 
   protected abstract long getSizeInternal();

http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
index 4a69d96..e6c95bb 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionCollection.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.collect;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.apache.crunch.PCollection;
 import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
@@ -60,6 +61,14 @@ public class UnionCollection<S> extends PCollectionImpl<S> {
   }
 
   @Override
+  public void setBreakpoint() {
+    super.setBreakpoint();
+    for (PCollectionImpl<S> parent : parents) {
+      parent.setBreakpoint();
+    }
+  }
+
+  @Override
   protected long getSizeInternal() {
     return size;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
index b6a26d5..b4144e4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/UnionTable.java
@@ -90,6 +90,14 @@ public class UnionTable<K, V> extends PTableBase<K, V> {
   }
 
   @Override
+  public void setBreakpoint() {
+    super.setBreakpoint();
+    for (PCollectionImpl<Pair<K, V>> parent : parents) {
+      parent.setBreakpoint();
+    }
+  }
+
+  @Override
   protected void acceptInternal(PCollectionImpl.Visitor visitor) {
     visitor.visitUnionCollection(new UnionCollection<Pair<K, V>>(parents));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
index 6eb50eb..8f99a0b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/Edge.java
@@ -18,17 +18,19 @@
 package org.apache.crunch.impl.mr.plan;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -62,43 +64,94 @@ class Edge {
   public Set<NodePath> getNodePaths() {
     return paths;
   }
-  
-  public PCollectionImpl getSplit() {
-    List<Iterator<PCollectionImpl<?>>> iters = Lists.newArrayList();
-    for (NodePath nodePath : paths) {
-      Iterator<PCollectionImpl<?>> iter = nodePath.iterator();
-      iter.next(); // prime this past the initial NGroupedTableImpl
-      iters.add(iter);
+
+  private static boolean readWriteOutput(PCollectionImpl<?> pc, Map<PCollectionImpl<?>, Set<Target>> outputs) {
+    if (outputs.containsKey(pc)) {
+      for (Target t : outputs.get(pc)) {
+        if (t instanceof SourceTarget || t.asSourceTarget(pc.getPType()) != null) {
+          return true;
+        }
+      }
     }
+    return false;
+  }
 
-    // Find the lowest point w/the lowest cost to be the split point for
-    // all of the dependent paths.
-    boolean end = false;
-    int splitIndex = -1;
-    while (!end) {
-      splitIndex++;
-      PCollectionImpl<?> current = null;
-      for (Iterator<PCollectionImpl<?>> iter : iters) {
-        if (iter.hasNext()) {
-          PCollectionImpl<?> next = iter.next();
-          if (next instanceof PGroupedTableImpl) {
-            end = true;
-            break;
-          } else if (current == null) {
-            current = next;
-          } else if (current != next) {
-            end = true;
-            break;
+  public Map<NodePath,  PCollectionImpl> getSplitPoints(Map<PCollectionImpl<?>, Set<Target>> outputs) {
+    List<NodePath> np = Lists.newArrayList(paths);
+    List<PCollectionImpl<?>> smallestOverallPerPath = Lists.newArrayListWithExpectedSize(np.size());
+    Map<PCollectionImpl<?>, Set<Integer>> pathCounts = Maps.newHashMap();
+    Map<NodePath, PCollectionImpl> splitPoints = Maps.newHashMap();
+    for (int i = 0; i < np.size(); i++) {
+      long bestSize = Long.MAX_VALUE;
+      boolean breakpoint = false;
+      PCollectionImpl<?> best = null;
+      for (PCollectionImpl<?> pc : np.get(i)) {
+        if (!(pc instanceof PGroupedTableImpl)) {
+          if (pc.isBreakpoint()) {
+            if (!breakpoint || pc.getSize() < bestSize) {
+              best = pc;
+              bestSize = pc.getSize();
+              breakpoint = true;
+            }
+          } else if (!breakpoint && pc.getSize() < bestSize) {
+            best = pc;
+            bestSize = pc.getSize();
           }
-        } else {
-          end = true;
-          break;
+          Set<Integer> cnts = pathCounts.get(pc);
+          if (cnts == null) {
+            cnts = Sets.newHashSet();
+            pathCounts.put(pc, cnts);
+          }
+          cnts.add(i);
+        }
+      }
+      smallestOverallPerPath.add(best);
+      if (breakpoint) {
+        splitPoints.put(np.get(i), best);
+      }
+    }
+
+    Set<Integer> missing = Sets.newHashSet();
+    for (int i = 0; i < np.size(); i++) {
+      if (!splitPoints.containsKey(np.get(i))) {
+        missing.add(i);
+      }
+    }
+    if (missing.isEmpty()) {
+      return splitPoints;
+    } else {
+      // Need to either choose the smallest collection from each missing path,
+      // or the smallest single collection that is on all paths as the split target.
+      Set<PCollectionImpl<?>> smallest = Sets.newHashSet();
+      long smallestSize = 0;
+      for (Integer id : missing) {
+        PCollectionImpl<?> s = smallestOverallPerPath.get(id);
+        if (!smallest.contains(s)) {
+          smallest.add(s);
+          smallestSize += s.getSize();
+        }
+      }
+
+      PCollectionImpl<?> singleBest = null;
+      long singleSmallestSize = Long.MAX_VALUE;
+      for (Map.Entry<PCollectionImpl<?>, Set<Integer>> e : pathCounts.entrySet()) {
+        if (Sets.difference(missing, e.getValue()).isEmpty() && e.getKey().getSize() < singleSmallestSize) {
+          singleBest = e.getKey();
+          singleSmallestSize = singleBest.getSize();
+        }
+      }
+
+      if (smallestSize < singleSmallestSize) {
+        for (Integer id : missing) {
+          splitPoints.put(np.get(id), smallestOverallPerPath.get(id));
+        }
+      } else {
+        for (Integer id : missing) {
+          splitPoints.put(np.get(id), singleBest);
         }
       }
     }
-    // TODO: Add costing calcs here.
-    
-    return Iterables.getFirst(paths, null).get(splitIndex);
+    return splitPoints;
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/12dea675/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 1e0793c..ac61fec 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -224,21 +224,19 @@ public class MSCRPlanner {
           } else {
             // Execute an Edge split
             Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection());
-            PCollectionImpl split = e.getSplit();
-            InputCollection<?> inputNode = handleSplitTarget(split);
-            Vertex splitTail = graph.addVertex(split, true);
-            Vertex splitHead = graph.addVertex(inputNode, false);
-            
-            // Divide up the node paths in the edge between the two GBK nodes so
-            // that each node is either owned by GBK1 -> newTail or newHead -> GBK2.
-            for (NodePath path : e.getNodePaths()) {
+            Map<NodePath, PCollectionImpl> splitPoints = e.getSplitPoints(outputs);
+            for (Map.Entry<NodePath, PCollectionImpl> s : splitPoints.entrySet()) {
+              NodePath path = s.getKey();
+              PCollectionImpl split = s.getValue();
+              InputCollection<?> inputNode = handleSplitTarget(split);
+              Vertex splitTail = graph.addVertex(split, true);
+              Vertex splitHead = graph.addVertex(inputNode, false);
               NodePath headPath = path.splitAt(split, splitHead.getPCollection());
               graph.getEdge(vertex, splitTail).addNodePath(headPath);
               graph.getEdge(splitHead, newGraphTail).addNodePath(path);
+              // Note the dependency between the vertices in the graph.
+              graph.markDependency(splitHead, splitTail);
             }
-            
-            // Note the dependency between the vertices in the graph.
-            graph.markDependency(splitHead, splitTail);
           }
         }
       }

