incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH 105: Add writing of Graphviz dot files for job plan
Date Wed, 31 Oct 2012 18:53:16 GMT
Updated Branches:
  refs/heads/master a8847de58 -> a547e90dc


CRUNCH 105: Add writing of Graphviz dot files for job plan


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a547e90d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a547e90d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a547e90d

Branch: refs/heads/master
Commit: a547e90dce7f51295c95cc1b05d428670f621b43
Parents: a8847de
Author: Gabriel Reid <greid@apache.org>
Authored: Tue Oct 30 00:06:44 2012 +0100
Committer: Gabriel Reid <greid@apache.org>
Committed: Wed Oct 31 19:40:16 2012 +0100

----------------------------------------------------------------------
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |   16 +-
 .../apache/crunch/impl/mr/plan/DotfileWriter.java  |  238 +++++++++++++++
 .../apache/crunch/impl/mr/plan/JobPrototype.java   |   16 +
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |    8 +-
 .../crunch/impl/mr/plan/PlanningParameters.java    |   10 +
 .../crunch/impl/mr/plan/DotfileWriterTest.java     |  132 ++++++++
 6 files changed, 416 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 043f7b1..60950f3 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -40,7 +40,9 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 import org.apache.crunch.impl.mr.collect.UnionCollection;
 import org.apache.crunch.impl.mr.collect.UnionTable;
+import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.crunch.impl.mr.plan.MSCRPlanner;
+import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.At;
 import org.apache.crunch.io.ReadableSourceTarget;
@@ -136,13 +138,21 @@ public class MRPipeline implements Pipeline {
     this.tempDirectory = createTempDirectory(conf);
   }
 
