Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 07C3B10ED8 for ; Thu, 17 Oct 2013 13:45:35 +0000 (UTC) Received: (qmail 52750 invoked by uid 500); 17 Oct 2013 13:45:29 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 52617 invoked by uid 500); 17 Oct 2013 13:45:27 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 52462 invoked by uid 99); 17 Oct 2013 13:45:22 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Oct 2013 13:45:22 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 746E5916E31; Thu, 17 Oct 2013 13:45:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <5e544da33cfd421daf38d7c9d42c9323@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-283: Additional diagnostics for the planner dotfile. Date: Thu, 17 Oct 2013 13:45:22 +0000 (UTC) Updated Branches: refs/heads/master 9c42bab14 -> ff56d0539 CRUNCH-283: Additional diagnostics for the planner dotfile. 1. Add the target dependencies that are implied by ParallelDoOptions to the directed graphs (target -> PCollectionImpl) as dashed lines. 2. Add a label to each of the clustered subgraphs that includes the Crunch JobID, to make it easier to map from running jobs to the dotfile for diagnosis. 
Signed-off-by: Josh Wills Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ff56d053 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ff56d053 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ff56d053 Branch: refs/heads/master Commit: ff56d0539f50b402cc23fe34059e1e44309b5677 Parents: 9c42bab Author: Josh Wills Authored: Wed Oct 16 18:57:34 2013 -0700 Committer: Josh Wills Committed: Thu Oct 17 06:43:14 2013 -0700 ---------------------------------------------------------------------- .../crunch/impl/mr/collect/PCollectionImpl.java | 6 +- .../crunch/impl/mr/plan/DotfileWriter.java | 71 +++++++++++++++----- .../crunch/impl/mr/plan/DotfileWriterTest.java | 41 +++++++++++ 3 files changed, 100 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java index 43711fc..958d7f6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java @@ -232,7 +232,11 @@ public abstract class PCollectionImpl implements PCollection { } return pipeline; } - + + public ParallelDoOptions getParallelDoOptions() { + return doOptions; + } + public Set> getTargetDependencies() { Set> targetDeps = doOptions.getSourceTargets(); for (PCollectionImpl parent : getParents()) { http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java 
---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java index 2834fb9..dc4a9c2 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java @@ -18,9 +18,11 @@ package org.apache.crunch.impl.mr.plan; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.crunch.Pair; +import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.impl.mr.collect.InputCollection; import org.apache.crunch.impl.mr.collect.PCollectionImpl; @@ -28,6 +30,8 @@ import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; import com.google.common.base.Joiner; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -36,7 +40,7 @@ import com.google.common.collect.Sets; * the topology of Crunch pipelines. */ public class DotfileWriter { - + /** The types of tasks within a MapReduce job. */ enum MRTaskType { MAP, REDUCE }; @@ -47,7 +51,7 @@ public class DotfileWriter { /** * Format the declaration of a node based on a PCollection. - * + * * @param pcollectionImpl PCollection for which a node will be declared * @param jobPrototype The job containing the PCollection * @return The node declaration @@ -63,7 +67,7 @@ public class DotfileWriter { /** * Format a Target as a node declaration. - * + * * @param target A Target used within a MapReduce pipeline * @return The global node declaration for the Target */ @@ -73,7 +77,7 @@ public class DotfileWriter { /** * Format a PCollectionImpl into a format to be used for dot files. 
- * + * * @param pcollectionImpl The PCollectionImpl to be formatted * @param jobPrototype The job containing the PCollection * @return The dot file formatted representation of the PCollectionImpl @@ -88,29 +92,61 @@ public class DotfileWriter { /** * Format a collection of node strings into dot file syntax. - * + * * @param nodeCollection Collection of chained node strings * @return The dot-formatted chain of nodes */ String formatNodeCollection(List nodeCollection) { - return String.format("%s;", Joiner.on(" -> ").join(nodeCollection)); + return formatNodeCollection(nodeCollection, ImmutableMap.of()); + } + + /** + * Format a collection of node strings into dot file syntax. + * + * @param nodeCollection Collection of chained node strings + * @param edgeAttributes map of attribute names and values to be applied to the edge + * @return The dot-formatted chain of nodes + */ + String formatNodeCollection(List nodeCollection, Map edgeAttributes) { + String edgeAttributeString = ""; + if (!edgeAttributes.isEmpty()) { + edgeAttributeString = String.format(" [%s]", + Joiner.on(' ').withKeyValueSeparator("=").join(edgeAttributes)); + } + return String.format("%s%s;", Joiner.on(" -> ").join(nodeCollection), edgeAttributeString); } /** * Format a NodePath in dot file syntax. 
- * + * * @param nodePath The node path to be formatted * @param jobPrototype The job containing the NodePath * @return The dot file representation of the node path */ List formatNodePath(NodePath nodePath, JobPrototype jobPrototype) { List formattedNodePaths = Lists.newArrayList(); - - List> pcollections = Lists.newArrayList(nodePath); + + List> pcollections = ImmutableList.copyOf(nodePath); for (int collectionIndex = 1; collectionIndex < pcollections.size(); collectionIndex++){ String fromNode = formatPCollection(pcollections.get(collectionIndex - 1), jobPrototype); String toNode = formatPCollection(pcollections.get(collectionIndex), jobPrototype); - formattedNodePaths.add(formatNodeCollection(Lists.newArrayList(fromNode, toNode))); + formattedNodePaths.add(formatNodeCollection(ImmutableList.of(fromNode, toNode))); + } + + // Add SourceTarget dependencies, if any + for (PCollectionImpl pcollection : pcollections) { + Set> targetDeps = pcollection.getParallelDoOptions().getSourceTargets(); + if (!targetDeps.isEmpty()) { + String toNode = formatPCollection(pcollection, jobPrototype); + for(Target target : targetDeps) { + globalNodeDeclarations.add(formatTargetNodeDeclaration(target)); + String fromNode = String.format("\"%s\"", target.toString()); + formattedNodePaths.add( + formatNodeCollection( + ImmutableList.of(fromNode, toNode), + ImmutableMap.of("style", "dashed"))); + } + } } return formattedNodePaths; } @@ -118,7 +154,7 @@ public class DotfileWriter { /** * Add a NodePath to be formatted as a list of node declarations within a * single job. 
- * + * * @param jobPrototype The job containing the node path * @param nodePath The node path to be formatted */ @@ -128,7 +164,7 @@ public class DotfileWriter { if (pcollectionImpl instanceof InputCollection) { globalNodeDeclarations.add(formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype)); } else { - if (!groupingEncountered){ + if (!groupingEncountered) { groupingEncountered = (pcollectionImpl instanceof PGroupedTableImpl); } @@ -141,7 +177,7 @@ public class DotfileWriter { /** * Add the chaining of a NodePath to the graph. - * + * * @param nodePath The path to be formatted as a node chain in the dot file * @param jobPrototype The job containing the NodePath */ @@ -153,7 +189,7 @@ public class DotfileWriter { /** * Get the graph attributes for a task-specific subgraph. - * + * * @param taskType The type of task in the subgraph * @return Graph attributes */ @@ -183,7 +219,7 @@ public class DotfileWriter { /** * Add the contents of a {@link JobPrototype} to the graph describing a * pipeline. - * + * * @param jobPrototype A JobPrototype representing a portion of a MapReduce * pipeline */ @@ -202,7 +238,7 @@ public class DotfileWriter { /** * Build up the full dot file containing the description of a MapReduce * pipeline. - * + * * @return Graphviz dot file contents */ public String buildDotfile() { @@ -217,6 +253,7 @@ public class DotfileWriter { // Must prefix subgraph name with "cluster", otherwise its border won't render. I don't know why. 
StringBuilder jobProtoStringBuilder = new StringBuilder(); jobProtoStringBuilder.append(String.format(" subgraph \"cluster-job%d\" {\n", jobPrototype.getJobID())); + jobProtoStringBuilder.append(String.format(" label=\"Crunch Job %d\";\n", jobPrototype.getJobID())); for (MRTaskType taskType : MRTaskType.values()){ Pair jobTaskKey = Pair.of(jobPrototype, taskType); if (jobNodeDeclarations.containsKey(jobTaskKey)){ @@ -232,7 +269,7 @@ public class DotfileWriter { jobProtoStringBuilder.append(" }\n"); stringBuilder.append(jobProtoStringBuilder.toString()); } - + for (String nodePathChain : nodePathChains) { stringBuilder.append(String.format(" %s\n", nodePathChain)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java index 562238d..e85419c 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java @@ -23,7 +23,9 @@ import static org.mockito.Mockito.when; import java.util.List; +import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.impl.mr.collect.InputCollection; import org.apache.crunch.impl.mr.collect.PCollectionImpl; @@ -32,6 +34,8 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; public class DotfileWriterTest { @@ -102,13 +106,24 @@ public class DotfileWriterTest { } @Test + public void testFormatNodeCollection_WithStyles() { + List 
nodeCollection = Lists.newArrayList("one", "two"); + assertEquals( + "one -> two [style=dotted];", + dotfileWriter.formatNodeCollection(nodeCollection, ImmutableMap.of("style", "dotted"))); + } + + @Test public void testFormatNodePath() { PCollectionImpl tail = mock(PCollectionImpl.class); PCollectionImpl head = mock(PCollectionImpl.class); JobPrototype jobPrototype = mock(JobPrototype.class); + ParallelDoOptions doOptions = ParallelDoOptions.builder().build(); when(tail.getName()).thenReturn("tail"); when(head.getName()).thenReturn("head"); + when(tail.getParallelDoOptions()).thenReturn(doOptions); + when(head.getParallelDoOptions()).thenReturn(doOptions); NodePath nodePath = new NodePath(tail); nodePath.close(head); @@ -120,6 +135,32 @@ public class DotfileWriterTest { } @Test + public void testFormatNodePathWithTargetDependencies() { + PCollectionImpl tail = mock(PCollectionImpl.class); + PCollectionImpl head = mock(PCollectionImpl.class); + SourceTarget srcTarget = mock(SourceTarget.class); + JobPrototype jobPrototype = mock(JobPrototype.class); + + ParallelDoOptions tailOptions = ParallelDoOptions.builder().sourceTargets(srcTarget).build(); + ParallelDoOptions headOptions = ParallelDoOptions.builder().build(); + when(srcTarget.toString()).thenReturn("target"); + when(tail.getName()).thenReturn("tail"); + when(head.getName()).thenReturn("head"); + when(tail.getParallelDoOptions()).thenReturn(tailOptions); + when(head.getParallelDoOptions()).thenReturn(headOptions); + + NodePath nodePath = new NodePath(tail); + nodePath.close(head); + + assertEquals( + ImmutableList.of("\"head@" + head.hashCode() + "@" + jobPrototype.hashCode() + "\" -> \"tail@" + + tail.hashCode() + "@" + jobPrototype.hashCode() + "\";", + "\"target\" -> \"tail@" + tail.hashCode() + "@" + jobPrototype.hashCode() + "\" [style=dashed];"), + dotfileWriter.formatNodePath(nodePath, jobPrototype)); + } + + + @Test public void testGetTaskGraphAttributes_Map() { assertEquals("label = Map; color = 
blue;", dotfileWriter.getTaskGraphAttributes(MRTaskType.MAP)); }