tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject [1/2] tez git commit: TEZ-2692. bugfixes & enhancements related to job parser and analyzer (rbalamohan)
Date Tue, 11 Aug 2015 12:46:18 GMT
Repository: tez
Updated Branches:
  refs/heads/master eadbfec44 -> ecd90dc1f


http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
index 8df40ba..a570493 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.analyzer.plugins;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.analyzer.Analyzer;
@@ -43,15 +44,21 @@ import java.util.Map;
  */
 public class ShuffleTimeAnalyzer implements Analyzer {
 
-  private static final String SHUFFLE_TIME_RATIO = "tez.shuffle-time-analyzer.shuffle.ratio";
-  private static final float SHUFFLE_TIME_RATIO_DEFAULT = 0.5f;
+  /**
+   * ratio of (total time taken by task - shuffle time) / (total time taken by task)
+   */
+  private static final String REAL_WORK_DONE_RATIO = "tez.shuffle-time-analyzer.real-work.done.ratio";
+  private static final float REAL_WORK_DONE_RATIO_DEFAULT = 0.5f;
 
+  /**
+   * Number of min records that needs to get in as reduce input records.
+   */
   private static final String MIN_SHUFFLE_RECORDS = "tez.shuffle-time-analyzer.shuffle.min.records";
   private static final long MIN_SHUFFLE_RECORDS_DEFAULT = 10000;
 
   private static final String[] headers = { "vertexName", "taskAttemptId", "Node", "counterGroup",
       "Comments", "REDUCE_INPUT_GROUPS", "REDUCE_INPUT_RECORDS", "ratio", "SHUFFLE_BYTES",
-      "Time taken to receive all events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
+      "TotalTime", "Time_taken_to_receive_all_events", "MERGE_PHASE_TIME", "SHUFFLE_PHASE_TIME",
       "TimeTaken_For_Real_Task", "FIRST_EVENT_RECEIVED", "LAST_EVENT_RECEIVED",
       "SHUFFLE_BYTES_DISK_DIRECT" };
 
@@ -59,15 +66,15 @@ public class ShuffleTimeAnalyzer implements Analyzer {
 
   private final Configuration config;
 
-  private final float shuffleTimeRatio;
+  private final float realWorkDoneRatio;
   private final long minShuffleRecords;
 
 
   public ShuffleTimeAnalyzer(Configuration config) {
     this.config = config;
 
-    shuffleTimeRatio = config.getFloat
-        (SHUFFLE_TIME_RATIO, SHUFFLE_TIME_RATIO_DEFAULT);
+    realWorkDoneRatio = config.getFloat
+        (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT);
     minShuffleRecords = config.getLong(MIN_SHUFFLE_RECORDS, MIN_SHUFFLE_RECORDS_DEFAULT);
   }
 
@@ -105,15 +112,20 @@ public class ShuffleTimeAnalyzer implements Analyzer {
             result.add(counterGroupName);
 
             //Real work done in the task
-            long timeTakenForRealWork = attemptInfo.getTimeTaken() -
-                Long.parseLong(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName,
-                    attemptInfo));
-
             String comments = "";
-            if ((timeTakenForRealWork * 1.0f / attemptInfo.getTimeTaken()) < shuffleTimeRatio)
{
-              comments = "Time taken in shuffle is more than the actual work being done in
task. "
-                  + " Check if source/destination machine is a slow node. Check if merge
phase "
-                  + "time is more to understand disk bottlenecks in this node.  Check for
skew";
+            String mergePhaseTime = getCounterValue(TaskCounter.MERGE_PHASE_TIME,
+                counterGroupName, attemptInfo);
+            String timeTakenForRealWork = "";
+            if (!Strings.isNullOrEmpty(mergePhaseTime)) {
+              long realWorkDone = attemptInfo.getTimeTaken() - Long.parseLong(mergePhaseTime);
+
+              if ((realWorkDone * 1.0f / attemptInfo.getTimeTaken()) < realWorkDoneRatio)
{
+                comments = "Time taken in shuffle is more than the actual work being done
in task. "
+                    + " Check if source/destination machine is a slow node. Check if merge
phase "
+                    + "time is more to understand disk bottlenecks in this node.  Check for
skew";
+              }
+
+              timeTakenForRealWork = Long.toString(realWorkDone);
             }
             result.add(comments);
 
@@ -122,13 +134,14 @@ public class ShuffleTimeAnalyzer implements Analyzer {
             result.add("" + (1.0f * reduceInputGroupsVal / reduceInputRecordsVal));
             result.add(getCounterValue(TaskCounter.SHUFFLE_BYTES, counterGroupName, attemptInfo));
 
+            result.add(Long.toString(attemptInfo.getTimeTaken()));
+
             //Total time taken for receiving all events from source tasks
             result.add(getOverheadFromSourceTasks(counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.MERGE_PHASE_TIME, counterGroupName, attemptInfo));
             result.add(getCounterValue(TaskCounter.SHUFFLE_PHASE_TIME, counterGroupName,
attemptInfo));
 
-
-            result.add(Long.toString(timeTakenForRealWork));
+            result.add(timeTakenForRealWork);
 
             result.add(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED, counterGroupName,
attemptInfo));
             result.add(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED, counterGroupName,
attemptInfo));
@@ -150,11 +163,16 @@ public class ShuffleTimeAnalyzer implements Analyzer {
    * @return String
    */
   private String getOverheadFromSourceTasks(String counterGroupName, TaskAttemptInfo attemptInfo)
{
-    long firstEventReceived = Long.parseLong(getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
-        counterGroupName, attemptInfo));
-    long lastEventReceived = Long.parseLong(getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
-        counterGroupName, attemptInfo));
-    return Long.toString(lastEventReceived - firstEventReceived);
+    String firstEventReceived = getCounterValue(TaskCounter.FIRST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+    String lastEventReceived = getCounterValue(TaskCounter.LAST_EVENT_RECEIVED,
+        counterGroupName, attemptInfo);
+
+    if (!Strings.isNullOrEmpty(firstEventReceived) && !Strings.isNullOrEmpty(lastEventReceived))
{
+      return Long.toString(Long.parseLong(lastEventReceived) - Long.parseLong(firstEventReceived));
+    } else {
+      return "";
+    }
   }
 
   private String getCounterValue(TaskCounter counter, String counterGroupName,

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
index 8152344..f09380d 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java
@@ -57,6 +57,10 @@ import java.util.Map;
  */
 public class SkewAnalyzer implements Analyzer {
 
+  /**
+   * Amount of bytes that was sent as shuffle bytes from source. If it is below this threshold,
+   * it would not be considered for analysis.
+   */
   private static final String SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE = "tez.skew-analyzer.shuffle"
       + ".bytes.per.source";
   private static final long SHUFFLE_BYTES_PER_ATTEMPT_PER_SOURCE_DEFAULT = 900 * 1024 * 1024l;

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
index 7c7f5c0..1a8d9d3 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java
@@ -41,7 +41,7 @@ import java.util.List;
 public class SlowTaskIdentifier implements Analyzer {
 
   private static final String[] headers = { "vertexName", "taskAttemptId",
-      "Node", "taskDuration", "Status",
+      "Node", "taskDuration", "Status", "diagnostics",
       "NoOfInputs" };
 
   private final CSVResult csvResult;
@@ -72,14 +72,22 @@ public class SlowTaskIdentifier implements Analyzer {
       }
     });
 
-    int limit = config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT);
-    for(int i=0;i<limit;i++) {
+    // Clamp the configured count to [0, taskAttempts.size()] so get(i) never goes out of bounds.
+    int limit = Math.min(taskAttempts.size(),
+        Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT)));
+
+    if (limit == 0) {
+      return;
+    }
+
+    for (int i = 0; i < limit; i++) {
       List<String> record = Lists.newLinkedList();
       record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getVertexName());
       record.add(taskAttempts.get(i).getTaskAttemptId());
       record.add(taskAttempts.get(i).getContainer().getHost());
       record.add(taskAttempts.get(i).getTimeTaken() + "");
       record.add(taskAttempts.get(i).getStatus());
+      record.add(taskAttempts.get(i).getDiagnostics());
       record.add(taskAttempts.get(i).getTaskInfo().getVertexInfo().getInputEdges().size() + "");
 
       csvResult.addRecord(record.toArray(new String[record.size()]));

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
index b7fca0b..c8d9695 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.history.parser.datamodel.DagInfo;
 import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
 import org.apache.tez.history.parser.datamodel.VertexInfo;
 
 import java.util.List;
@@ -41,7 +42,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
   private static final String[] headers = { "vertexName", "taskAttempts", "totalTime",
       "shuffleTime", "shuffleTime_Max", "LastEventReceived", "LastEventReceivedFrom",
-      "TimeTaken_ForRealWork", "75thPercentile", "95thPercentile", "98thPercentile", "Median",
+      "75thPercentile", "95thPercentile", "98thPercentile", "Median",
       "observation", "comments" };
 
   private final CSVResult csvResult = new CSVResult(headers);
@@ -50,8 +51,27 @@ public class SlowestVertexAnalyzer implements Analyzer {
   private final MetricRegistry metrics = new MetricRegistry();
   private Histogram taskAttemptRuntimeHistorgram;
 
+  private final static String MAX_VERTEX_RUNTIME = "tez.slowest-vertex-analyzer.max.vertex.runtime";
+  private final static long MAX_VERTEX_RUNTIME_DEFAULT = 100000;
+
+  private final long vertexRuntimeThreshold;
+
   public SlowestVertexAnalyzer(Configuration config) {
     this.config = config;
+    this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME,
+        MAX_VERTEX_RUNTIME_DEFAULT));
+
+  }
+
+  private long getTaskRuntime(VertexInfo vertexInfo) {
+    TaskInfo firstTaskToStart = vertexInfo.getFirstTaskToStart();
+    TaskInfo lastTaskToFinish = vertexInfo.getLastTaskToFinish();
+
+    DagInfo dagInfo = vertexInfo.getDagInfo();
+    long totalTime = ((lastTaskToFinish == null) ?
+        dagInfo.getFinishTime() : lastTaskToFinish.getFinishTime()) -
+        ((firstTaskToStart == null) ? dagInfo.getStartTime() : firstTaskToStart.getStartTime());
+    return totalTime;
   }
 
   @Override
