crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzo...@apache.org
Subject [2/2] crunch git commit: CRUNCH-438: Visualizations of some important internal/intermediate pipeline planning states
Date Fri, 09 Jan 2015 00:01:33 GMT
CRUNCH-438: Visualizations of some important internal/intermediate pipeline planning states

Signed-off-by: tzolov <christian.tzolov@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6543ded5
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6543ded5
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6543ded5

Branch: refs/heads/master
Commit: 6543ded556dbbd4e8e25e59730f7f5fa8e2c3c26
Parents: 3088d82
Author: tzolov <christian.tzolov@gmail.com>
Authored: Thu Jan 8 21:50:37 2015 +0100
Committer: tzolov <christian.tzolov@gmail.com>
Committed: Fri Jan 9 00:35:37 2015 +0100

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../apache/crunch/impl/mr/plan/DotfilesIT.java  | 201 +++++++++++++
 .../org/apache/crunch/PipelineExecution.java    |   7 +
 .../org/apache/crunch/impl/mem/MemPipeline.java |  10 +
 .../org/apache/crunch/impl/mr/MRPipeline.java   |  10 +-
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  26 +-
 .../impl/mr/plan/CommonDotfileWriter.java       | 176 +++++++++++
 .../crunch/impl/mr/plan/DotfileUtills.java      | 178 ++++++++++++
 .../crunch/impl/mr/plan/DotfileWriterGraph.java | 156 ++++++++++
 .../plan/DotfileWriterPCollectionLineage.java   |  87 ++++++
 .../impl/mr/plan/DotfileWriterRTNodes.java      | 291 +++++++++++++++++++
 .../crunch/impl/mr/plan/JobPrototype.java       |   2 -
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java |  43 ++-
 .../crunch/impl/mr/plan/PlanningParameters.java |   2 +
 .../crunch/impl/mr/run/CrunchOutputFormat.java  |  54 ----
 .../org/apache/crunch/impl/mr/run/RTNode.java   |  33 +++
 .../org/apache/crunch/io/CrunchOutputs.java     | 257 ++++------------
 .../apache/crunch/impl/spark/SparkRuntime.java  |   7 +
 src/main/config/checkstyle.xml                  |   1 -
 19 files changed, 1258 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index d5149ce..d4b1ab9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,5 @@
 target
 *.iml
 .idea