+  public MRExecutor plan() {
+    MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
+    try {
+      return planner.plan(jarClass, conf);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+
   @Override
   public PipelineResult run() {
-    MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
     PipelineResult res = null;
     try {
-      res = planner.plan(jarClass, conf).execute();
-    } catch (IOException e) {
+      res = plan().execute();
+    } catch (CrunchRuntimeException e) {
       LOG.error(e);
       return PipelineResult.EMPTY;
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
new file mode 100644
index 0000000..46d8c53
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Writes <a href="http://www.graphviz.org">Graphviz</a> dot files to illustrate
+ * the topology of Crunch pipelines.
+ */
+public class DotfileWriter {
+  
+  /** The types of tasks within a MapReduce job. */
+  enum MRTaskType { MAP, REDUCE };
+
+  private Set<JobPrototype> jobPrototypes = Sets.newHashSet();
+  private HashMultimap<Pair<JobPrototype, MRTaskType>, String> jobNodeDeclarations
= HashMultimap.create();
+  private Set<String> globalNodeDeclarations = Sets.newHashSet();
+  private Set<String> nodePathChains = Sets.newHashSet();
+
+  /**
+   * Format the declaration of a node based on a PCollection.
+   * 
+   * @param pcollectionImpl PCollection for which a node will be declared
+   * @param jobPrototype The job containing the PCollection
+   * @return The node declaration
+   */
+  String formatPCollectionNodeDeclaration(PCollectionImpl<?> pcollectionImpl, JobPrototype
jobPrototype) {
+    String shape = "box";
+    if (pcollectionImpl instanceof InputCollection) {
+      shape = "folder";
+    }
+    return String.format("%s [label=\"%s\" shape=%s];", formatPCollection(pcollectionImpl,
jobPrototype), pcollectionImpl.getName(),
+        shape);
+  }
+
+  /**
+   * Format a Target as a node declaration.
+   * 
+   * @param target A Target used within a MapReduce pipeline
+   * @return The global node declaration for the Target
+   */
+  String formatTargetNodeDeclaration(Target target) {
+    return String.format("\"%s\" [label=\"%s\" shape=folder];", target.toString(), target.toString());
+  }
+
+  /**
+   * Format a PCollectionImpl into a format to be used for dot files.
+   * 
+   * @param pcollectionImpl The PCollectionImpl to be formatted
+   * @param jobPrototype The job containing the PCollection
+   * @return The dot file formatted representation of the PCollectionImpl
+   */
+  String formatPCollection(PCollectionImpl<?> pcollectionImpl, JobPrototype jobPrototype)
{
+    if (pcollectionImpl instanceof InputCollection) {
+      InputCollection<?> inputCollection = (InputCollection<?>) pcollectionImpl;
+      return String.format("\"%s\"", inputCollection.getSource());
+    }
+    return String.format("\"%s@%d@%d\"", pcollectionImpl.getName(), pcollectionImpl.hashCode(),
jobPrototype.hashCode());
+  }
+
+  /**
+   * Format a collection of node strings into dot file syntax.
+   * 
+   * @param nodeCollection Collection of chained node strings
+   * @return The dot-formatted chain of nodes
+   */
+  String formatNodeCollection(List<String> nodeCollection) {
+    return String.format("%s;", Joiner.on(" -> ").join(nodeCollection));
+  }
+
+  /**
+   * Format a NodePath in dot file syntax.
+   * 
+   * @param nodePath The node path to be formatted
+   * @param jobPrototype The job containing the NodePath
+   * @return The dot file representation of the node path
+   */
+  List<String> formatNodePath(NodePath nodePath, JobPrototype jobPrototype) {
+    List<String> formattedNodePaths = Lists.newArrayList();
+    
+    List<PCollectionImpl<?>> pcollections = Lists.newArrayList(nodePath);
+    for (int collectionIndex = 1; collectionIndex < pcollections.size(); collectionIndex++){
+      String fromNode = formatPCollection(pcollections.get(collectionIndex - 1), jobPrototype);
+      String toNode = formatPCollection(pcollections.get(collectionIndex), jobPrototype);
+      formattedNodePaths.add(formatNodeCollection(Lists.newArrayList(fromNode, toNode)));
+    }
+    return formattedNodePaths;
+  }
+
+  /**
+   * Add a NodePath to be formatted as a list of node declarations within a
+   * single job.
+   * 
+   * @param jobPrototype The job containing the node path
+   * @param nodePath The node path to be formatted
+   */
+  void addNodePathDeclarations(JobPrototype jobPrototype, NodePath nodePath) {
+    boolean groupingEncountered = false;
+    for (PCollectionImpl<?> pcollectionImpl : nodePath) {
+      if (pcollectionImpl instanceof InputCollection) {
+        globalNodeDeclarations.add(formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
+      } else {
+        if (!groupingEncountered){
+          groupingEncountered = (pcollectionImpl instanceof PGroupedTableImpl);
+        }
+
+        MRTaskType taskType = groupingEncountered ? MRTaskType.REDUCE : MRTaskType.MAP;
+        jobNodeDeclarations.put(Pair.of(jobPrototype, taskType), formatPCollectionNodeDeclaration(pcollectionImpl,
jobPrototype));
+      }
+    }
+  }
+
+  /**
+   * Add the chaining of a NodePath to the graph.
+   * 
+   * @param nodePath The path to be formatted as a node chain in the dot file
+   * @param jobPrototype The job containing the NodePath
+   */
+  void addNodePathChain(NodePath nodePath, JobPrototype jobPrototype) {
+    for (String nodePathChain : formatNodePath(nodePath, jobPrototype)){
+      this.nodePathChains.add(nodePathChain);
+    }
+  }
+
+  /**
+   * Get the graph attributes for a task-specific subgraph.
+   * 
+   * @param taskType The type of task in the subgraph
+   * @return Graph attributes
+   */
+  String getTaskGraphAttributes(MRTaskType taskType) {
+    if (taskType == MRTaskType.MAP) {
+      return "label = Map; color = blue;";
+    } else {
+      return "label = Reduce; color = red;";
+    }
+  }
+
+  /**
+   * Add the contents of a {@link JobPrototype} to the graph describing a
+   * pipeline.
+   * 
+   * @param jobPrototype A JobPrototype representing a portion of a MapReduce
+   *          pipeline
+   */
+  public void addJobPrototype(JobPrototype jobPrototype) {
+    jobPrototypes.add(jobPrototype);
+    if (!jobPrototype.isMapOnly()) {
+      for (NodePath nodePath : jobPrototype.getMapNodePaths()) {
+        addNodePathDeclarations(jobPrototype, nodePath);
+        addNodePathChain(nodePath, jobPrototype);
+      }
+    }
+
+    HashMultimap<Target, NodePath> targetsToNodePaths = jobPrototype.getTargetsToNodePaths();
+    for (Target target : targetsToNodePaths.keySet()) {
+      globalNodeDeclarations.add(formatTargetNodeDeclaration(target));
+      for (NodePath nodePath : targetsToNodePaths.get(target)) {
+        addNodePathDeclarations(jobPrototype, nodePath);
+        addNodePathChain(nodePath, jobPrototype);
+        nodePathChains.add(formatNodeCollection(Lists.newArrayList(formatPCollection(nodePath.descendingIterator()
+            .next(), jobPrototype), String.format("\"%s\"", target.toString()))));
+      }
+    }
+  }
+
+  /**
+   * Build up the full dot file containing the description of a MapReduce
+   * pipeline.
+   * 
+   * @return Graphviz dot file contents
+   */
+  public String buildDotfile() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("digraph G {\n");
+    int clusterIndex = 0;
+
+    for (String globalDeclaration : globalNodeDeclarations) {
+      stringBuilder.append(String.format("  %s\n", globalDeclaration));
+    }
+
+    for (JobPrototype jobPrototype : jobPrototypes){
+      StringBuilder jobProtoStringBuilder = new StringBuilder();
+      jobProtoStringBuilder.append(String.format("  subgraph cluster%d {\n", clusterIndex++));
+      for (MRTaskType taskType : MRTaskType.values()){
+        Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType);
+        if (jobNodeDeclarations.containsKey(jobTaskKey)){
+          jobProtoStringBuilder.append(String.format("    subgraph cluster%d {\n", clusterIndex++));
+          jobProtoStringBuilder.append(String.format("      %s\n", getTaskGraphAttributes(taskType)));
+          for (String declarationEntry : jobNodeDeclarations.get(jobTaskKey)){
+            jobProtoStringBuilder.append(String.format("      %s\n", declarationEntry));
+          }
+          jobProtoStringBuilder.append("    }\n");
+        }
+      }
+      jobProtoStringBuilder.append("  }\n");
+      stringBuilder.append(jobProtoStringBuilder.toString());
+    }
+    
+    for (String nodePathChain : nodePathChains) {
+      stringBuilder.append(String.format("  %s\n", nodePathChain));
+    }
+
+    stringBuilder.append("}\n");
+    return stringBuilder.toString();
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 0cae079..0ad1d00 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -82,6 +82,22 @@ public class JobPrototype {
     this.targetsToNodePaths = outputPaths;
   }
 
+  public boolean isMapOnly() {
+    return this.group == null;
+  }
+
+  Set<NodePath> getMapNodePaths() {
+    return mapNodePaths;
+  }
+
+  PGroupedTableImpl<?, ?> getGroupingTable() {
+    return group;
+  }
+
+  HashMultimap<Target, NodePath> getTargetsToNodePaths() {
+    return targetsToNodePaths;
+  }
+
   public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) {
     if (group == null) {
       throw new IllegalStateException("Cannot add a reduce phase to a map-only job");

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 59b95e8..7fe2809 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -55,7 +55,7 @@ public class MSCRPlanner {
       }
       return cmp;
     }
-  };
+  };  
 
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
@@ -91,6 +91,7 @@ public class MSCRPlanner {
       assignments.putAll(constructJobPrototypes(component));
     }
     
+
     // Add in the job dependency information here.
     for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) {
       JobPrototype current = e.getValue();
@@ -103,10 +104,15 @@ public class MSCRPlanner {
     }
     
     // Finally, construct the jobs from the prototypes and return.
+    DotfileWriter dotfileWriter = new DotfileWriter();
     MRExecutor exec = new MRExecutor(jarClass);
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
+      dotfileWriter.addJobPrototype(proto);
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
     }
+
+    conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, dotfileWriter.buildDotfile());
+
     return exec;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
index 6ead212..b90a911 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -17,12 +17,22 @@
  */
 package org.apache.crunch.impl.mr.plan;
 
+/**
+ * Collection of Configuration keys and various constants used when planning MapReduce jobs
for a
+ * pipeline.
+ */
 public class PlanningParameters {
 
   public static final String MULTI_OUTPUT_PREFIX = "out";
 
   public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir";
 
+  /**
+   * Configuration key under which a <a href="http://www.graphviz.org">DOT</a>
file containing the
+   * pipeline job graph is stored by the planner.
+   */
+  public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
+
   private PlanningParameters() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
new file mode 100644
index 0000000..562238d
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.mr.plan;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.collect.InputCollection;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.impl.mr.plan.DotfileWriter.MRTaskType;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+
+public class DotfileWriterTest {
+
+  private DotfileWriter dotfileWriter;
+
+  @Before
+  public void setUp() {
+    dotfileWriter = new DotfileWriter();
+  }
+
+  @Test
+  public void testFormatPCollectionNodeDeclaration() {
+    PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(pcollectionImpl.getName()).thenReturn("collection");
+
+    assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode()
+        + "\" [label=\"collection\" shape=box];",
+        dotfileWriter.formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
+  }
+
+  @Test
+  public void testFormatPCollectionNodeDeclaration_InputPCollection() {
+    InputCollection<?> inputCollection = mock(InputCollection.class, Mockito.RETURNS_DEEP_STUBS);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(inputCollection.getName()).thenReturn("input");
+    when(inputCollection.getSource().toString()).thenReturn("source");
+
+    assertEquals("\"source\" [label=\"input\" shape=folder];",
+        dotfileWriter.formatPCollectionNodeDeclaration(inputCollection, jobPrototype));
+  }
+
+  @Test
+  public void testFormatTargetNodeDeclaration() {
+    Target target = mock(Target.class);
+    when(target.toString()).thenReturn("target/path");
+
+    assertEquals("\"target/path\" [label=\"target/path\" shape=folder];",
+        dotfileWriter.formatTargetNodeDeclaration(target));
+  }
+
+  @Test
+  public void testFormatPCollection() {
+    PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(pcollectionImpl.getName()).thenReturn("collection");
+
+    assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode()
+ "\"",
+        dotfileWriter.formatPCollection(pcollectionImpl, jobPrototype));
+  }
+
+  @Test
+  public void testFormatPCollection_InputCollection() {
+    InputCollection<Object> inputCollection = mock(InputCollection.class);
+    Source<Object> source = mock(Source.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+    when(source.toString()).thenReturn("mocksource");
+    when(inputCollection.getSource()).thenReturn(source);
+
+    assertEquals("\"mocksource\"", dotfileWriter.formatPCollection(inputCollection, jobPrototype));
+  }
+
+  @Test
+  public void testFormatNodeCollection() {
+    List<String> nodeCollection = Lists.newArrayList("one", "two", "three");
+    assertEquals("one -> two -> three;", dotfileWriter.formatNodeCollection(nodeCollection));
+  }
+
+  @Test
+  public void testFormatNodePath() {
+    PCollectionImpl<?> tail = mock(PCollectionImpl.class);
+    PCollectionImpl<?> head = mock(PCollectionImpl.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+
+    when(tail.getName()).thenReturn("tail");
+    when(head.getName()).thenReturn("head");
+
+    NodePath nodePath = new NodePath(tail);
+    nodePath.close(head);
+
+    assertEquals(
+        Lists.newArrayList("\"head@" + head.hashCode() + "@" + jobPrototype.hashCode() +
"\" -> \"tail@"
+            + tail.hashCode() + "@" + jobPrototype.hashCode() + "\";"),
+        dotfileWriter.formatNodePath(nodePath, jobPrototype));
+  }
+
+  @Test
+  public void testGetTaskGraphAttributes_Map() {
+    assertEquals("label = Map; color = blue;", dotfileWriter.getTaskGraphAttributes(MRTaskType.MAP));
+  }
+
+  @Test
+  public void testGetTaskGraphAttributes_Reduce() {
+    assertEquals("label = Reduce; color = red;", dotfileWriter.getTaskGraphAttributes(MRTaskType.REDUCE));
+  }
+
+}


Mime
View raw message