@@ -59,9 +79,13 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
     for (VertexInfo vertexInfo : dagInfo.getVertices()) {
       String vertexName = vertexInfo.getVertexName();
-      long totalTime = vertexInfo.getTimeTaken();
+      if (vertexInfo.getFirstTaskToStart()  == null || vertexInfo.getLastTaskToFinish() ==
null) {
+        continue;
+      }
+
+      long totalTime = getTaskRuntime(vertexInfo);
 
-      long max = Long.MIN_VALUE;
+      long slowestLastEventTime = Long.MIN_VALUE;
       String maxSourceName = "";
       taskAttemptRuntimeHistorgram = metrics.histogram(vertexName);
 
@@ -81,10 +105,8 @@ public class SlowestVertexAnalyzer implements Analyzer {
             continue;
           }
           //Find the slowest last event received
-          if (entry.getValue().getValue() > max) {
-            //w.r.t vertex start time.
-            max =(attemptInfo.getStartTimeInterval() +  entry.getValue().getValue()) -
-                (vertexInfo.getStartTimeInterval());
+          if (entry.getValue().getValue() > slowestLastEventTime) {
+            slowestLastEventTime = entry.getValue().getValue();
             maxSourceName = entry.getKey();
           }
         }
@@ -104,9 +126,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
           }
           //Find the slowest last event received
           if (entry.getValue().getValue() > shuffleMax) {
-            //w.r.t vertex start time.
-            shuffleMax =(attemptInfo.getStartTimeInterval() +  entry.getValue().getValue())
-
-                (vertexInfo.getStartTimeInterval());
+            shuffleMax = entry.getValue().getValue();
             shuffleMaxSource = entry.getKey();
           }
         }