+.checkstyle
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java
new file mode 100644
index 0000000..bfb1258
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.lib.Aggregate;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+public class DotfilesIT {
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Rule
+  public TemporaryPath dotfileDir = TemporaryPaths.create();
+
+  enum WordCountStats {
+    ANDS
+  }
+
+  public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
+    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
+
+      @Override
+      public void process(String line, Emitter<String> emitter) {
+        for (String word : line.split("\\s+")) {
+          emitter.emit(word);
+          if ("and".equals(word)) {
+            increment(WordCountStats.ANDS);
+          }
+        }
+      }
+    }, typeFamily.strings()));
+  }
+
+  @Test
+  public void testPlanDotfileWithOutputDir() throws Throwable {
+
+    Configuration conf = tmpDir.getDefaultConfiguration();
+
+    DotfileUtills.setPipelineDotfileOutputDir(conf, dotfileDir.getRootFileName());
+
+    run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance());
+
+    String[] dotfileNames = dotfileNames(dotfileDir.getRootFile());
+
+    assertEquals(1, dotfileNames.length);
+
+    assertTrue(containsFileEndingWith(dotfileNames, "jobplan.dot"));
+
+    assertTrue("PlanDotfile should always be present in the Configuration",
+        conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0);
+  }
+
+  @Test
+  public void testPlanDotfileWithoutOutputDir() throws Throwable {
+
+    Configuration conf = tmpDir.getDefaultConfiguration();
+
+    run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance());
+
+    String[] dotfileNames = dotfileNames(dotfileDir.getRootFile());
+
+    assertEquals(0, dotfileNames.length);
+    assertTrue("PlanDotfile should always be present in the Configuration",
+        conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0);
+  }
+
+  @Test
+  public void testDebugDotfiles() throws Throwable {
+
+    Configuration conf = tmpDir.getDefaultConfiguration();
+
+    DotfileUtills.setPipelineDotfileOutputDir(conf, dotfileDir.getRootFileName());
+    DotfileUtills.enableDebugDotfiles(conf);
+
+    run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance());
+
+    String[] dotfileNames = dotfileNames(dotfileDir.getRootFile());
+
+    assertEquals(5, dotfileNames.length);
+
+    assertTrue(containsFileEndingWith(dotfileNames, "jobplan.dot"));
+    assertTrue(containsFileEndingWith(dotfileNames, "split_graph_plan.dot"));
+    assertTrue(containsFileEndingWith(dotfileNames, "rt_plan.dot"));
+    assertTrue(containsFileEndingWith(dotfileNames, "base_graph_plan.dot"));
+    assertTrue(containsFileEndingWith(dotfileNames, "lineage_plan.dot"));
+
+    assertTrue("PlanDotfile should always be present in the Configuration",
+        conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0);
+  }
+
+  @Test
+  public void testDebugDotfilesEnabledButNoOutputDirSet() throws Throwable {
+
+    Configuration conf = tmpDir.getDefaultConfiguration();
+
+    DotfileUtills.enableDebugDotfiles(conf);
+
+    run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance());
+
+    String[] dotfileNames = dotfileNames(dotfileDir.getRootFile());
+
+    assertEquals(0, dotfileNames.length);
+
+    assertTrue("PlanDotfile should always be present in the Configuration",
+        conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0);
+  }
+
+  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
+    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
+    String outputPath = tmpDir.getFileName("output");
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings()));
+    PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily);
+
+    pipeline.writeTextFile(wordCount, outputPath);
+
+    PipelineResult res = pipeline.done();
+    assertTrue(res.succeeded());
+    List<PipelineResult.StageResult> stageResults = res.getStageResults();
+
+    assertEquals(1, stageResults.size());
+    assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS));
+
+    File outputFile = new File(outputPath, "part-r-00000");
+    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
+    boolean passed = false;
+    for (String line : lines) {
+      if (line.startsWith("Macbeth\t28") || line.startsWith("[Macbeth,28]")) {
+        passed = true;
+        break;
+      }
+    }
+    assertTrue(passed);
+  }
+
+  private boolean containsFileEndingWith(String[] fileNames, String suffix) {
+    for (String fn : fileNames) {
+      if (fn.endsWith(suffix))
+        return true;
+    }
+    return false;
+  }
+
+  private String[] dotfileNames(File rootDir) {
+
+    File[] dotfileFiles = rootDir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.endsWith(".dot");
+      }
+    });
+
+    String[] fileNames = new String[dotfileFiles.length];
+    int i = 0;
+    for (File file : dotfileFiles) {
+      fileNames[i++] = file.getName();
+    }
+
+    return fileNames;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
index af6a177..b456d45 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -19,6 +19,7 @@ package org.apache.crunch;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -35,6 +36,12 @@ public interface PipelineExecution extends ListenableFuture<PipelineResult> {
    */
   String getPlanDotFile();
 
+  /**
+   * Returns all .dot files that allow a client to graph the Crunch execution plan internals.
+   * The key is the name of the dot file and the value is the dot file contents.
+   */
+  Map<String, String> getNamedDotFiles();
+
   /** Blocks until pipeline completes or the specified waiting time elapsed. */
    void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException;
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 23b9e04..49e5662 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -19,12 +19,15 @@ package org.apache.crunch.impl.mem;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Charsets;
+
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.crunch.CachingOptions;
@@ -60,9 +63,11 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.Counters;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -390,6 +395,11 @@ public class MemPipeline implements Pipeline {
     }
 
     @Override
+    public Map<String, String> getNamedDotFiles() {
+      return ImmutableMap.of("", "");
+    }
+
+    @Override
     public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
       set(res);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index d23988b..6cd2809 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -24,9 +24,11 @@ import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
+
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.PCollection;
@@ -130,7 +132,9 @@ public class MRPipeline extends DistributedPipeline {
   @Override
   public MRPipelineExecution runAsync() {
     MRExecutor mrExecutor = plan();
-    writePlanDotFile(mrExecutor.getPlanDotFile());
+    for (Entry<String, String> dotEntry: mrExecutor.getNamedDotFiles().entrySet()){
+      writePlanDotFile(dotEntry.getKey(), dotEntry.getValue());
+    }
     MRPipelineExecution res = mrExecutor.execute();
     outputTargets.clear();
     return res;
@@ -159,7 +163,7 @@ public class MRPipeline extends DistributedPipeline {
    *
    * @param dotFileContents contents to be written to the dot file
    */
-  private void writePlanDotFile(String dotFileContents) {
+  private void writePlanDotFile(String fileName, String dotFileContents) {
     String dotFileDir = getConfiguration().get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR);
     if (dotFileDir != null) {
       FSDataOutputStream outputStream = null;
@@ -168,7 +172,7 @@ public class MRPipeline extends DistributedPipeline {
         URI uri = new URI(dotFileDir);
         FileSystem fs = FileSystem.get(uri, getConfiguration());
         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS");
-        String filenameSuffix = String.format("_%s_jobplan.dot", dateFormat.format(new Date()));
+        String filenameSuffix = String.format("_%s_%s.dot", dateFormat.format(new Date()), fileName);
         String encodedName = URLEncoder.encode(getName(), "UTF-8");
         // We limit the pipeline name to the first 150 characters to keep the output dotfile length less 
         // than 200, as it's not clear what the exact limits are on the filesystem we're writing to (this

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 024fcce..63d0c5d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -18,8 +18,10 @@
 package org.apache.crunch.impl.mr.exec;
 
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
+
 import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.SourceTarget;
@@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -68,7 +71,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
   private Thread monitorThread;
   private boolean started;
 
-  private String planDotFile;
+  private Map<String, String> namedDotFiles;
   
   public MRExecutor(
       Configuration conf,
@@ -90,16 +93,28 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
     this.pollInterval = isLocalMode()
       ? new CappedExponentialCounter(50, 1000)
       : new CappedExponentialCounter(500, 10000);
+
+    this.namedDotFiles = new ConcurrentHashMap<String, String>();
   }
 
   public void addJob(CrunchControlledJob job) {
     this.control.addJob(job);
   }
 
-  public void setPlanDotFile(String planDotFile) {
-    this.planDotFile = planDotFile;
+  public void addNamedDotFile(String fileName, String planDotFile) {
+    this.namedDotFiles.put(fileName, planDotFile);
+  }
+
+  @Override
+  public String getPlanDotFile() {
+    return this.namedDotFiles.get("jobplan");
   }
   
+  @Override
+  public Map<String, String> getNamedDotFiles() {
+    return ImmutableMap.copyOf(this.namedDotFiles);
+  }
+
   public synchronized MRPipelineExecution execute() {
     if (!started) {
       monitorThread.start();
@@ -190,11 +205,6 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
   }
 
   @Override
-  public String getPlanDotFile() {
-    return planDotFile;
-  }
-
-  @Override
   public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
     doneSignal.await(timeout, timeUnit);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java
new file mode 100644
index 0000000..a3199ba
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static java.lang.String.format;
+import static org.apache.commons.collections.CollectionUtils.isEmpty;
+
+import java.util.ArrayList;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.types.PType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * Common debug dotfile writer class. Provides the drawing abstraction common to all debug dotfile writers.
+ */
+@SuppressWarnings({ "rawtypes" })
+public abstract class CommonDotfileWriter {
+
+  protected static final String DEFAULT_FOLDER_COLOR = "darkGreen";
+
+  protected static final String[] COLORS = new String[] { "blue", "red", "green", "yellow", "cyan", "darkGray", "gray",
+      "magenta", "darkGreen", "black" };
+
+  protected StringBuilder edgeBuilder = null;
+  protected StringBuilder contentBuilder = null;
+
+  protected String label(String text) {
+    return text == null ? "-" : text;
+  }
+
+  protected String className(Object obj) {
+
+    if (obj == null) {
+      return "-";
+    }
+
+    Class clazz = null;
+    if (obj instanceof Class) {
+      clazz = (Class) obj;
+    } else {
+      clazz = obj.getClass();
+    }
+    String s = clazz.getName();
+    s = s.substring(s.lastIndexOf('.') + 1);
+
+    return s;
+  }
+
+  protected String getPCollectionID(PCollectionImpl<?> pcollectionImpl) {
+    return String.format("\"%s@%d\"", pcollectionImpl.getName(), pcollectionImpl.hashCode());
+  }
+
+  protected String formatPCollection(PCollectionImpl<?> pcollectionImpl) {
+
+    String withBreakpoint = pcollectionImpl.isBreakpoint() ? " [breakpoint]" : "";
+
+    return String.format("%s [label=\"{%s | %s | %s }\", shape=%s, color=%s];\n", getPCollectionID(pcollectionImpl),
+        pcollectionImpl.getName(), className(pcollectionImpl) + withBreakpoint,
+        formatPType(pcollectionImpl.getPType()), "record", "black");
+  }
+
+  protected String formatPType(PType ptype) {
+
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(className(ptype.getTypeClass()));
+
+    if (!isEmpty(ptype.getSubTypes())) {
+
+      ArrayList<String> subtypes = Lists.newArrayList();
+      for (Object subType : ptype.getSubTypes()) {
+        if (subType instanceof PType) {
+          subtypes.add(formatPType((PType) subType));
+        } else {
+          subtypes.add(className(subType));
+        }
+      }
+
+      sb.append("[").append(Joiner.on(", ").join(subtypes)).append("]");
+    }
+
+    return sb.toString();
+  }
+
+  private String getSourceID(Source s) {
+    return "\"ST@" + s + "\"";
+  }
+
+  private String getTargetID(Target t) {
+    return "\"ST@" + t + "\"";
+  }
+
+  protected void formatTarget(Target target, String color) {
+    contentBuilder.append(String.format("%s [label=\"%s\", shape=folder, color=\"%s\"];\n", getTargetID(target),
+        target.toString(), color));
+  }
+
+  protected void formatSource(Source source, String color) {
+    contentBuilder.append(String.format("%s [label=\"%s\", shape=folder, color=\"%s\"];\n", getSourceID(source),
+        source.toString(), color));
+  }
+
+  protected void link(String from, String to, String color) {
+    edgeBuilder.append(String.format("%s -> %s [color=\"%s\"];\n", from, to, color));
+  }
+
+  protected void link(PCollectionImpl pc, Target target, String color) {
+    link(getPCollectionID(pc), getTargetID(target), color);
+  }
+
+  protected void link(PCollectionImpl parent, PCollectionImpl child, String color) {
+    link(getPCollectionID(parent), getPCollectionID(child), color);
+  }
+
+  protected void link(Source source, PCollectionImpl pc, String color) {
+    link(getSourceID(source), getPCollectionID(pc), color);
+  }
+
+  public String buildDiagram(String diagramName) {
+
+    edgeBuilder = new StringBuilder();
+    contentBuilder = new StringBuilder();
+
+    contentBuilder.append("digraph G {\n");
+    contentBuilder.append(format("   label=\"%s \\n\\n\"; fontsize=24; labelloc=\"t\"; \n", diagramName));
+
+    contentBuilder.append(getLgentd());
+
+    try {
+      doBuildDiagram();
+    } catch (Throwable t) {
+      contentBuilder.append("\"" + Throwables.getRootCause(t) + "\"");
+    }
+
+    contentBuilder.append(edgeBuilder);
+    contentBuilder.append("}\n");
+
+    return contentBuilder.toString();
+  }
+
+  public String getLgentd() {
+    StringBuilder lsb = new StringBuilder();
+    lsb.append("subgraph \"cluster-legend-rtnodes\" {\n").append(
+        "label=\"LEGEND\" ; fontsize=10; style=filled; color=lightblue;\n");
+
+    doGetLegend(lsb);
+
+    lsb.append("}\n");
+    return lsb.toString();
+  }
+
+  protected abstract void doBuildDiagram();
+
+  protected abstract void doGetLegend(StringBuilder lsb);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java
new file mode 100644
index 0000000..dc372f5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Helper class that manages the dotfile generation lifecycle and configuring the dotfile debug context.
+ */
+public class DotfileUtills {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DotfileUtills.class);
+
+  private final Class<?> jarClass;
+  private final Configuration conf;
+
+  private String rtNodesDotfile = "";
+  private String basePlanGraphDotfile = "";
+  private String splitGraphPlanDotfile = "";
+  private String pcollectionLineageDotfile = "";
+  private String planDotFile = "";
+
+  DotfileUtills(Class<?> jarClass, Configuration conf) {
+    this.jarClass = jarClass;
+    this.conf = conf;
+  }
+
+  /**
+   * Builds a lineage dotfile only if the dotfile-debug mode is enabled.
+   */
+  void buildLineageDotfile(Map<PCollectionImpl<?>, Set<Target>> outputs) {
+
+    if (isDebugDotfilesEnabled(conf)) {
+      try {
+        pcollectionLineageDotfile = new DotfileWriterPCollectionLineage(outputs)
+            .buildDiagram("PCollection Lineage Plan (" + jarClass.getSimpleName() + ")");
+      } catch (Exception ex) {
+        LOG.error("Problem creating debug dotfile:", ex);
+      }
+    }
+  }
+
+  /**
+   * Builds the base graph dotfile only if the dotfile-debug mode is enabled.
+   */
+  void buildBaseGraphDotfile(Map<PCollectionImpl<?>, Set<Target>> outputs, Graph graph) {
+
+    if (isDebugDotfilesEnabled(conf)) {
+      try {
+        basePlanGraphDotfile = new DotfileWriterGraph(graph, outputs, null).buildDiagram("Base Graph ("
+            + jarClass.getSimpleName() + ")");
+      } catch (Exception ex) {
+        LOG.error("Problem creating debug dotfile:", ex);
+      }
+    }
+  }
+
+  /**
+   * Builds a split-graph dotfile only if the dotfile-debug mode is enabled.
+   */
+  void buildSplitGraphDotfile(Map<PCollectionImpl<?>, Set<Target>> outputs, Graph graph, List<List<Vertex>> components) {
+
+    if (isDebugDotfilesEnabled(conf)) {
+      try {
+        splitGraphPlanDotfile = new DotfileWriterGraph(graph, outputs, null).buildDiagram("Graph With Components ("
+            + jarClass.getSimpleName() + ")");
+
+      } catch (Exception ex) {
+        LOG.error("Problem creating debug dotfile:", ex);
+      }
+    }
+  }
+
+  /**
+   * Builds a RT node dotfile only if the dotfile-debug mode is enabled.
+   */
+  void buildRTNodesDotfile(MRExecutor exec) {
+    if (isDebugDotfilesEnabled(conf)) {
+      try {
+        rtNodesDotfile = new DotfileWriterRTNodes(exec.getJobs()).buildDiagram("Run Time Plan ("
+            + jarClass.getSimpleName() + ")");
+      } catch (Exception ex) {
+        LOG.error("Problem creating debug dotfile:", ex);
+      }
+    }
+  }
+
+  /**
+   * Builds the plan dotfile regardless of the dotfile-debug mode.
+   * 
+   * @throws IOException
+   */
+  void buildPlanDotfile(MRExecutor exec, Multimap<Target, JobPrototype> assignments, MRPipeline pipeline, int lastJobID) {
+    try {
+      DotfileWriter dotfileWriter = new DotfileWriter();
+      
+      for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
+        dotfileWriter.addJobPrototype(proto);
+      }
+
+      planDotFile = dotfileWriter.buildDotfile();
+      
+    } catch (Exception ex) {
+      LOG.error("Problem creating debug dotfile:", ex);
+    }
+  }
+
+  /**
+   * Attaches the generated dotfiles to the {@link MRExecutor} context. Note that the planDotFile is always added.
+   */
+  void addDotfilesToContext(MRExecutor exec) {
+    try {
+      // The job plan is always enabled and set in the Configuration;
+      conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
+      exec.addNamedDotFile("jobplan", planDotFile);
+
+      // Debug dotfiles are only stored if the configuration is set to enabled
+      if (isDebugDotfilesEnabled(conf)) {
+        exec.addNamedDotFile("rt_plan", rtNodesDotfile);
+        exec.addNamedDotFile("base_graph_plan", basePlanGraphDotfile);
+        exec.addNamedDotFile("split_graph_plan", splitGraphPlanDotfile);
+        exec.addNamedDotFile("lineage_plan", pcollectionLineageDotfile);
+      }
+    } catch (Exception ex) {
+      LOG.error("Problem creating debug dotfile:", ex);
+    }
+  }
+
+  public static boolean isDebugDotfilesEnabled(Configuration conf) {
+    return conf.getBoolean(PlanningParameters.DEBUG_DOTFILES_ENABLED, false)
+        && conf.get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR) != null;
+  }
+
+  public static void enableDebugDotfiles(Configuration conf) {
+    conf.setBoolean(PlanningParameters.DEBUG_DOTFILES_ENABLED, true);
+  }
+
+  public static void disableDebugDotfilesEnabled(Configuration conf) {
+    conf.setBoolean(PlanningParameters.DEBUG_DOTFILES_ENABLED, false);
+  }
+
+  public static void setPipelineDotfileOutputDir(Configuration conf, String outputDir) {
+    conf.set(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR, outputDir);
+  }
+
+  public static String getPipelineDotfileOutputDir(Configuration conf) {
+    return conf.get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java
new file mode 100644
index 0000000..387f4bb
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static java.lang.String.format;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.crunch.Source;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("rawtypes")
+public class DotfileWriterGraph extends CommonDotfileWriter {
+
+  private Graph graph;
+  private Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  private List<List<Vertex>> components;
+
+  public DotfileWriterGraph(Graph graph, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
+      List<List<Vertex>> components) {
+    super();
+    this.graph = graph;
+    this.outputTargets = outputTargets;
+    this.components = components;
+  }
+
+  private String formatVertex(Vertex v) {
+    return formatPCollection(v.getPCollection());
+  }
+
+  private String formatNodePaths(Set<NodePath> nodePaths) {
+    ArrayList<String> path = new ArrayList<String>();
+    for (NodePath np : nodePaths) {
+      path.add(Joiner.on(",  \\l").join(np) + " \\l");
+    }
+    return format("%s", Joiner.on(" | \\n").join(path));
+  }
+
+  private void link(Edge e) {
+    edgeBuilder.append(String.format("%s -> %s [label=\"%s\", labeljust=r, color=\"%s\"];\n", getPCollectionID(e.getHead()
+        .getPCollection()), getPCollectionID(e.getTail().getPCollection()), formatNodePaths(e.getNodePaths()), "black"));
+  }
+
+  private void link(Source source, Vertex v, String color) {
+    link(source, v.getPCollection(), color);
+  }
+
+  private void link(Vertex v, Target target, String color) {
+    link(v.getPCollection(), target, color);
+  }
+
+  private class ComponentContentBuilder {
+
+    private Map<List<Vertex>, StringBuilder> contentBuilderMap = Maps.newHashMap();
+    private StringBuilder topContentBuilder;
+
+    public ComponentContentBuilder(StringBuilder contentBuilder, List<List<Vertex>> components) {
+      this.topContentBuilder = contentBuilder;
+      if (!CollectionUtils.isEmpty(components)) {
+        for (List<Vertex> vl : components) {
+          contentBuilderMap.put(vl, new StringBuilder());
+        }
+      }
+    }
+
+    private StringBuilder getContentBuilder(Vertex v) {
+      for (Entry<List<Vertex>, StringBuilder> entry : contentBuilderMap.entrySet()) {
+        if (entry.getKey().contains(v)) {
+          return entry.getValue();
+        }
+      }
+      return topContentBuilder;
+    }
+
+    public void append(Vertex v) {
+      this.getContentBuilder(v).append(formatVertex(v));
+    }
+
+    public StringBuilder build() {
+      int index = 0;
+      for (Entry<List<Vertex>, StringBuilder> entry : contentBuilderMap.entrySet()) {
+        topContentBuilder.append("subgraph \"cluster-component" + index + "\" {\n");
+        topContentBuilder.append(format(
+            "   label=\"Component%s\"; fontsize=14; graph[style=dotted]; fontcolor=red color=red; \n", index));
+        topContentBuilder.append(entry.getValue());
+        topContentBuilder.append("}\n");
+        index++;
+      }
+      return topContentBuilder;
+    }
+  }
+
+  @Override
+  protected void doGetLegend(StringBuilder lsb) {
+    lsb.append("   \"Folder\"  [label=\"Folder Name\", fontsize=10, shape=folder, color=darkGreen]\n")
+        .append("   \"Vertex1\"  [label=\"{Vertex Name | Vertex PCollection | PType }\", fontsize=10, shape=record]\n")
+        .append("   subgraph \"cluster-component-legend\" {\n")
+        .append("         label=\"Component1\" fontsize=14 graph[style=dotted] fontcolor=red color=red\n")
+        .append(
+            "      \"Vertex2\"  [label=\"{Vertex Name | Vertex PCollection | PType }\", fontsize=10, shape=record]\n")
+        .append("   }\n").append("   \"Vertex1\" -> \"Vertex2\" [label=\"Path List\", fontsize=10];\n");
+  }
+
+  @Override
+  public void doBuildDiagram() {
+
+    ComponentContentBuilder componentContentBuilder = new ComponentContentBuilder(contentBuilder, components);
+
+    for (Vertex v : graph) {
+      componentContentBuilder.append(v);
+
+      Source source = v.getSource();
+      if (source != null) {
+        formatSource(source, DEFAULT_FOLDER_COLOR);
+        link(source, v, DEFAULT_FOLDER_COLOR);
+      }
+
+      if (v.isOutput() && outputTargets != null) {
+        for (Target target2 : outputTargets.get(v.getPCollection())) {
+          formatTarget(target2, DEFAULT_FOLDER_COLOR);
+          link(v, target2, DEFAULT_FOLDER_COLOR);
+        }
+      }
+    }
+
+    contentBuilder = componentContentBuilder.build();
+
+    for (Edge e : graph.getAllEdges()) {
+      link(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java
new file mode 100644
index 0000000..9d54de9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+
+/**
+ * Writes <a href="http://www.graphviz.org">Graphviz</a> dot files to illustrate the topology of Crunch pipelines.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class DotfileWriterPCollectionLineage extends CommonDotfileWriter {
+
+  private Map<PCollectionImpl<?>, Set<Target>> outputs;
+
+  public DotfileWriterPCollectionLineage(Map<PCollectionImpl<?>, Set<Target>> outputs) {
+    super();
+    this.outputs = outputs;
+  }
+
+  private void formatPCollectionLineage(PCollectionImpl pcollection, String color) {
+
+    contentBuilder.append(formatPCollection(pcollection));
+
+    // for input pcollections add the related source and link it to the collection
+    if (pcollection instanceof InputCollection) {
+      InputCollection ic = (InputCollection) pcollection;
+
+      formatSource(ic.getSource(), DEFAULT_FOLDER_COLOR);
+
+      link(ic.getSource(), pcollection, color);
+    }
+
+    List<PCollectionImpl<?>> parents = pcollection.getParents();
+    if (!CollectionUtils.isEmpty(parents)) {
+      for (PCollectionImpl parentPCollection : parents) {
+        link(parentPCollection, pcollection, color);
+        formatPCollectionLineage(parentPCollection, color);
+      }
+    }
+  }
+
+  @Override
+  protected void doBuildDiagram() {
+
+    int outputIndex = 0;
+
+    for (PCollectionImpl<?> pcollection : outputs.keySet()) {
+
+      String pathColor = COLORS[outputIndex++];
+
+      formatPCollectionLineage(pcollection, pathColor);
+
+      for (Target target : outputs.get(pcollection)) {
+        formatTarget(target, DEFAULT_FOLDER_COLOR);
+        link(pcollection, target, pathColor);
+      }
+    }
+  }
+
+  @Override
+  protected void doGetLegend(StringBuilder lsb) {
+    lsb.append("\"Folder\"  [label=\"Folder Name\" fontsize=10 shape=folder color=darkGreen]\n").append(
+        "\"PCollection\"  [label=\"{PCollection Name | PCollection Class| PType }\" fontsize=10 shape=record]\n");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java
new file mode 100644
index 0000000..58bb95d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static java.lang.String.format;
+import static org.apache.commons.collections.CollectionUtils.isEmpty;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.mr.MRJob;
+import org.apache.crunch.impl.mr.run.NodeContext;
+import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.CrunchOutputs.OutputConfig;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.util.DistCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+/**
+ * Writes <a href="http://www.graphviz.org">Graphviz</a> dot files to illustrate the topology of Crunch pipelines.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class DotfileWriterRTNodes extends CommonDotfileWriter {
+
+  private static final String GREEN = "green";
+  private static final String RED = "red";
+  private static final String CYAN = "cyan";
+  private static final String BLUE = "blue";
+  private static final String BLACK = "black";
+
+  private List<MRJob> mrJobs;
+
+  public DotfileWriterRTNodes(List<MRJob> mrJobs) {
+    super();
+    this.mrJobs = mrJobs;
+  }
+
+  private String getId(RTNode rtNode) {
+    return format("\"%s@%d\"", rtNode.getNodeName(), rtNode.hashCode());
+  }
+
+  private String getOutputNameId(String outputName, MRJob mrJob) {
+    return format("\"%s@%s\"", outputName, mrJob.getJobID());
+  }
+
+  private String getId(FormatBundle bundle, MRJob mrJob) {
+    String name = (bundle == null) ? "-" : bundle.getName();
+    return format("\"%s@%s\"", name, mrJob.getJobID());
+  }
+
+  private String formatConvertor(Converter converter) {
+    StringBuffer sb = new StringBuffer();
+    sb.append(className(converter));
+    if (converter != null) {
+      if (!converter.applyPTypeTransforms()) {
+        sb.append(" (applyPTypeTransforms = ").append(converter.applyPTypeTransforms()).append(")");
+      }
+      sb.append("[").append(converter.getKeyClass().getSimpleName()).append(", ")
+          .append(converter.getValueClass().getSimpleName()).append("]");
+    }
+    return sb.toString();
+  }
+
+  private String formatRTNode(RTNode rtNode) {
+    return format("%s [label=\"{{%s | %s} | %s | %s | { %s | %s } }\" shape=record; color = black;];\n", getId(rtNode),
+        label(rtNode.getNodeName()), label(rtNode.getOutputName()), className(rtNode.getDoFn()),
+        formatPType(rtNode.getPType()), formatConvertor(rtNode.getInputConverter()),
+        formatConvertor(rtNode.getOutputConverter()));
+  }
+
+  private void formatRTNodeTree(RTNode parentRTNode) {
+
+    contentBuilder.append(formatRTNode(parentRTNode));
+
+    if (!isEmpty(parentRTNode.getChildren())) {
+      for (RTNode child : parentRTNode.getChildren()) {
+        // process child nodes
+        formatRTNodeTree(child);
+        // link parent to child node
+        link(getId(parentRTNode), getId(child), BLACK);
+      }
+    }
+  }
+
+  private List<RTNode> formatMRJobTask(Configuration configuration, int jobId, NodeContext nodeContext, String color) {
+
+    List<RTNode> rtNodes = getRTNodes(configuration, nodeContext);
+    if (rtNodes == null)
+      return null;
+
+    contentBuilder.append("subgraph \"cluster-job" + jobId + "_" + nodeContext + "\" {\n");
+    contentBuilder.append(" label=\"" + nodeContext + "\"; color=" + color + "; fontsize=14;\n");
+
+    for (RTNode rtn : rtNodes) {
+      formatRTNodeTree(rtn);
+    }
+    contentBuilder.append("}\n");
+
+    return rtNodes;
+  }
+
+  private void formatJobOutputs(Map<String, OutputConfig> namedOutputs, MRJob mrJob) {
+
+    contentBuilder.append("subgraph \"cluster-output_" + mrJob.getJobID() + "\" {\n");
+    contentBuilder.append(" label=\"OUTPUTS\"; fontsize=14; color= magenta;\n");
+
+    for (Entry<String, OutputConfig> entry : namedOutputs.entrySet()) {
+      String output = format("%s [label=\"{%s | %s | { %s | %s } }\" shape=record; color = %s];\n",
+          getOutputNameId(entry.getKey(), mrJob), entry.getKey(), entry.getValue().bundle.getName(),
+          entry.getValue().keyClass.getSimpleName(), entry.getValue().valueClass.getSimpleName(), BLACK);
+
+      contentBuilder.append(output);
+    }
+
+    contentBuilder.append("}\n");
+  }
+
+  private void formatJobInputs(Map<FormatBundle, Map<Integer, List<Path>>> inputFormatNodeMap, MRJob mrJob, String color) {
+
+    contentBuilder.append("subgraph \"cluster-inputs_" + mrJob.getJobID() + "\" {\n");
+    contentBuilder.append(" label=\"INPUTS\"; fontsize=14; color= " + color + ";\n");
+
+    for (Entry<FormatBundle, Map<Integer, List<Path>>> entry : inputFormatNodeMap.entrySet()) {
+
+      FormatBundle bundle = entry.getKey();
+
+      ArrayList<String> inList = new ArrayList<String>();
+      for (Entry<Integer, List<Path>> value : entry.getValue().entrySet()) {
+        inList.add(format("{ %s | %s}", value.getKey(), value.getValue()));
+      }
+
+      contentBuilder.append(format("%s [label=\"{ %s | %s}\" shape=record; color = %s];\n", getId(bundle, mrJob),
+          bundle.getName(), Joiner.on("|").join(inList), BLACK));
+    }
+
+    contentBuilder.append("}\n");
+  }
+
+  private FormatBundle findFormatBundleByNodeIndex(Map<FormatBundle, Map<Integer, List<Path>>> inputFormatNodeMap,
+      int nodeIndex) {
+    for (Entry<FormatBundle, Map<Integer, List<Path>>> entry : inputFormatNodeMap.entrySet()) {
+      if (entry.getValue().containsKey(nodeIndex)) {
+        return entry.getKey();
+      }
+      if (nodeIndex == 0 && entry.getValue().containsKey(-1)) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  private List<RTNode> leafs(List<RTNode> rtNodes) {
+
+    ArrayList<RTNode> tails = Lists.newArrayListWithExpectedSize(rtNodes.size());
+
+    for (RTNode node : rtNodes) {
+      tails.addAll(leafs(node));
+    }
+    return tails;
+  }
+
+  private List<RTNode> leafs(RTNode rtNode) {
+
+    List<RTNode> leafs = Lists.newArrayList();
+
+    if (rtNode.isLeafNode()) {
+      leafs.add(rtNode);
+    } else {
+      for (RTNode child : rtNode.getChildren()) {
+        leafs.addAll(leafs(child));
+      }
+    }
+
+    return leafs;
+  }
+
+  private static List<RTNode> getRTNodes(Configuration conf, NodeContext nodeContext) {
+    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString());
+    try {
+      return (List<RTNode>) DistCache.read(conf, path);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException("Could not read runtime node information", e);
+    }
+  }
+
+  @Override
+  protected void doBuildDiagram() {
+
+    for (MRJob mrJob : mrJobs) {
+
+      // TODO to find a way to handle job dependencies e.g mrJob.getDependentJobs()
+
+      Configuration configuration = mrJob.getJob().getConfiguration();
+
+      contentBuilder.append("subgraph \"cluster-job" + mrJob.getJobID() + "\" {\n");
+      contentBuilder.append("    label=\"Crunch Job " + mrJob.getJobID() + "\" ;\n");
+
+      List<RTNode> mapRTNodes = formatMRJobTask(configuration, mrJob.getJobID(), NodeContext.MAP, BLUE);
+      List<RTNode> combineRTNodes = formatMRJobTask(configuration, mrJob.getJobID(), NodeContext.COMBINE, CYAN);
+      List<RTNode> reduceRTNodes = formatMRJobTask(configuration, mrJob.getJobID(), NodeContext.REDUCE, RED);
+
+      // Deserialize Job's inputs from the CRUNCH_INPUTS Configuration property.
+      Map<FormatBundle, Map<Integer, List<Path>>> inputFormatNodeMap = CrunchInputs.getFormatNodeMap(mrJob.getJob());
+
+      formatJobInputs(inputFormatNodeMap, mrJob, GREEN);
+
+      // Link inputs to map RTNode tasks
+      for (int mapNodeIndex = 0; mapNodeIndex < mapRTNodes.size(); mapNodeIndex++) {
+        FormatBundle formatBundle = findFormatBundleByNodeIndex(inputFormatNodeMap, mapNodeIndex);
+        RTNode rtNode = mapRTNodes.get(mapNodeIndex);
+        link(getId(formatBundle, mrJob), getId(rtNode), BLACK);
+      }
+
+      // Deserialize Job's Outputs from the CRUNCH_OUTPUTS Configuration property.
+      Map<String, OutputConfig> namedOutputs = CrunchOutputs.getNamedOutputs(configuration);
+
+      formatJobOutputs(namedOutputs, mrJob);
+
+      List<RTNode> mapLeafs = leafs(mapRTNodes);
+
+      for (RTNode leafNode : mapLeafs) {
+        String outputName = leafNode.getOutputName();
+        if (StringUtils.isEmpty(outputName)) {
+          if (!isEmpty(combineRTNodes)) {
+            // If there is a combiner connect the map to the combiner and then the combiner to the reducer
+            link(getId(leafNode), getId(combineRTNodes.get(0)), BLACK);
+            link(getId(leafs(combineRTNodes).get(0)), getId(reduceRTNodes.get(0)), BLACK);
+          } else {
+            // connect
+            link(getId(leafNode), getId(reduceRTNodes.get(0)), BLACK);
+          }
+        } else {
+          link(getId(leafNode), getOutputNameId(outputName, mrJob), BLACK);
+        }
+      }
+
+      if (!isEmpty(reduceRTNodes)) {
+        List<RTNode> reduceTails = leafs(reduceRTNodes);
+        for (RTNode tailNode : reduceTails) {
+          String outputName = tailNode.getOutputName();
+          if (StringUtils.isEmpty(outputName)) {
+            throw new RuntimeException("Recue output RTNode with no named output! :" + tailNode);
+          } else {
+            link(getId(tailNode), getOutputNameId(outputName, mrJob), BLACK);
+          }
+        }
+      }
+
+      contentBuilder.append("}\n");
+
+    }
+  }
+
+  @Override
+  protected void doGetLegend(StringBuilder lsb) {
+    lsb.append(
+        "\"RTNodes\"  [label=\"{{RTNode Name | Output Name } | DoFn | PType | { Input Converter | Output Converter}}\"; shape=record;]\n")
+        .append("\"Inputs\"  [label=\"{InputFormat Name | {Node Index | Path List}}\"; shape=record; color = green]\n")
+        .append(
+            "\"Outputs\"  [label=\"{Output Name | OutputFormat Name |{Key Class | Value Class}}\"; shape=record; color = magenta]\n")
+        .append("\"Inputs\" -> \"RTNodes\" [style=invis];\n").append("\"RTNodes\" -> \"Outputs\" [style=invis];\n");
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 2863e00..d341184 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -34,7 +34,6 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks;
 import org.apache.crunch.impl.mr.run.CrunchCombiner;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
-import org.apache.crunch.impl.mr.run.CrunchOutputFormat;
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
@@ -215,7 +214,6 @@ class JobPrototype {
       job.setNumReduceTasks(0);
       inputNodes = Lists.newArrayList(outputNodes);
     }
-    job.setOutputFormatClass(CrunchOutputFormat.class);
     serialize(inputNodes, conf, workingPath, NodeContext.MAP);
 
     if (inputNodes.size() == 1) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 91e3036..b470586 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -36,15 +36,15 @@ import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import com.google.common.collect.ImmutableMultimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class MSCRPlanner {
 
@@ -86,19 +86,26 @@ public class MSCRPlanner {
   };  
 
   public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException {
+
+    DotfileUtills dotfileUtills = new DotfileUtills(jarClass, conf);
+
+    // Generate the debug lineage dotfiles (if configuration is enabled)
+    dotfileUtills.buildLineageDotfile(outputs);
+
     Map<PCollectionImpl<?>, Set<Target>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
     for (PCollectionImpl<?> pcollect : outputs.keySet()) {
       targetDeps.put(pcollect, pcollect.getTargetDependencies());
     }
-    
+
     Multimap<Target, JobPrototype> assignments = HashMultimap.create();
+
     while (!targetDeps.isEmpty()) {
       Set<Target> allTargets = Sets.newHashSet();
       for (PCollectionImpl<?> pcollect : targetDeps.keySet()) {
         allTargets.addAll(outputs.get(pcollect));
       }
       GraphBuilder graphBuilder = new GraphBuilder();
-      
+
       // Walk the current plan tree and build a graph in which the vertices are
       // sources, targets, and GBK operations.
       Set<PCollectionImpl<?>> currentStage = Sets.newHashSet();
@@ -109,7 +116,7 @@ public class MSCRPlanner {
           currentStage.add(output);
         }
       }
-      
+
       Graph baseGraph = graphBuilder.getGraph();
       boolean hasInputs = false;
       for (Vertex v : baseGraph) {
@@ -125,10 +132,14 @@ public class MSCRPlanner {
 
       // Create a new graph that splits up dependent GBK nodes.
       Graph graph = prepareFinalGraph(baseGraph);
-      
+
       // Break the graph up into connected components.
       List<List<Vertex>> components = graph.connectedComponents();
-      
+
+      // Generate the debug graph dotfiles (if configuration is enabled)
+      dotfileUtills.buildBaseGraphDotfile(outputs, graph);
+      dotfileUtills.buildSplitGraphDotfile(outputs, graph, components);
+
       // For each component, we will create one or more job prototypes,
       // depending on its profile.
       // For dependency handling, we only need to care about which
@@ -191,18 +202,22 @@ public class MSCRPlanner {
         targetDeps.remove(output);
       }
     }
-    
+
     // Finally, construct the jobs from the prototypes and return.
-    DotfileWriter dotfileWriter = new DotfileWriter();
     MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, appendedTargets, pipelineCallables);
+
+    // Generate the debug Plan dotfiles
+    dotfileUtills.buildPlanDotfile(exec, assignments, pipeline, lastJobID);
+
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
-      dotfileWriter.addJobPrototype(proto);
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID));
     }
 
-    String planDotFile = dotfileWriter.buildDotfile();
-    exec.setPlanDotFile(planDotFile);
-    conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
+    // Generate the debug RTNode dotfiles (if configuration is enabled)
+    dotfileUtills.buildRTNodesDotfile(exec);
+
+    // Attach the dotfiles to the MRExecutor context
+    dotfileUtills.addDotfilesToContext(exec);
 
     return exec;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
index de89c48..a40abf9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -33,6 +33,8 @@ public final class PlanningParameters {
    */
   public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
 
+  public static final String DEBUG_DOTFILES_ENABLED  = "crunch.internals.dotfiles";
+
   /**
    * Configuration key under which a directory URI can be stored where MapReduce pipeline job plans in
    * <a href="http://www.graphviz.org">DOT</a> format are stored. The dot files are only written if this configuration

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
deleted file mode 100644
index bd9cdc9..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import org.apache.crunch.io.CrunchOutputs;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> {
-  @Override
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    return new RecordWriter<K, V>() {
-      @Override
-      public void write(K k, V v) throws IOException, InterruptedException {
-      }
-
-      @Override
-      public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-      }
-    };
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
-    CrunchOutputs.checkOutputSpecs(jobContext);
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    return CrunchOutputs.getOutputCommitter(taskAttemptContext);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index 650d0c9..37bf5b6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -126,4 +126,37 @@ public class RTNode implements Serializable {
     return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + children + ", inputConverter="
         + inputConverter + ", outputConverter=" + outputConverter + ", outputName=" + outputName + "]";
   }
+
+  // Accessors exposed so the dotfile writers (see DotfileWriterRTNodes) can plot the RTNode
+  // diagrams.
+  public String getNodeName() {
+    return this.nodeName;
+  }
+
+  /** Name of the output this node writes to; may be empty/null for interior nodes. */
+  public String getOutputName() {
+    return this.outputName;
+  }
+
+  /** Note: returns this node's *output* PType. */
+  public PType getPType() {
+    return outputPType;
+  }
+
+  public List<RTNode> getChildren() {
+    return children;
+  }
+
+  public DoFn<Object, Object> getDoFn() {
+    return fn;
+  }
+
+  public Converter getInputConverter() {
+    return inputConverter;
+  }
+
+  public Converter getOutputConverter() {
+    return outputConverter;
+  }
+
+  public Emitter<Object> getEmitter() {
+    return emitter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index a536b38..e811bcf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -17,20 +17,14 @@
  */
 package org.apache.crunch.io;
 
-import com.google.common.collect.Sets;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Joiner;
@@ -41,7 +35,6 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
@@ -71,32 +64,7 @@ public class CrunchOutputs<K, V> {
     conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
   }
 
-  public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
-    Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration());
-    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
-      String namedOutput = e.getKey();
-      Job job = getJob(e.getKey(), jc.getConfiguration());
-      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
-      fmt.checkOutputSpecs(jc);
-    }
-  }
-
-  public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException {
-    Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration());
-    Map<String, OutputCommitter> committers = Maps.newHashMap();
-    for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
-      String namedOutput = e.getKey();
-      Job job = getJob(e.getKey(), tac.getConfiguration());
-      OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
-      TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
-          job.getConfiguration(), tac.getTaskAttemptID());
-      OutputCommitter oc = fmt.getOutputCommitter(taskContext);
-      committers.put(namedOutput, oc);
-    }
-    return new CompositeOutputCommitter(outputs, committers);
-  }
-
-  private static class OutputConfig<K, V> {
+  public static class OutputConfig<K, V> {
     public FormatBundle<OutputFormat<K, V>> bundle;
     public Class<K> keyClass;
     public Class<V> valueClass;
@@ -108,14 +76,15 @@ public class CrunchOutputs<K, V> {
       this.valueClass = valueClass;
     }
   }
-  
-  private static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
+
+  private static Map<String, OutputConfig> getNamedOutputs(
+      TaskInputOutputContext<?, ?, ?, ?> context) {
+    return getNamedOutputs(context.getConfiguration());
+  }
+
+  public static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
     Map<String, OutputConfig> out = Maps.newHashMap();
-    String serOut = conf.get(CRUNCH_OUTPUTS);
-    if (serOut == null || serOut.isEmpty()) {
-      return out;
-    }
-    for (String input : Splitter.on(RECORD_SEP).split(serOut)) {
+    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
       List<String> fields = Lists.newArrayList(SPLITTER.split(input));
       String name = fields.get(0);
       FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), conf);
@@ -129,14 +98,13 @@ public class CrunchOutputs<K, V> {
     }
     return out;
   }
-  
   private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
   private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
 
-  private TaskInputOutputContext<?, ?, K, V> baseContext;
-  private Configuration baseConf;
+  private final TaskInputOutputContext<?, ?, K, V> baseContext;
   private final Map<String, OutputConfig> namedOutputs;
-  private final Map<String, OutputState<K, V>> outputStates;
+  private final Map<String, RecordWriter<K, V>> recordWriters;
+  private final Map<String, TaskAttemptContext> taskContextCache;
   private final boolean disableOutputCounters;
 
   /**
@@ -146,15 +114,11 @@ public class CrunchOutputs<K, V> {
    * @param context the TaskInputOutputContext object
    */
   public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
-    this(context.getConfiguration());
     this.baseContext = context;
-  }
-
-  public CrunchOutputs(Configuration conf) {
-    this.baseConf = conf;
-    this.namedOutputs = getNamedOutputs(conf);
-    this.outputStates = Maps.newHashMap();
-    this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
+    namedOutputs = getNamedOutputs(context);
+    recordWriters = Maps.newHashMap();
+    taskContextCache = Maps.newHashMap();
+    this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
   }
 
   @SuppressWarnings("unchecked")
@@ -164,174 +128,63 @@ public class CrunchOutputs<K, V> {
       throw new IllegalArgumentException("Undefined named output '" +
         namedOutput + "'");
     }
+    TaskAttemptContext taskContext = getContext(namedOutput);
     if (!disableOutputCounters) {
       baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
     }
-    getOutputState(namedOutput).write(key, value);
+    getRecordWriter(taskContext, namedOutput).write(key, value);
   }
   
   public void close() throws IOException, InterruptedException {
-    for (OutputState<?, ?> out : outputStates.values()) {
-      out.close();
+    for (RecordWriter<?, ?> writer : recordWriters.values()) {
+      writer.close(baseContext);
     }
   }
   
-  private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException {
-    OutputState<?, ?> out = outputStates.get(namedOutput);
-    if (out != null) {
-      return (OutputState<K, V>) out;
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
+    if (taskContext != null) {
+      return taskContext;
     }
 
     // The following trick leverages the instantiation of a record writer via
     // the job thus supporting arbitrary output formats.
-    Job job = getJob(namedOutput, baseConf);
-    OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
-    TaskAttemptContext taskContext = null;
-    RecordWriter<K, V> recordWriter = null;
-    if (baseContext != null) {
-      taskContext = TaskAttemptContextFactory.create(
-          job.getConfiguration(), baseContext.getTaskAttemptID());
-      recordWriter = fmt.getRecordWriter(taskContext);
-    }
-    OutputState<K, V> outputState = new OutputState(taskContext, recordWriter);
-    this.outputStates.put(namedOutput, outputState);
-    return outputState;
-  }
-
-  private static Job getJob(String namedOutput, Configuration baseConf) throws IOException {
-    Job job = new Job(new Configuration(baseConf));
-    job.getConfiguration().set("crunch.namedoutput", namedOutput);
-    return job;
-  }
-
-  private static void configureJob(
-    String namedOutput,
-    Job job,
-    OutputConfig outConfig) throws IOException {
-      job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
-      job.setOutputFormatClass(outConfig.bundle.getFormatClass());
-      job.setOutputKeyClass(outConfig.keyClass);
-      job.setOutputValueClass(outConfig.valueClass);
-      outConfig.bundle.configure(job.getConfiguration());
-    }
-
-  private static OutputFormat getOutputFormat(
-      String namedOutput,
-      Job job,
-      OutputConfig outConfig) throws IOException {
-    configureJob(namedOutput, job, outConfig);
-    try {
-      return ReflectionUtils.newInstance(
-          job.getOutputFormatClass(),
-          job.getConfiguration());
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    }
+    OutputConfig outConfig = namedOutputs.get(nameOutput);
+    Configuration conf = new Configuration(baseContext.getConfiguration());
+    Job job = new Job(conf);
+    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+    job.setOutputKeyClass(outConfig.keyClass);
+    job.setOutputValueClass(outConfig.valueClass);
+    outConfig.bundle.configure(job.getConfiguration());
+    taskContext = TaskAttemptContextFactory.create(
+      job.getConfiguration(), baseContext.getTaskAttemptID());
+
+    taskContextCache.put(nameOutput, taskContext);
+    return taskContext;
   }
 
-  private static class OutputState<K, V> {
-    private final TaskAttemptContext context;
-    private final RecordWriter<K, V> recordWriter;
-
-    public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) {
-      this.context = context;
-      this.recordWriter = recordWriter;
-    }
-
-    public void write(K key, V value) throws IOException, InterruptedException {
-      recordWriter.write(key, value);
-    }
-
-    public void close() throws IOException, InterruptedException {
-      recordWriter.close(context);
-    }
-  }
-
-  private static class CompositeOutputCommitter extends OutputCommitter {
-
-    private final Map<String, OutputConfig> outputs;
-    private final Map<String, OutputCommitter> committers;
-
-    public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) {
-      this.outputs = outputs;
-      this.committers = committers;
-    }
-
-    private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException {
-      Job job = getJob(namedOutput, baseContext.getConfiguration());
-      configureJob(namedOutput, job, outputs.get(namedOutput));
-      return TaskAttemptContextFactory.create(job.getConfiguration(), baseContext.getTaskAttemptID());
-    }
-
-    @Override
-    public void setupJob(JobContext jobContext) throws IOException {
-      Configuration conf = jobContext.getConfiguration();
-      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
-        Job job = getJob(e.getKey(), conf);
-        configureJob(e.getKey(), job, outputs.get(e.getKey()));
-        e.getValue().setupJob(job);
-      }
-    }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
-        e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext));
-      }
-    }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
-      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
-        if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
-        e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext));
-      }
-    }
-
-    @Override
-    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
-      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
-        e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext));
-      }
-    }
+  private synchronized RecordWriter<K, V> getRecordWriter(
+      TaskAttemptContext taskContext, String namedOutput)
+      throws IOException, InterruptedException {
+    // look for record-writer in the cache
+    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
 
-    @Override
-    public void commitJob(JobContext jobContext) throws IOException {
-      Configuration conf = jobContext.getConfiguration();
-      Set<Path> handledPaths = Sets.newHashSet();
-      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
-        OutputCommitter oc = e.getValue();
-        if (oc instanceof FileOutputCommitter) {
-          Path workPath = ((FileOutputCommitter) oc).getWorkPath();
-          if (handledPaths.contains(workPath)) {
-            continue;
-          } else {
-            handledPaths.add(workPath);
-          }
-        }
-        Job job = getJob(e.getKey(), conf);
-        configureJob(e.getKey(), job, outputs.get(e.getKey()));
-        oc.commitJob(job);
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
+      try {
+        OutputFormat format = ReflectionUtils.newInstance(
+            taskContext.getOutputFormatClass(),
+            taskContext.getConfiguration());
+        writer = format.getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
       }
+      recordWriters.put(namedOutput, writer);
     }
 
-    @Override
-    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
-      Configuration conf = jobContext.getConfiguration();
-      for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
-        Job job = getJob(e.getKey(), conf);
-        configureJob(e.getKey(), job, outputs.get(e.getKey()));
-        e.getValue().abortJob(job, state);
-      }
-    }
+    return writer;
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 5d0f953..3b5b419 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -18,12 +18,14 @@
 package org.apache.crunch.impl.spark;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PipelineCallable;
@@ -206,6 +208,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
   }
 
   @Override
+  public Map<String, String> getNamedDotFiles() {
+    return ImmutableMap.of("", "");
+  }
+
+  @Override
   public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
     doneSignal.await(timeout, timeUnit);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/src/main/config/checkstyle.xml
----------------------------------------------------------------------
diff --git a/src/main/config/checkstyle.xml b/src/main/config/checkstyle.xml
index 4c74367..7cb721b 100644
--- a/src/main/config/checkstyle.xml
+++ b/src/main/config/checkstyle.xml
@@ -74,7 +74,6 @@ under the License.
     <module name="NeedBraces"/>
     <module name="RightCurly"/>
     <module name="AvoidInlineConditionals"/>
-    <module name="DoubleCheckedLocking"/>
     <module name="EmptyStatement"/>
     <module name="EqualsHashCode"/>
     <module name="HiddenField">


Mime
View raw message