Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA1BE10600 for ; Thu, 9 Jan 2014 21:39:41 +0000 (UTC) Received: (qmail 9060 invoked by uid 500); 9 Jan 2014 21:39:41 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 9028 invoked by uid 500); 9 Jan 2014 21:39:41 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 9021 invoked by uid 99); 9 Jan 2014 21:39:41 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jan 2014 21:39:41 +0000 X-ASF-Spam-Status: No, hits=-2000.1 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 09 Jan 2014 21:39:37 +0000 Received: (qmail 8866 invoked by uid 99); 9 Jan 2014 21:39:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jan 2014 21:39:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EE7A88B44D7; Thu, 9 Jan 2014 21:39:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-686. Add utility to visualize DAGs. (hitesh) Date: Thu, 9 Jan 2014 21:39:14 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 74c622591 -> 828000f01 TEZ-686. Add utility to visualize DAGs. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/828000f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/828000f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/828000f0 Branch: refs/heads/master Commit: 828000f01425a0b7a76b47b25a489d005d524120 Parents: 74c6225 Author: Hitesh Shah Authored: Thu Jan 9 13:38:50 2014 -0800 Committer: Hitesh Shah Committed: Thu Jan 9 13:38:50 2014 -0800 ---------------------------------------------------------------------- .../apache/tez/dag/api/TezConfiguration.java | 5 + .../org/apache/tez/dag/app/DAGAppMaster.java | 75 ++++++ .../java/org/apache/tez/dag/utils/Graph.java | 256 +++++++++++++++++++ 3 files changed, 336 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/828000f0/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index fb3c9a9..55bcedc 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -295,4 +295,9 @@ public class TezConfiguration extends Configuration { */ public static final String TEZ_QUEUE_NAME = TEZ_PREFIX + "queue.name"; + + public static final String TEZ_GENERATE_DAG_VIZ = + TEZ_PREFIX + "generate.dag.viz"; + public static final boolean TEZ_GENERATE_DAG_VIZ_DEFAULT = false; + } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/828000f0/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index a072b5b..02a0b4f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -89,6 +90,7 @@ import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto; @@ -125,6 +127,7 @@ import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.history.avro.HistoryEventType; import org.apache.tez.dag.history.events.AMStartedEvent; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.utils.Graph; import org.apache.tez.runtime.library.common.security.JobTokenSecretManager; import com.google.common.annotations.VisibleForTesting; import org.apache.tez.runtime.library.common.security.TokenCache; @@ -501,9 +504,81 @@ public class DAGAppMaster extends AbstractService { currentUser.getShortUserName(), taskHeartbeatHandler, context); + if (dagConf.getBoolean(TezConfiguration.TEZ_GENERATE_DAG_VIZ, + TezConfiguration.TEZ_GENERATE_DAG_VIZ_DEFAULT)) { + generateDAGVizFile(dagId, dagPB); + } + return newDag; } // end createDag() + String getShortClassName(String className) { + int pos = className.lastIndexOf("."); + if (pos != -1 && pos < className.length()-1) { + return className.substring(pos+1); + } + return className; + } + + private void generateDAGVizFile(TezDAGID dagId, DAGPlan dagPB) { + Graph graph = new Graph(dagPB.getName()); + + for (VertexPlan v : dagPB.getVertexList()) { + String nodeLabel = v.getName().replace("\\W","_") + + "[" + getShortClassName(v.getProcessorDescriptor().getClassName() + "]"); + Graph.Node n = graph.newNode(v.getName(), nodeLabel); + for (DAGProtos.RootInputLeafOutputProto input : v.getInputsList()) { + Graph.Node inputNode = graph.getNode(v.getName() + + "_" + input.getName()); + inputNode.setLabel(v.getName() + "[" + input.getName() + "]"); + inputNode.setShape("box"); + inputNode.addEdge(n, "Input" + + " [inputClass=" + getShortClassName(input.getEntityDescriptor().getClassName()) + + ", initializer=" + getShortClassName(input.getInitializerClassName()) + "]"); + } + for (DAGProtos.RootInputLeafOutputProto output : v.getOutputsList()) { + Graph.Node outputNode = graph.getNode(v.getName() + + "_" + output.getName()); + outputNode.setLabel(v.getName() + "[" + output.getName() + "]"); + outputNode.setShape("box"); + n.addEdge(outputNode, "Output" + + " [outputClass=" + getShortClassName(output.getEntityDescriptor().getClassName()) + + ", initializer=" + getShortClassName(output.getInitializerClassName()) + "]"); + } + } + + for (DAGProtos.EdgePlan e : dagPB.getEdgeList()) { + + Graph.Node n = graph.getNode(e.getInputVertexName()); + n.addEdge(graph.getNode(e.getOutputVertexName()), + "[" + + "input=" + getShortClassName(e.getEdgeSource().getClassName()) + + ", output=" + getShortClassName(e.getEdgeDestination().getClassName()) + + ", dataMovement=" + e.getDataMovementType().name().trim() + + ", schedulingType=" + e.getSchedulingType().name().trim() + "]"); + } + + String logDirs = System.getenv(Environment.LOG_DIRS.name()); + String outputFile = ""; + if (logDirs != null && !logDirs.isEmpty()) { + int pos = logDirs.indexOf(","); + if (pos != -1) { + outputFile += logDirs.substring(0, pos); + } else { + outputFile += logDirs; + } + outputFile += File.separator; + } + outputFile += dagId.toString() + ".dot"; + + try { + graph.save(outputFile); + } catch (IOException e) { + LOG.warn("Error occurred when trying to save graph structure" + + " for dag " + dagId.toString(), e); + } + } + protected void addIfService(Object object, boolean addDispatcher) { if (object instanceof Service) { Service service = (Service) object; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/828000f0/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java new file mode 100644 index 0000000..ecead77 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.utils; + +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; + +@Private +public class Graph { + public class Edge { + Node from; + Node to; + String label; + + public Edge(Node from, Node to, String info) { + this.from = from; + this.to = to; + this.label = info; + } + + public boolean sameAs(Edge rhs) { + if (this.from == rhs.from && + this.to == rhs.to) { + return true; + } + return false; + } + + public Edge combine(Edge rhs) { + String newlabel = this.label + "," + rhs.label; + return new Edge(this.from, this.to, newlabel); + } + } + + public class Node { + Graph parent; + String id; + List ins; + List outs; + String label; + String shape; + + public Node(String id) { + this(id, null); + } + + public Node(String id, String label) { + this.id = id; + this.label = label; + this.parent = Graph.this; + this.ins = new ArrayList(); + this.outs = new ArrayList(); + } + + public Graph getParent() { + return parent; + } + + public Node addEdge(Node to, String info) { + Edge e = new Edge(this, to, info); + outs.add(e); + to.ins.add(e); + return this; + } + + public String getLabel() { + if (label != null && !label.isEmpty()) { + return label; + } + return id; + } + + public void setLabel(String label) { + this.label = label; + } + + public String getUniqueId() { + return Graph.this.name + "." + id; + } + + public void setShape(String shape) { + this.shape = shape; + } + } + + private String name; + private Graph parent; + private Set nodes = new HashSet(); + private Set subgraphs = new HashSet(); + + public Graph(String name, Graph parent) { + this.name = name; + this.parent = parent; + } + + public Graph(String name) { + this(name, null); + } + + public Graph() { + this("graph", null); + } + + public String getName() { + return name; + } + + public Graph getParent() { + return parent; + } + + public Node newNode(String id, String label) { + Node ret = new Node(id, label); + nodes.add(ret); + return ret; + } + + private Node newNode(String id) { + return newNode(id, null); + } + + public Node getNode(String id) { + for (Node node : nodes) { + if (node.id.equals(id)) { + return node; + } + } + return newNode(id); + } + + public Graph newSubGraph(String name) { + Graph ret = new Graph(name, this); + subgraphs.add(ret); + return ret; + } + + public void addSubGraph(Graph graph) { + subgraphs.add(graph); + graph.parent = this; + } + + private static String wrapSafeString(String label) { + if (label.indexOf(',') >= 0) { + if (label.length()>14) { + label = label.replaceAll(",", ",\n"); + } + } + label = "\"" + StringEscapeUtils.escapeJava(label) + "\""; + return label; + } + + public String generateGraphViz(String indent) { + StringBuilder sb = new StringBuilder(); + if (this.parent == null) { + sb.append("digraph " + name + " {"); + sb.append(System.getProperty("line.separator")); + sb.append(String.format("graph [ label=%s, fontsize=24, fontname=Helvetica];", + wrapSafeString(name))); + sb.append(System.getProperty("line.separator")); + sb.append("node [fontsize=12, fontname=Helvetica];"); + sb.append(System.getProperty("line.separator")); + sb.append("edge [fontsize=9, fontcolor=blue, fontname=Arial];"); + sb.append(System.getProperty("line.separator")); + } else { + sb.append("subgraph cluster_" + name + " {\nlabel=\"" + name + "\""); + sb.append(System.getProperty("line.separator")); + } + for (Graph g : subgraphs) { + String ginfo = g.generateGraphViz(indent+" "); + sb.append(ginfo); + sb.append(System.getProperty("line.separator")); + } + for (Node n : nodes) { + if (n.shape != null && !n.shape.isEmpty()) { + sb.append(String.format( + "%s%s [ label = %s, shape = %s ];", + indent, + wrapSafeString(n.getUniqueId()), + wrapSafeString(n.getLabel()), + wrapSafeString(n.shape))); + } else { + sb.append(String.format( + "%s%s [ label = %s ];", + indent, + wrapSafeString(n.getUniqueId()), + wrapSafeString(n.getLabel()))); + } + sb.append(System.getProperty("line.separator")); + List combinedOuts = combineEdges(n.outs); + for (Edge e : combinedOuts) { + sb.append(String.format( + "%s%s -> %s [ label = %s ];", + indent, + wrapSafeString(e.from.getUniqueId()), + wrapSafeString(e.to.getUniqueId()), + wrapSafeString(e.label))); + sb.append(System.getProperty("line.separator")); + } + } + sb.append("}"); + sb.append(System.getProperty("line.separator")); + return sb.toString(); + } + + public String generateGraphViz() { + return generateGraphViz(""); + } + + public void save(String filePath) throws IOException { + FileWriter fout = new FileWriter(filePath); + fout.write(generateGraphViz()); + fout.close(); + } + + public static List combineEdges(List edges) { + List ret = new ArrayList(); + for (Edge edge : edges) { + boolean found = false; + for (int i = 0; i < ret.size(); i++) { + Edge current = ret.get(i); + if (edge.sameAs(current)) { + ret.set(i, current.combine(edge)); + found = true; + break; + } + } + if (!found) { + ret.add(edge); + } + } + return ret; + } +}