@@ -120,9 +140,10 @@ public class SlowestVertexAnalyzer implements Analyzer {
       record.add(totalTime + "");
       record.add(Math.max(0, shuffleMax) + "");
       record.add(shuffleMaxSource);
-      record.add(Math.max(0, max) + "");
+      record.add(Math.max(0, slowestLastEventTime) + "");
       record.add(maxSourceName);
-      record.add(Math.max(0,(totalTime - max)) + "");
+      // Real work done is not reported at vertex level: it can be meaningless, as it is quite
+      // possible that the vertex was starved.
 
       StringBuilder sb = new StringBuilder();
       double percentile75 = taskAttemptRuntimeHistorgram.getSnapshot().get75thPercentile();
@@ -145,7 +166,7 @@ public class SlowestVertexAnalyzer implements Analyzer {
 
       if (totalTime > 0 && vertexInfo.getTaskAttempts().size() > 0) {
         if ((shuffleMax * 1.0f / totalTime) > 0.5) {
-          if ((max * 1.0f / totalTime) > 0.5) {
+          if ((slowestLastEventTime * 1.0f / totalTime) > 0.5) {
             comments = "This vertex is slow due to its dependency on parent. Got a lot delayed last"
                 + " event received";
           } else {
@@ -153,8 +174,9 @@ public class SlowestVertexAnalyzer implements Analyzer {
                 "Spending too much time on shuffle. Check shuffle bytes from previous vertex";
           }
         } else {
-          if (totalTime > 10000) { //greater than 10 seconds. //TODO: Configure it later.
-            comments = "Concentrate on this vertex (totalTime > 10 seconds)";
+          if (totalTime > vertexRuntimeThreshold) { // ms, like the replaced 10000 (= 10 seconds)
+            comments = "Concentrate on this vertex (totalTime > " + vertexRuntimeThreshold
+                + " ms)";
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
index c650104..83b1bb0 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java
@@ -49,12 +49,21 @@ public class SpillAnalyzerImpl implements Analyzer {
 
   private final CSVResult csvResult;
 
-  private static long OUTPUT_BYTES_THRESHOLD = 1 * 1024 * 1024 * 1024l;
+  /**
+   * Attempts are reported only if their output bytes exceed this value (and they spilled twice+).
+   */
+  private static final String OUTPUT_BYTES_THRESHOLD = "tez.spill-analyzer.min.output.bytes"
+      + ".threshold";
+  private static final long OUTPUT_BYTES_THRESHOLD_DEFAULT = 1024L * 1024 * 1024;
+
+  private final long minOutputBytesPerTask;
 
   private final Configuration config;
 
   public SpillAnalyzerImpl(Configuration config) {
     this.config = config;
+    minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD,
+        OUTPUT_BYTES_THRESHOLD_DEFAULT));
     this.csvResult = new CSVResult(headers);
   }
 
@@ -83,7 +92,7 @@ public class SpillAnalyzerImpl implements Analyzer {
           long outputRecords = outputRecordsMap.get(source).getValue();
           long spilledRecords = spilledRecordsMap.get(source).getValue();
 
-          if (spillCount > 1 && outBytes > OUTPUT_BYTES_THRESHOLD) {
+          if (spillCount > 1 && outBytes > minOutputBytesPerTask) {
             List<String> recorList = Lists.newLinkedList();
             recorList.add(vertexName);
             recorList.add(attemptInfo.getTaskAttemptId());
@@ -95,7 +104,7 @@ public class SpillAnalyzerImpl implements Analyzer {
             recorList.add(outputRecords + "");
             recorList.add(spilledRecords + "");
             recorList.add("Consider increasing " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB
-                + ", try increasing container size.");
+                + ". Try increasing container size.");
 
             csvResult.addRecord(recorList.toArray(new String[recorList.size()]));
           }

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
new file mode 100644
index 0000000..c07ff83
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.plugins;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.analyzer.Analyzer;
+import org.apache.tez.analyzer.CSVResult;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Reports, for every vertex, how many task attempts were running at each timestamp where an
+ * attempt of that vertex started or finished. Helps in understanding whether a vertex was
+ * starved (few or no attempts running).
+ */
+public class TaskConcurrencyAnalyzer implements Analyzer {
+
+  private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" };
+
+  private final CSVResult csvResult;
+  private final Configuration config;
+
+  public TaskConcurrencyAnalyzer(Configuration conf) {
+    this.csvResult = new CSVResult(headers);
+    this.config = conf;
+  }
+
+  /**
+   * For every vertex, sweeps over the start/finish times of its task attempts and records the
+   * number of attempts running right after each distinct timestamp.
+   *
+   * <p>Each start contributes +1 and each finish -1 to the delta stored for its timestamp.
+   * Netting starts against finishes per timestamp (instead of grouping all events of a timestamp
+   * and treating the whole group as a single event type) keeps the count correct when one
+   * attempt finishes in the same millisecond another one starts.
+   */
+  @Override
+  public void analyze(DagInfo dagInfo) throws TezException {
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      String vertexName = vertexInfo.getVertexName();
+
+      // Sorted by timestamp so the sweep below visits events in chronological order.
+      TreeMap<Long, Integer> deltaAtTime = new TreeMap<Long, Integer>();
+      for (TaskAttemptInfo attemptInfo : vertexInfo.getTaskAttempts(true, null)) {
+        addDelta(deltaAtTime, attemptInfo.getStartTime(), 1);
+        addDelta(deltaAtTime, attemptInfo.getFinishTime(), -1);
+      }
+
+      int concurrentTasks = 0;
+      for (Map.Entry<Long, Integer> entry : deltaAtTime.entrySet()) {
+        concurrentTasks += entry.getValue();
+        addToResult(vertexName, entry.getKey(), concurrentTasks);
+      }
+    }
+  }
+
+  /** Adds {@code delta} to the change in running attempts recorded for {@code timestamp}. */
+  private static void addDelta(Map<Long, Integer> deltaAtTime, long timestamp, int delta) {
+    Integer current = deltaAtTime.get(timestamp);
+    deltaAtTime.put(timestamp, (current == null) ? delta : current + delta);
+  }
+
+  /** Appends one CSV row: timestamp, vertex name, attempts running after that timestamp. */
+  private void addToResult(String vertexName, long currentTime, int concurrentTasks) {
+    String[] record = { currentTime + "", vertexName, concurrentTasks + "" };
+    csvResult.addRecord(record);
+  }
+
+  @Override
+  public CSVResult getResult() throws TezException {
+    return csvResult;
+  }
+
+  @Override
+  public String getName() {
+    return "TaskConcurrencyAnalyzer";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Analyze how many tasks were running in every vertex at given point in time. This "
+        + "would be helpful in understanding whether any starvation was there or not.";
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
new file mode 100644
index 0000000..4a582bb
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.analyzer.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.plutext.jaxb.svg11.Line;
+import org.plutext.jaxb.svg11.ObjectFactory;
+import org.plutext.jaxb.svg11.Svg;
+import org.plutext.jaxb.svg11.Title;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.namespace.QName;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Renders a {@link DagInfo} as an SVG timeline: one bar for the DAG, one per vertex and one per
+ * task attempt (green for SUCCEEDED attempts, red otherwise). Horizontal coordinates are scaled
+ * so that the DAG's finish-time interval maps to {@code SCREEN_WIDTH}.
+ *
+ * <p>Drawing advances mutable cursor fields and appends to a single {@link Svg} document, so an
+ * instance is meant to be used once and is not thread-safe.
+ */
+public class SVGUtils {
+
+  private static final String UTF8 = "UTF-8";
+
+  private static final Logger LOG = LoggerFactory.getLogger(SVGUtils.class);
+
+  /** Spurious namespace declarations emitted by the marshaller; stripped from the output. */
+  private static final String SPURIOUS_NAMESPACES =
+      " xmlns:ns3=\"http://www.w3.org/2000/svg\" xmlns=\"\"";
+
+  private final ObjectFactory objectFactory;
+  private final Svg svg;
+  private final QName titleName = new QName("title");
+
+  private static final int SCREEN_WIDTH = 1800;
+
+  private final DagInfo dagInfo;
+
+  //Gap between various components
+  private static final int DAG_GAP = 70;
+  private static final int VERTEX_GAP = 50;
+  private static final int TASK_GAP = 5;
+  private static final int STROKE_WIDTH = 5;
+
+  // Horizontal scale (DAG finish-time interval), set by drawDAG. Kept per instance rather than in
+  // a shared static, and never below 1 so that scaleDown cannot divide by zero.
+  private int maxDagRuntime = 1;
+
+  //To compute the size of the graph.
+  private long MIN_X = Long.MAX_VALUE;
+  private long MAX_X = Long.MIN_VALUE;
+
+  // Drawing cursor, advanced while laying out the DAG, its vertices and their attempts.
+  private int x1 = 0;
+  private int y1 = 0;
+  private int y2 = 0;
+
+  public SVGUtils(DagInfo dagInfo) {
+    this.dagInfo = dagInfo;
+    this.objectFactory = new ObjectFactory();
+    this.svg = objectFactory.createSvg();
+  }
+
+  /** Creates a line whose x coordinates are scaled by {@link #scaleDown(int)}. */
+  private Line createLine(int x1, int y1, int x2, int y2) {
+    Line line = objectFactory.createLine();
+    line.setX1(scaleDown(x1) + "");
+    line.setY1(y1 + "");
+    line.setX2(scaleDown(x2) + "");
+    line.setY2(y2 + "");
+    return line;
+  }
+
+  private Title createTitle(String msg) {
+    Title t = objectFactory.createTitle();
+    t.setContent(msg);
+    return t;
+  }
+
+  private Title createTitleForVertex(VertexInfo vertex) {
+    String titleStr = vertex.getVertexName() + ":"
+        + (vertex.getFinishTimeInterval())
+        + " ms, RelativeTimeToDAG:"
+        + (vertex.getInitTime() - this.dagInfo.getStartTime())
+        + " ms, counters:" + vertex.getTezCounters();
+    return createTitle(titleStr);
+  }
+
+  private Title createTitleForTaskAttempt(TaskAttemptInfo taskAttemptInfo) {
+    String titleStr = "RelativeTimeToVertex:"
+        + (taskAttemptInfo.getStartTime() -
+        taskAttemptInfo.getTaskInfo().getVertexInfo().getInitTime()) +
+        " ms, " + taskAttemptInfo.toString() + ", counters:" + taskAttemptInfo.getTezCounters();
+    return createTitle(titleStr);
+  }
+
+  /** Attaches {@code title} as a title element of {@code line} and adds the line to the SVG. */
+  private void addLine(Line line, Title title) {
+    line.getSVGDescriptionClass().add(new JAXBElement<Title>(titleName, Title.class, title));
+    svg.getSVGDescriptionClassOrSVGAnimationClassOrSVGStructureClass().add(line);
+  }
+
+  /**
+   * Draws the DAG bar, then every vertex and task attempt below it.
+   *
+   * @param dagInfo DAG to draw
+   */
+  private void drawDAG(DagInfo dagInfo) {
+    Title title = createTitle(dagInfo.getDagId() + " : " + dagInfo.getTimeTaken() + " ms");
+    int duration = (int) dagInfo.getFinishTimeInterval();
+    maxDagRuntime = Math.max(1, duration);
+    MIN_X = Math.min(dagInfo.getStartTimeInterval(), MIN_X);
+    MAX_X = Math.max(dagInfo.getFinishTimeInterval(), MAX_X);
+    Line line = createLine(x1, y1, x1 + duration, y2);
+    line.setStyle("stroke: black; stroke-width:20");
+    line.setOpacity("0.3");
+    addLine(line, title);
+    drawVertex();
+  }
+
+  /**
+   * Returns the DAG's vertices ordered by the start of their first task. Sorting a list (instead
+   * of inserting into a comparator-based TreeSet) keeps vertices whose first tasks start at the
+   * same instant, which a TreeSet would discard as duplicates.
+   */
+  private Collection<VertexInfo> getSortedVertices() {
+    List<VertexInfo> vertices = new ArrayList<VertexInfo>(this.dagInfo.getVertices());
+    Collections.sort(vertices, new Comparator<VertexInfo>() {
+      @Override
+      public int compare(VertexInfo o1, VertexInfo o2) {
+        return compareLongs(o1.getFirstTaskStartTimeInterval(),
+            o2.getFirstTaskStartTimeInterval());
+      }
+    });
+    return vertices;
+  }
+
+  /**
+   * Returns the vertex's tasks ordered by the start of their successful attempt; tasks without
+   * one sort last. Uses a list sort for the same reason as {@link #getSortedVertices()}.
+   */
+  private Collection<TaskInfo> getSortedTasks(VertexInfo vertexInfo) {
+    List<TaskInfo> tasks = new ArrayList<TaskInfo>(vertexInfo.getTasks());
+    Collections.sort(tasks, new Comparator<TaskInfo>() {
+      @Override
+      public int compare(TaskInfo o1, TaskInfo o2) {
+        return compareLongs(successfulAttemptStart(o1), successfulAttemptStart(o2));
+      }
+    });
+    return tasks;
+  }
+
+  /**
+   * Start interval of the task's successful attempt, or {@code Long.MAX_VALUE} if it has none.
+   * NOTE(review): assumes getSuccessfulTaskAttempt() returns null when no attempt succeeded
+   * (a plain dereference would throw NPE in that case) - confirm against TaskInfo.
+   */
+  private static long successfulAttemptStart(TaskInfo task) {
+    TaskAttemptInfo attempt = task.getSuccessfulTaskAttempt();
+    return (attempt == null) ? Long.MAX_VALUE : attempt.getStartTimeInterval();
+  }
+
+  /** Overflow-safe three-way comparison (unlike casting a long difference to int). */
+  private static int compareLongs(long a, long b) {
+    return (a < b) ? -1 : ((a == b) ? 0 : 1);
+  }
+
+  /**
+   * Draws one bar per vertex, each followed by the bars of its task attempts.
+   */
+  public void drawVertex() {
+    for (VertexInfo vertex : getSortedVertices()) {
+      // Vertex bar starts at the vertex start-time interval and spans its time taken.
+      x1 = (int) vertex.getStartTimeInterval();
+      y1 += VERTEX_GAP;
+      int duration = ((int) (vertex.getTimeTaken()));
+      Line line = createLine(x1, y1, x1 + duration, y1);
+      line.setStyle("stroke: red; stroke-width:" + STROKE_WIDTH);
+      line.setOpacity("0.3");
+      addLine(line, createTitleForVertex(vertex));
+      // For each vertex, draw the tasks
+      drawTask(vertex);
+    }
+    x1 = x1 + (int) dagInfo.getFinishTimeInterval();
+    y1 = y1 + DAG_GAP;
+    y2 = y1;
+  }
+
+  /**
+   * Draws one bar per attempt of every task of {@code vertex}: green if SUCCEEDED, red otherwise.
+   *
+   * @param vertex vertex whose task attempts are drawn
+   */
+  public void drawTask(VertexInfo vertex) {
+    for (TaskInfo task : getSortedTasks(vertex)) {
+      for (TaskAttemptInfo taskAttemptInfo : task.getTaskAttempts()) {
+        x1 = (int) taskAttemptInfo.getStartTimeInterval();
+        y1 += TASK_GAP;
+        int duration = (int) taskAttemptInfo.getTimeTaken();
+        Line line = createLine(x1, y1, x1 + duration, y1);
+        String color =
+            taskAttemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.name())
+                ? "green" : "red";
+        line.setStyle("stroke: " + color + "; stroke-width:" + STROKE_WIDTH);
+        addLine(line, createTitleForTaskAttempt(taskAttemptInfo));
+      }
+    }
+  }
+
+  /**
+   * Draws the DAG and writes it as an SVG document to {@code fileName}.
+   *
+   * @param fileName path of the SVG file to create
+   * @throws java.io.IOException if the temporary or the output file cannot be written
+   * @throws javax.xml.bind.JAXBException if the SVG document cannot be marshalled
+   */
+  public void saveAsSVG(String fileName) throws IOException, JAXBException {
+    drawDAG(dagInfo);
+    svg.setHeight("" + y2);
+    // NOTE(review): width uses unscaled time intervals while createLine scales x coordinates to
+    // SCREEN_WIDTH - confirm this is intended.
+    svg.setWidth("" + (MAX_X - MIN_X));
+    File file = File.createTempFile("dag-", ".svg");
+    try {
+      JAXBContext jaxbContext = JAXBContext.newInstance(Svg.class);
+      Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
+      jaxbMarshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+      jaxbMarshaller.marshal(svg, file);
+      //TODO: dirty workaround to get rid of XMLRootException issue
+      copyWithoutSpuriousNamespaces(file, fileName);
+    } finally {
+      // Also runs when marshalling fails, so the temporary file is not left behind.
+      if (file.exists()) {
+        boolean deleted = file.delete();
+        LOG.debug("Deleted {}: {}", file.getAbsolutePath(), deleted);
+      }
+    }
+  }
+
+  /**
+   * Copies {@code source} to {@code fileName} line by line (UTF-8), removing the spurious
+   * namespace declarations the marshaller adds.
+   */
+  private static void copyWithoutSpuriousNamespaces(File source, String fileName)
+      throws IOException {
+    BufferedReader reader = null;
+    BufferedWriter writer = null;
+    try {
+      reader = new BufferedReader(new InputStreamReader(new FileInputStream(source), UTF8));
+      writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileName), UTF8));
+      String line;
+      // readLine() == null is the reliable end-of-stream signal; ready() is not.
+      while ((line = reader.readLine()) != null) {
+        writer.write(line.replace(SPURIOUS_NAMESPACES, ""));
+        writer.newLine();
+      }
+      // Close explicitly so flush/close failures surface instead of being swallowed below.
+      writer.close();
+    } finally {
+      IOUtils.closeQuietly(reader);
+      IOUtils.closeQuietly(writer);
+    }
+  }
+
+  /** Scales a time interval to horizontal SVG units (DAG runtime maps to SCREEN_WIDTH). */
+  private float scaleDown(int len) {
+    return (len * 1.0f / maxDagRuntime) * SCREEN_WIDTH;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/ecd90dc1/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
new file mode 100644
index 0000000..8bcf265
--- /dev/null
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.analyzer.utils;
+
+import com.sun.istack.Nullable;
+import org.apache.tez.dag.utils.Graph;
+import org.apache.tez.history.parser.datamodel.AdditionalInputOutputDetails;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers for presenting DAG information: shortening class names, sanitizing labels,
+ * and generating a DAG visualisation file through {@link Graph}.
+ */
+public class Utils {
+
+  /**
+   * Matches runs of non-word characters (anything outside {@code [a-zA-Z0-9_]}, which already
+   * covers ':' and '-'). Each run is collapsed to '_' so labels can be used as node ids.
+   */
+  private static final Pattern SANITIZE_LABEL_PATTERN = Pattern.compile("\\W+");
+
+  /**
+   * Returns the part of a fully-qualified class name after the last '.'.
+   *
+   * @param className class name, may be {@code null}
+   * @return the text after the last '.', or {@code className} unchanged when it is
+   *     {@code null}, contains no '.', or ends with '.'
+   */
+  public static String getShortClassName(String className) {
+    if (className == null) {
+      // Callers pass optional attributes (e.g. initializer/committer class names) that may be
+      // absent. Return null so they are rendered as "null" instead of failing with an NPE.
+      return null;
+    }
+    int pos = className.lastIndexOf('.');
+    if (pos != -1 && pos < className.length() - 1) {
+      return className.substring(pos + 1);
+    }
+    return className;
+  }
+
+  /**
+   * Replaces every run of non-word characters in {@code label} with a single '_'.
+   *
+   * @param label text to sanitize; must not be {@code null}
+   * @return the sanitized label
+   */
+  public static String sanitizeLabelForViz(String label) {
+    Matcher m = SANITIZE_LABEL_PATTERN.matcher(label);
+    return m.replaceAll("_");
+  }
+
+  /**
+   * Builds a graph of {@code dagInfo} and writes it to {@code fileName} via
+   * {@link Graph#save(String)}.
+   *
+   * <p>The graph contains:
+   * <ul>
+   *   <li>one node per vertex, labelled with its short processor class, task count and time
+   *       taken;</li>
+   *   <li>one box-shaped node per additional input/output, linked to its vertex;</li>
+   *   <li>one edge per DAG edge, labelled with its short source/destination classes and data
+   *       movement type.</li>
+   * </ul>
+   * Vertices named in {@code criticalVertices} are coloured red.
+   *
+   * @param dagInfo parsed DAG to visualise
+   * @param fileName output file path
+   * @param criticalVertices names of vertices to highlight, or {@code null} for none
+   * @throws IOException if {@link Graph#save(String)} fails
+   */
+  // NOTE(review): @Nullable comes from com.sun.istack, a JDK-internal package; consider a
+  // portable nullability annotation.
+  public static void generateDAGVizFile(DagInfo dagInfo, String fileName,
+      @Nullable List<String> criticalVertices) throws IOException {
+    Graph graph = new Graph(sanitizeLabelForViz(dagInfo.getName()));
+
+    for (VertexInfo v : dagInfo.getVertices()) {
+      String vertexId = sanitizeLabelForViz(v.getVertexName());
+      // Only the processor class name is shortened; task/time details are appended after it.
+      String nodeLabel = vertexId
+          + "[" + getShortClassName(v.getProcessorClassName())
+          + ", tasks=" + v.getTasks().size() + ", time=" + v.getTimeTaken() + " ms]";
+      Graph.Node n = graph.newNode(vertexId, nodeLabel);
+
+      if (criticalVertices != null && criticalVertices.contains(v.getVertexName())) {
+        n.setColor("red");
+      }
+
+      for (AdditionalInputOutputDetails input : v.getAdditionalInputInfoList()) {
+        Graph.Node inputNode = getAdditionalIONode(graph, vertexId, input.getName());
+        inputNode.addEdge(n, "Input name=" + input.getName()
+            + " [inputClass=" + getShortClassName(input.getClazz())
+            + ", initializer=" + getShortClassName(input.getInitializer()) + "]");
+      }
+      for (AdditionalInputOutputDetails output : v.getAdditionalOutputInfoList()) {
+        Graph.Node outputNode = getAdditionalIONode(graph, vertexId, output.getName());
+        n.addEdge(outputNode, "Output name=" + output.getName()
+            + " [outputClass=" + getShortClassName(output.getClazz())
+            + ", committer=" + getShortClassName(output.getInitializer()) + "]");
+      }
+    }
+
+    for (EdgeInfo e : dagInfo.getEdges()) {
+      Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName()));
+      n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())),
+          "[input=" + getShortClassName(e.getEdgeSourceClass())
+              + ", output=" + getShortClassName(e.getEdgeDestinationClass())
+              + ", dataMovement=" + e.getDataMovementType().trim() + "]");
+    }
+
+    graph.save(fileName);
+  }
+
+  /**
+   * Fetches the node with id {@code vertexId + "_" + sanitized(ioName)} via
+   * {@link Graph#getNode(String)}, labels it {@code vertexId[sanitized(ioName)]}, and makes it
+   * box-shaped.
+   *
+   * @param graph graph being built
+   * @param vertexId already-sanitized name of the owning vertex
+   * @param ioName raw name of the additional input or output
+   * @return the configured node
+   */
+  private static Graph.Node getAdditionalIONode(Graph graph, String vertexId, String ioName) {
+    String ioId = sanitizeLabelForViz(ioName);
+    Graph.Node node = graph.getNode(vertexId + "_" + ioId);
+    node.setLabel(vertexId + "[" + ioId + "]");
+    node.setShape("box");
+    return node;
+  }
+}


Mime
View raw message