crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-241: Write side outputs from the map phase of a MapReduce job
Date Mon, 22 Jul 2013 22:20:00 GMT
Updated Branches:
  refs/heads/master b54ce84e7 -> 36bde4162


CRUNCH-241: Write side outputs from the map phase of a MapReduce job


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/36bde416
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/36bde416
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/36bde416

Branch: refs/heads/master
Commit: 36bde4162ba624c5f9bdb85434c66fa889713060
Parents: b54ce84
Author: Josh Wills <jwills@apache.org>
Authored: Thu Jul 18 23:09:21 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jul 22 15:00:05 2013 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/MultipleOutputIT.java     | 36 +++++++++++++++++---
 .../impl/mr/emit/MultipleOutputEmitter.java     |  2 --
 .../crunch/impl/mr/exec/CrunchJobHooks.java     | 14 ++++----
 .../org/apache/crunch/impl/mr/plan/DoNode.java  |  4 +--
 .../crunch/impl/mr/plan/JobPrototype.java       | 32 ++++++++++++++---
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 27 ++++++++++++---
 .../java/org/apache/crunch/io/PathTarget.java   |  3 +-
 .../crunch/io/avro/trevni/TrevniKeyTarget.java  |  2 +-
 .../apache/crunch/io/impl/FileTargetImpl.java   |  5 ++-
 .../crunch/io/impl/SourcePathTargetImpl.java    |  4 +--
 10 files changed, 96 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
index 1a85b6a..96971f8 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -18,15 +18,19 @@
 package org.apache.crunch;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
+import org.apache.crunch.io.To;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
@@ -47,7 +51,6 @@ public class MultipleOutputIT {
 
  public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
     return words.parallelDo("even", new FilterFn<String>() {
-
       @Override
       public boolean accept(String input) {
         return input.length() % 2 == 0;
@@ -57,7 +60,6 @@ public class MultipleOutputIT {
 
  public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
     return words.parallelDo("odd", new FilterFn<String>() {
-
       @Override
       public boolean accept(String input) {
         return input.length() % 2 != 0;
@@ -100,7 +102,8 @@ public class MultipleOutputIT {
     String inputPath = tmpDir.copyResourceFileName("letters.txt");
     String outputPathEven = tmpDir.getFileName("even");
     String outputPathOdd = tmpDir.getFileName("odd");
-
+    String outputPathReduce = tmpDir.getFileName("reduce");
+    
     PCollection<String> words = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
 
     PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
@@ -108,14 +111,27 @@ public class MultipleOutputIT {
     pipeline.writeTextFile(evenCountWords, outputPathEven);
     pipeline.writeTextFile(oddCountWords, outputPathOdd);
 
+    evenCountWords.by(new FirstLetterFn(), typeFamily.strings())
+        .groupByKey()
+        .combineValues(Aggregators.<String>FIRST_N(10))
+        .write(To.textFile(outputPathReduce));
+    
     PipelineResult result = pipeline.done();
 
     checkFileContents(outputPathEven, Arrays.asList("bb"));
     checkFileContents(outputPathOdd, Arrays.asList("a"));
-
+    checkNotEmpty(outputPathReduce);
+    
     return result;
   }
 
+  static class FirstLetterFn extends MapFn<String, String> {
+    @Override
+    public String map(String input) {
+      return input.substring(0, 1);
+    }
+  }
+  
   /**
    * Mutates the state of an input and then emits the mutated object.
    */
@@ -167,6 +183,18 @@ public class MultipleOutputIT {
 
   }
 
+  private void checkNotEmpty(String filePath) throws IOException {
+    File dir = new File(filePath);
+    File[] partFiles = dir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.startsWith("part");
+      } 
+    });
+    assertTrue(partFiles.length > 0);
+    assertTrue(Files.readLines(partFiles[0], Charset.defaultCharset()).size() > 0);
+  }
+  
   private void checkFileContents(String filePath, List<String> expected) throws IOException {
     File outputFile = new File(filePath, "part-m-00000");
     List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
index 2e58fed..3d806ed 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
@@ -17,8 +17,6 @@
  */
 package org.apache.crunch.impl.mr.emit;
 
-import java.io.IOException;
-
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.io.CrunchOutputs;

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
index 0780431..6a15a0d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -66,7 +66,8 @@ public final class CrunchJobHooks {
     private final Map<Integer, PathTarget> multiPaths;
     private final boolean mapOnlyJob;
 
-    public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, boolean mapOnlyJob) {
+    public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths,
+        boolean mapOnlyJob) {
       this.job = job;
       this.workingPath = workingPath;
       this.multiPaths = multiPaths;
@@ -80,12 +81,11 @@ public final class CrunchJobHooks {
 
     private synchronized void handleMultiPaths() throws IOException {
       try {
-        if (job.isSuccessful() && !multiPaths.isEmpty()) {
-          // Need to handle moving the data from the output directory of the
-          // job to the output locations specified in the paths.
-          FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
-          for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
-            entry.getValue().handleOutputs(job.getConfiguration(), workingPath, entry.getKey(), mapOnlyJob);
+        if (job.isSuccessful()) {
+          if (!multiPaths.isEmpty()) {
+            for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
+              entry.getValue().handleOutputs(job.getConfiguration(), workingPath, entry.getKey());
+            }
           }
         }
       } catch(Exception ie) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 865369c..2d6d590 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -107,7 +107,7 @@ public class DoNode {
     // TODO: This is sort of terrible, refactor the code to make this make more sense.
     boolean exists = false;
     for (DoNode child : children) {
-      if (node == child) {
+      if (node == child || (node.isOutputNode() && node.equals(child))) {
         exists = true;
         break;
       }
@@ -152,7 +152,7 @@ public class DoNode {
       return true;
     }
     DoNode o = (DoNode) other;
-    return (name.equals(o.name) && fn.equals(o.fn) && source == o.source && outputConverter == o.outputConverter);
+    return name.equals(o.name) && fn.equals(o.fn) && source == o.source && outputConverter == o.outputConverter;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index f22b5a1..da13611 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -66,6 +66,7 @@ class JobPrototype {
   private final Map<PCollectionImpl<?>, DoNode> nodes = Maps.newHashMap();
   private final Path workingPath;
 
+  private HashMultimap<Target, NodePath> mapSideNodePaths;
   private HashMultimap<Target, NodePath> targetsToNodePaths;
   private DoTableImpl<?, ?> combineFnTable;
 
@@ -107,6 +108,13 @@ class JobPrototype {
     return targetsToNodePaths;
   }
 
+  public void addMapSideOutputs(HashMultimap<Target, NodePath> mapSideNodePaths) {
+    if (group == null) {
+      throw new IllegalStateException("Cannot side-outputs to a map-only job");
+    }
+    this.mapSideNodePaths = mapSideNodePaths;
+  }
+  
   public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
     if (group == null) {
       throw new IllegalStateException("Cannot add a reduce phase to a map-only job");
@@ -135,10 +143,9 @@ class JobPrototype {
     job.setJarByClass(jarClass);
 
     Set<DoNode> outputNodes = Sets.newHashSet();
-    Set<Target> targets = targetsToNodePaths.keySet();
     Path outputPath = new Path(workingPath, "output");
     MSCROutputHandler outputHandler = new MSCROutputHandler(job, outputPath, group == null);
-    for (Target target : targets) {
+    for (Target target : targetsToNodePaths.keySet()) {
       DoNode node = null;
       for (NodePath nodePath : targetsToNodePaths.get(target)) {
         if (node == null) {
@@ -150,6 +157,22 @@ class JobPrototype {
       }
     }
 
+    Set<DoNode> mapSideNodes = Sets.newHashSet();
+    if (mapSideNodePaths != null) {
+      for (Target target : mapSideNodePaths.keySet()) {
+        DoNode node = null;
+        for (NodePath nodePath : mapSideNodePaths.get(target)) {
+          if (node == null) {
+            PCollectionImpl<?> collect = nodePath.tail();
+            node = DoNode.createOutputNode(target.toString(), collect.getPType());
+            outputHandler.configureNode(node, target);
+          }
+          mapSideNodes.add(walkPath(nodePath.descendingIterator(), node));
+        }
+        
+      }
+    }
+    
     job.setMapperClass(CrunchMapper.class);
     List<DoNode> inputNodes;
     DoNode reduceNode = null;
@@ -171,7 +194,7 @@ class JobPrototype {
       group.configureShuffle(job);
 
       DoNode mapOutputNode = group.getGroupingNode();
-      Set<DoNode> mapNodes = Sets.newHashSet();
+      Set<DoNode> mapNodes = Sets.newHashSet(mapSideNodes);
       for (NodePath nodePath : mapNodePaths) {
         // Advance these one step, since we've already configured
         // the grouping node, and the PGroupedTableImpl is the tail
@@ -192,8 +215,7 @@ class JobPrototype {
       inputNode.getSource().configureSource(job, -1);
     } else {
       for (int i = 0; i < inputNodes.size(); i++) {
-        DoNode inputNode = inputNodes.get(i);
-        inputNode.getSource().configureSource(job, i);
+        inputNodes.get(i).getSource().configureSource(job, i);
       }
       job.setInputFormatClass(CrunchInputFormat.class);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 06ede5a..b5b37d7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -268,17 +267,36 @@ public class MSCRPlanner {
       Set<Edge> usedEdges = Sets.newHashSet();
       for (Vertex g : gbks) {
         Set<NodePath> inputs = Sets.newHashSet();
+        HashMultimap<Target, NodePath> mapSideOutputPaths = HashMultimap.create();
         for (Edge e : g.getIncomingEdges()) {
           inputs.addAll(e.getNodePaths());
           usedEdges.add(e);
+          if (e.getHead().isInput()) {
+            for (Edge ep : e.getHead().getOutgoingEdges()) {
+              if (ep.getTail().isOutput() && !usedEdges.contains(ep)) { // map-side output
+                for (Target t : outputs.get(ep.getTail().getPCollection())) {
+                  mapSideOutputPaths.putAll(t, ep.getNodePaths());
+                }
+                usedEdges.add(ep);
+              }
+            }
+          }
         }
         JobPrototype prototype = JobPrototype.createMapReduceJob(
             ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath());
+        prototype.addMapSideOutputs(mapSideOutputPaths);
         assignment.put(g, prototype);
         for (Edge e : g.getIncomingEdges()) {
           assignment.put(e.getHead(), prototype);
-          usedEdges.add(e);
+          if (e.getHead().isInput()) {
+            for (Edge ep : e.getHead().getOutgoingEdges()) {
+              if (ep.getTail().isOutput() && !assignment.containsKey(ep.getTail())) { // map-side output
+                assignment.put(ep.getTail(), prototype);
+              }
+            }
+          }
         }
+        
         HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
         for (Edge e : g.getOutgoingEdges()) {
           Vertex output = e.getTail();
@@ -290,13 +308,12 @@ public class MSCRPlanner {
         }
         prototype.addReducePaths(outputPaths);
       }
-      
+
       // Check for any un-assigned vertices, which should be map-side outputs
       // that we will need to run in a map-only job.
       HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
       Set<Vertex> orphans = Sets.newHashSet();
       for (Vertex v : component) {
-
         // Check if this vertex has multiple inputs but only a subset of
         // them have already been assigned
         boolean vertexHasUnassignedIncomingEdges = false;
@@ -334,7 +351,7 @@ public class MSCRPlanner {
         }
       }
     }
-    
+  
     return assignment;
   }
   

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
index 4f7949f..9488c16 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -44,8 +44,7 @@ public interface PathTarget extends MapReduceTarget {
    * @param conf The job {@code Configuration}
    * @param workingPath The temp directory that contains the output of the job
    * @param index The index of this target for jobs that write multiple output files to a single directory
-   * @param mapOnlyJob Whether or not this is a map-only job
    * @throws IOException
    */
-  void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) throws IOException;
+  void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
index 2ede024..2fefa59 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java
@@ -134,7 +134,7 @@ public class TrevniKeyTarget extends FileTargetImpl {
 
   @Override
   protected Path getDestFile(final Configuration conf, final Path src, final Path dir, final boolean mapOnlyJob) throws IOException {
-    Path outputFilename = super.getDestFile(conf, src, dir, mapOnlyJob);
+    Path outputFilename = super.getDestFile(conf, src, dir, true);
     //make sure the dst file is unique in the case there are multiple part-#.trv files per partition.
     return new Path(outputFilename.toString()+"-"+src.getName());
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 4d58830..07c63df 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -86,8 +86,7 @@ public class FileTargetImpl implements PathTarget {
     return true;
   }
 
-  public void handleOutputs(Configuration conf, Path workingPath, int index,
-      boolean mapOnlyJob) throws IOException {
+  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
     Path src = getSourcePattern(workingPath, index);
     Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
@@ -97,7 +96,7 @@ public class FileTargetImpl implements PathTarget {
     }
     boolean sameFs = isCompatible(srcFs, path);
     for (Path s : srcs) {
-      Path d = getDestFile(conf, s, path, mapOnlyJob);
+      Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
       if (sameFs) {
         srcFs.rename(s, d);
       } else {

http://git-wip-us.apache.org/repos/asf/crunch/blob/36bde416/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
index a90bb7b..fbc2201 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -52,8 +52,8 @@ public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements Path
   }
 
   @Override
-  public void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob)
+  public void handleOutputs(Configuration conf, Path workingPath, int index)
       throws IOException {
-    ((PathTarget) target).handleOutputs(conf, workingPath, index, mapOnlyJob);
+    ((PathTarget) target).handleOutputs(conf, workingPath, index);
   }
 }


Mime
View raw message