tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prak...@apache.org
Subject [1/2] tez git commit: TEZ-2813. Tez UI: add counter data for rest api calls to AM Web Services v2 (sree via pramachandran)
Date Mon, 14 Sep 2015 17:31:14 GMT
Repository: tez
Updated Branches:
  refs/heads/master b6f15dcdc -> b3ad59578


TEZ-2813. Tez UI: add counter data for rest api calls to AM Web Services v2 (sree via pramachandran)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1e58a0e4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1e58a0e4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1e58a0e4

Branch: refs/heads/master
Commit: 1e58a0e418bfd93be1b789f288299a02a6b4c0dd
Parents: b6f15dc
Author: Prakash Ramachandran <pramachandran@hortonworks.com>
Authored: Mon Sep 14 22:57:21 2015 +0530
Committer: Prakash Ramachandran <pramachandran@hortonworks.com>
Committed: Mon Sep 14 22:57:21 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   7 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  32 ++-
 .../apache/tez/dag/app/web/AMWebController.java | 228 +++++++++++++++++--
 .../apache/tez/dag/app/web/WebUIService.java    |   4 +
 .../tez/dag/app/web/TestAMWebController.java    | 191 +++++++++++++++-
 6 files changed, 439 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60010d0..bb023b1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -176,6 +176,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2813. Tez UI: add counter data for rest api calls to AM Web Services v2
   TEZ-2660. Tez UI: need to show application page even if system metrics publish is disabled.
   TEZ-2787. Tez AM should have java.io.tmpdir=./tmp to be consistent with tasks
   TEZ-2780. Tez UI: Update All Tasks page while in progress

http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 552da11..96301a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -80,6 +80,13 @@ public interface Vertex extends Comparable<Vertex> {
    */
   TezCounters getAllCounters();
 
+  /**
+   * Get all the counters of this vertex.
+   * @return aggregate task-counters
+   */
+  TezCounters getCachedCounters();
+
+
   Map<TezTaskID, Task> getTasks();
   Task getTask(TezTaskID taskID);
   Task getTask(int taskIndex);

http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3dae42b..2e13090 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -228,6 +228,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
   volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
+  private TezCounters cachedCounters = null;
+  private long cachedCountersTimestamp = 0;
   private Resource taskResource;
 
   private Configuration vertexConf;
@@ -1190,6 +1192,34 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
     }
   }
 
+  @Override
+  public TezCounters getCachedCounters() {
+    readLock.lock();
+
+    try {
+      // FIXME a better lightweight approach for counters is needed
+      if (fullCounters == null && cachedCounters != null
+          && ((cachedCountersTimestamp+10000) > System.currentTimeMillis())) {
+        LOG.info("Asked for counters"
+            + ", cachedCountersTimestamp=" + cachedCountersTimestamp
+            + ", currentTime=" + System.currentTimeMillis());
+        return cachedCounters;
+      }
+
+      cachedCountersTimestamp = System.currentTimeMillis();
+      if (inTerminalState()) {
+        this.mayBeConstructFinalFullCounters();
+        return fullCounters;
+      }
+
+      TezCounters counters = new TezCounters();
+      cachedCounters = incrTaskCounters(counters, tasks.values());
+      return cachedCounters;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   public VertexStats getVertexStats() {
 
     readLock.lock();
@@ -2843,7 +2873,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
EventHandl
   
   private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG dag, 
       List<TezTaskAttemptID> taIds) {
-    List<TaskAttemptIdentifier> attempts = new ArrayList<>(taIds.size());
+    List<TaskAttemptIdentifier> attempts = new ArrayList<TaskAttemptIdentifier>(taIds.size());
     String dagName = dag.getName();
     for (TezTaskAttemptID taId : taIds) {
       String vertexName = dag.getVertex(taId.getTaskID().getVertexID()).getName();

http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index 06f282c..cede341 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -25,18 +25,29 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.TreeMap;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
+
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -343,7 +354,7 @@ public class AMWebController extends Controller {
   Collection<Integer> getVertexIDsFromRequest() {
     final String valueStr = $(WebUIService.VERTEX_ID).trim();
 
-    List<Integer> vertexIDs = new ArrayList<>();
+    List<Integer> vertexIDs = new ArrayList<Integer>();
     if (!valueStr.equals("")) {
       String[] vertexIdsStr = valueStr.split(",", MAX_QUERIED);
 
@@ -362,8 +373,46 @@ public class AMWebController extends Controller {
     return vertexIDs;
   }
 
+  /**
+   * Parse a params list in the format: CtrGroup/CtrName1,CtrName2;CtrGroup2;
+   * @return nested structure of counter groups and names. Null if nothing specified.
+   */
+  Map<String, Set<String>> getCounterListFromRequest() {
+    final String counterStr = $(WebUIService.COUNTERS).trim();
+    if (counterStr == null || counterStr.isEmpty()) {
+      return null;
+    }
+
+    String delimiter = ";";
+    String groupDelimiter = "/";
+    String counterDelimiter = ",";
+
+    StringTokenizer tokenizer = new StringTokenizer(counterStr, delimiter);
+
+    Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+    while (tokenizer.hasMoreElements()) {
+      String token = tokenizer.nextToken().trim();
+      int pos = token.indexOf(groupDelimiter);
+      if (pos == -1) {
+        counterList.put(token, Collections.<String>emptySet());
+        continue;
+      }
+      String counterGroup = token.substring(0, pos);
+      Set<String> counters = Collections.<String>emptySet();
+      if (pos < token.length() - 1) {
+        String counterNames = token.substring(pos+1, token.length());
+        counters = Sets.newHashSet(
+            Splitter.on(counterDelimiter).omitEmptyStrings()
+                .trimResults().split(counterNames));
+      }
+      counterList.put(counterGroup, counters);
+    }
+    return counterList;
+  }
+
+
   List<String> splitString(String str, String delimiter, Integer limit) {
-    List<String> items = new ArrayList<>();
+    List<String> items = new ArrayList<String>();
 
     StringTokenizer tokenizer = new StringTokenizer(str, delimiter);
     for(int count = 0; tokenizer.hasMoreElements() && count < limit; count ++)
{
@@ -386,7 +435,7 @@ public class AMWebController extends Controller {
   List<Integer> getIntegersFromRequest(String paramName, Integer limit) {
     String valuesStr = $(paramName).trim();
 
-    List<Integer> values = new ArrayList<>();
+    List<Integer> values = new ArrayList<Integer>();
     if (!valuesStr.equals("")) {
       try {
         for (String valueStr : splitString(valuesStr, ",", limit)) {
@@ -404,7 +453,7 @@ public class AMWebController extends Controller {
   }
 
   /**
-   * getIDsFromRequest
+   * getTaskIDsFromRequest
    * Takes in "1_0,1_3" and returns [[1,0],[1,3]]
    * Mainly to parse a query parameter with comma separated indexes. For vertex its the index,
    * for task its vertexIndex_taskIndex and for attempts its vertexIndex_taskIndex_attemptNo
@@ -415,16 +464,16 @@ public class AMWebController extends Controller {
    *
    * @return {List<List<Integer>>} List of parsed values
    */
-  List<List<Integer>> getIDsFromRequest(String paramName, Integer limit) {
+  List<List<Integer>> getIDsFromRequest(String paramName, Integer limit, Integer
count) {
     String valuesStr = $(paramName).trim();
 
-    List<List<Integer>> values = new ArrayList<>();
+    List<List<Integer>> values = new ArrayList<List<Integer>>();
     if (!valuesStr.equals("")) {
       try {
         for (String valueStr : splitString(valuesStr, ",", limit)) {
-          List<Integer> innerValues = new ArrayList<>();
+          List<Integer> innerValues = new ArrayList<Integer>();
           String innerValueStrs[] = valueStr.split("_");
-          if(innerValueStrs.length == 2) {
+          if(innerValueStrs.length == count) {
             for (String innerValueStr : innerValueStrs) {
               int value = Integer.parseInt(innerValueStr);
               innerValues.add(value);
@@ -452,7 +501,7 @@ public class AMWebController extends Controller {
       return;
     }
 
-    Map<String, String> dagInfo = new HashMap<>();
+    Map<String, String> dagInfo = new HashMap<String, String>();
     dagInfo.put("id", dag.getID().toString());
     dagInfo.put("progress", Float.toString(dag.getProgress()));
     dagInfo.put("status", dag.getState().toString());
@@ -462,8 +511,33 @@ public class AMWebController extends Controller {
     ));
   }
 
-  private Map<String,String> getVertexInfoMap(Vertex vertex) {
-    Map<String, String> vertexInfo = new HashMap<>();
+  Map<String, Map<String, Long>> constructCounterMapInfo(TezCounters counters,
+      Map<String, Set<String>> counterNames) {
+    if (counterNames == null || counterNames.isEmpty()) {
+      return null;
+    }
+    LOG.info("Requested counter names=" + counterNames.entrySet());
+    LOG.info("actual counters=" + counters);
+
+    Map<String, Map<String, Long>> counterInfo = new TreeMap<String, Map<String,
Long>>();
+
+    for (Entry<String, Set<String>> entry : counterNames.entrySet()) {
+      Map<String, Long> matchedCounters = new HashMap<String, Long>();
+      CounterGroup grpCounters = counters.getGroup(entry.getKey());
+      for (TezCounter counter : grpCounters) {
+        if (entry.getValue().isEmpty() || entry.getValue().contains(counter.getName())) {
+          matchedCounters.put(counter.getName(), counter.getValue());
+        }
+      }
+      counterInfo.put(entry.getKey(), matchedCounters);
+    }
+
+    return counterInfo;
+  }
+
+  private Map<String, Object> getVertexInfoMap(Vertex vertex,
+                                               Map<String, Set<String>> counterNames)
{
+    Map<String, Object> vertexInfo = new HashMap<String, Object>();
     vertexInfo.put("id", vertex.getVertexId().toString());
     vertexInfo.put("status", vertex.getState().toString());
     vertexInfo.put("progress", Float.toString(vertex.getProgress()));
@@ -473,8 +547,18 @@ public class AMWebController extends Controller {
     vertexInfo.put("runningTasks", Integer.toString(vertexProgress.getRunningTaskCount()));
     vertexInfo.put("succeededTasks", Integer.toString(vertexProgress.getSucceededTaskCount()));
 
-    vertexInfo.put("failedTaskAttempts", Integer.toString(vertexProgress.getFailedTaskAttemptCount()));
-    vertexInfo.put("killedTaskAttempts", Integer.toString(vertexProgress.getKilledTaskAttemptCount()));
+    vertexInfo.put("failedTaskAttempts",
+        Integer.toString(vertexProgress.getFailedTaskAttemptCount()));
+    vertexInfo.put("killedTaskAttempts",
+        Integer.toString(vertexProgress.getKilledTaskAttemptCount()));
+
+    if (counterNames != null && !counterNames.isEmpty()) {
+      TezCounters counters = vertex.getCachedCounters();
+      Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
+      if (counterMap != null && !counterMap.isEmpty()) {
+        vertexInfo.put("counters", counterMap);
+      }
+    }
 
     return vertexInfo;
   }
@@ -495,6 +579,8 @@ public class AMWebController extends Controller {
       return;
     }
 
+    Map<String, Set<String>> counterNames = getCounterListFromRequest();
+
     Collection<Vertex> vertexList;
     if (requestedIDs.isEmpty()) {
       // no ids specified return all.
@@ -503,9 +589,9 @@ public class AMWebController extends Controller {
       vertexList = getVerticesByIdx(dag, requestedIDs);
     }
 
-    ArrayList<Map<String, String>> verticesInfo = new ArrayList<>();
+    ArrayList<Map<String, Object>> verticesInfo = new ArrayList<Map<String,
Object>>();
     for(Vertex v : vertexList) {
-      verticesInfo.add(getVertexInfoMap(v));
+      verticesInfo.add(getVertexInfoMap(v, counterNames));
     }
 
     renderJSON(ImmutableMap.of(
@@ -528,9 +614,9 @@ public class AMWebController extends Controller {
    * @param limit {Integer}
    */
   List<Task> getRequestedTasks(DAG dag, Integer limit) {
-    List<Task> tasks = new ArrayList<>();
+    List<Task> tasks = new ArrayList<Task>();
 
-    List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit);
+    List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit,
2);
     if(taskIDs == null) {
       return null;
     }
@@ -564,7 +650,7 @@ public class AMWebController extends Controller {
           if(vertex == null) {
             continue;
           }
-          List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values());
+          List<Task> vertexTasks = new ArrayList<Task>(vertex.getTasks().values());
           tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size())));
 
           if(tasks.size() >= limit) {
@@ -575,7 +661,7 @@ public class AMWebController extends Controller {
       else {
         Collection<Vertex> vertices = dag.getVertices().values();
         for (Vertex vertex : vertices) {
-          List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values());
+          List<Task> vertexTasks = new ArrayList<Task>(vertex.getTasks().values());
           tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size())));
 
           if(tasks.size() >= limit) {
@@ -614,12 +700,20 @@ public class AMWebController extends Controller {
       return;
     }
 
-    ArrayList<Map<String, String>> tasksInfo = new ArrayList<>();
+    Map<String, Set<String>> counterNames = getCounterListFromRequest();
+
+    ArrayList<Map<String, Object>> tasksInfo = new ArrayList<Map<String,
Object>>();
     for(Task t : tasks) {
-      Map<String, String> taskInfo = new HashMap<>();
+      Map<String, Object> taskInfo = new HashMap<String, Object>();
       taskInfo.put("id", t.getTaskId().toString());
       taskInfo.put("progress", Float.toString(t.getProgress()));
       taskInfo.put("status", t.getState().toString());
+
+      TezCounters counters = t.getCounters();
+      Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
+      if (counterMap != null && !counterMap.isEmpty()) {
+        taskInfo.put("counters", counterMap);
+      }
       tasksInfo.add(taskInfo);
     }
 
@@ -628,6 +722,96 @@ public class AMWebController extends Controller {
     ));
   }
 
+  /**
+   * getRequestedAttempts
+   * Given a dag and a limit, based on the incoming query parameters. Used by getAttemptsInfo
+   * returns a list of task instances
+   *
+   * @param dag {DAG}
+   * @param limit {Integer}
+   */
+  List<TaskAttempt> getRequestedAttempts(DAG dag, Integer limit) {
+    List<TaskAttempt> attempts = new ArrayList<TaskAttempt>();
+
+    List<List<Integer>> attemptIDs = getIDsFromRequest(WebUIService.ATTEMPT_ID,
limit, 3);
+    if(attemptIDs == null) {
+      return null;
+    }
+    else if(!attemptIDs.isEmpty()) {
+      for (List<Integer> indexes : attemptIDs) {
+        Vertex vertex = getVertexFromIndex(dag, indexes.get(0));
+        if(vertex == null) {
+          continue;
+        }
+        Task task = vertex.getTask(indexes.get(1));
+        if(task == null) {
+          continue;
+        }
+
+        TaskAttempt attempt = task.
+            getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), indexes.get(2)));
+        if(attempt == null) {
+          continue;
+        }
+        else {
+          attempts.add(attempt);
+        }
+
+        if(attempts.size() >= limit) {
+          break;
+        }
+      }
+    }
+
+    return attempts;
+  }
+
+  /**
+   * Renders the response JSON for attemptsInfo API
+   * The JSON will have an array of attempt objects under the key attempts.
+   */
+  public void getAttemptsInfo() {
+    if (!setupResponse()) {
+      return;
+    }
+
+    DAG dag = checkAndGetDAGFromRequest();
+    if (dag == null) {
+      return;
+    }
+
+    int limit = MAX_QUERIED;
+    try {
+      limit = getQueryParamInt(WebUIService.LIMIT);
+    } catch (NumberFormatException e) {
+      //Ignore
+    }
+
+    List<TaskAttempt> attempts = getRequestedAttempts(dag, limit);
+    if(attempts == null) {
+      return;
+    }
+
+    Map<String, Set<String>> counterNames = getCounterListFromRequest();
+
+    ArrayList<Map<String, Object>> attemptsInfo = new ArrayList<Map<String,
Object>>();
+    for(TaskAttempt a : attempts) {
+      Map<String, Object> attemptInfo = new HashMap<String, Object>();
+      attemptInfo.put("id", a.getID().toString());
+      attemptInfo.put("progress", Float.toString(a.getProgress()));
+      attemptInfo.put("status", a.getState().toString());
+
+      TezCounters counters = a.getCounters();
+      Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
+      if (counterMap != null && !counterMap.isEmpty()) {
+        attemptInfo.put("counters", counterMap);
+      }
+      attemptsInfo.add(attemptInfo);
+    }
+
+    renderJSON(ImmutableMap.of("attempts", attemptsInfo));
+  }
+
   @Override
   @VisibleForTesting
   public void renderJSON(Object object) {

http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
index 32b57e9..a894d25 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
@@ -41,6 +41,8 @@ public class WebUIService extends AbstractService {
   public static final String VERTEX_ID = "vertexID";
   public static final String DAG_ID = "dagID";
   public static final String TASK_ID = "taskID";
+  public static final String ATTEMPT_ID = "attemptID";
+  public static final String COUNTERS = "counters";
 
   public static final String LIMIT = "limit";
 
@@ -202,6 +204,8 @@ public class WebUIService extends AbstractService {
       route(WS_PREFIX_V2 + pajoin("verticesInfo", VERTEX_ID, DAG_ID), AMWebController.class,
"getVerticesInfo");
       route(WS_PREFIX_V2 + pajoin("tasksInfo", TASK_ID, VERTEX_ID, DAG_ID), AMWebController.class,
           "getTasksInfo");
+      route(WS_PREFIX_V2 + pajoin("attemptsInfo", ATTEMPT_ID, DAG_ID), AMWebController.class,
+          "getAttemptsInfo");
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
index cba3c3e..56a4a82 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
@@ -38,6 +38,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -45,17 +47,21 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.ACLManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Assert;
@@ -321,6 +327,9 @@ public class TestAMWebController {
     doReturn(true).when(spy).setupResponse();
     doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
 
+    Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+    doReturn(counterList).when(spy).getCounterListFromRequest();
+
     List<Integer> requested;
     if (numVerticesRequested == 0) {
       requested = ImmutableList.of();
@@ -352,6 +361,21 @@ public class TestAMWebController {
     doReturn(progress).when(mockVertex).getProgress();
     doReturn(pb).when(mockVertex).getVertexProgress();
 
+    TezCounters counters = new TezCounters();
+    counters.addGroup("g1", "g1");
+    counters.addGroup("g2", "g2");
+    counters.addGroup("g3", "g3");
+    counters.addGroup("g4", "g4");
+    counters.findCounter("g1", "g1_c1").setValue(100);
+    counters.findCounter("g1", "g1_c2").setValue(100);
+    counters.findCounter("g2", "g2_c3").setValue(100);
+    counters.findCounter("g2", "g2_c4").setValue(100);
+    counters.findCounter("g3", "g3_c5").setValue(100);
+    counters.findCounter("g3", "g3_c6").setValue(100);
+
+    doReturn(counters).when(mockVertex).getAllCounters();
+    doReturn(counters).when(mockVertex).getCachedCounters();
+
     return mockVertex;
   }
 
@@ -557,11 +581,14 @@ public class TestAMWebController {
     // Set mock query params
     doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT);
     doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID, limit);
-    doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit);
+    doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit, 2);
 
     // Set function mocks
     doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
 
+    Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+    doReturn(counterList).when(spy).getCounterListFromRequest();
+
     spy.getTasksInfo();
     verify(spy).renderJSON(returnResultCaptor.capture());
 
@@ -589,6 +616,20 @@ public class TestAMWebController {
     doReturn(status).when(mockTask).getState();
     doReturn(progress).when(mockTask).getProgress();
 
+    TezCounters counters = new TezCounters();
+    counters.addGroup("g1", "g1");
+    counters.addGroup("g2", "g2");
+    counters.addGroup("g3", "g3");
+    counters.addGroup("g4", "g4");
+    counters.findCounter("g1", "g1_c1").setValue(101);
+    counters.findCounter("g1", "g1_c2").setValue(102);
+    counters.findCounter("g2", "g2_c3").setValue(103);
+    counters.findCounter("g2", "g2_c4").setValue(104);
+    counters.findCounter("g3", "g3_c5").setValue(105);
+    counters.findCounter("g3", "g3_c6").setValue(106);
+
+    doReturn(counters).when(mockTask).getCounters();
+
     return mockTask;
   }
 
@@ -598,4 +639,152 @@ public class TestAMWebController {
     Assert.assertEquals(mockTask.getState().toString(), taskResult.get("status"));
     Assert.assertEquals(Float.toString(mockTask.getProgress()), taskResult.get("progress"));
   }
+
+  //-- Get Attempts Info Tests -----------------------------------------------------------------------
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testGetAttemptsInfoWithIds() {
+    List <TaskAttempt> attempts = createMockAttempts();
+    List <Integer> vertexMinIds = Arrays.asList();
+    List <Integer> taskMinIds = Arrays.asList();
+    List <List <Integer>> attemptMinIds = Arrays.asList(Arrays.asList(0, 0, 0),
+        Arrays.asList(0, 0, 1),
+        Arrays.asList(0, 0, 2),
+        Arrays.asList(0, 0, 3));
+
+    // Fetch All
+    Map<String, Object> result = getAttemptsTestHelper(attempts, attemptMinIds, vertexMinIds,
+        taskMinIds, AMWebController.MAX_QUERIED);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("attempts"));
+
+    ArrayList<Map<String, String>> attemptsInfo = (ArrayList<Map<String,
String>>) result.
+        get("attempts");
+    Assert.assertEquals(4, attemptsInfo.size());
+
+    verifySingleAttemptResult(attempts.get(0), attemptsInfo.get(0));
+    verifySingleAttemptResult(attempts.get(1), attemptsInfo.get(1));
+    verifySingleAttemptResult(attempts.get(2), attemptsInfo.get(2));
+    verifySingleAttemptResult(attempts.get(3), attemptsInfo.get(3));
+
+    // With limit
+    result = getAttemptsTestHelper(attempts, attemptMinIds, vertexMinIds, taskMinIds, 2);
+
+    Assert.assertEquals(1, result.size());
+    Assert.assertTrue(result.containsKey("attempts"));
+
+    attemptsInfo = (ArrayList<Map<String, String>>) result.get("attempts");
+    Assert.assertEquals(2, attemptsInfo.size());
+
+    verifySingleAttemptResult(attempts.get(0), attemptsInfo.get(0));
+    verifySingleAttemptResult(attempts.get(1), attemptsInfo.get(1));
+  }
+
+  Map<String, Object> getAttemptsTestHelper(List<TaskAttempt> attempts, List
<List <Integer>> attemptMinIds,
+                                         List<Integer> vertexMinIds, List<Integer>
taskMinIds, Integer limit) {
+    //Creating mock DAG
+    DAG mockDAG = mock(DAG.class);
+    doReturn(TezDAGID.fromString("dag_1441301219877_0109_1")).when(mockDAG).getID();
+
+    //Creating mock vertex and attaching to mock DAG
+    TezVertexID vertexID = TezVertexID.fromString("vertex_1441301219877_0109_1_00");
+    Vertex mockVertex = mock(Vertex.class);
+    doReturn(vertexID).when(mockVertex).getVertexId();
+
+    doReturn(mockVertex).when(mockDAG).getVertex(vertexID);
+    doReturn(ImmutableMap.of(
+        vertexID, mockVertex
+    )).when(mockDAG).getVertices();
+
+    //Creating mock task and attaching to mock Vertex
+    TezTaskID taskID = TezTaskID.fromString("task_1441301219877_0109_1_00_000000");
+    Task mockTask = mock(Task.class);
+    doReturn(taskID).when(mockTask).getTaskId();
+    int taskIndex = taskID.getId();
+    doReturn(mockTask).when(mockVertex).getTask(taskIndex);
+    doReturn(ImmutableMap.of(
+        taskID, mockTask
+    )).when(mockVertex).getTasks();
+
+    //Creating mock tasks and attaching to mock vertex
+    Map<TezTaskAttemptID, TaskAttempt> attemptsMap = Maps.newHashMap();
+    for(TaskAttempt attempt : attempts) {
+      TezTaskAttemptID attemptId = attempt.getID();
+      doReturn(attempt).when(mockTask).getAttempt(attemptId);
+      attemptsMap.put(attemptId, attempt);
+    }
+    doReturn(attemptsMap).when(mockTask).getAttempts();
+
+    //Creates & setup controller spy
+    AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext,
+        "TEST_HISTORY_URL");
+    AMWebController spy = spy(amWebController);
+    doReturn(true).when(spy).setupResponse();
+    doNothing().when(spy).renderJSON(any());
+
+    // Set mock query params
+    doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT);
+    doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID, limit);
+    doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit, 2);
+    doReturn(attemptMinIds).when(spy).getIDsFromRequest(WebUIService.ATTEMPT_ID, limit, 3);
+
+    // Set function mocks
+    doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
+
+    Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+    doReturn(counterList).when(spy).getCounterListFromRequest();
+
+    spy.getAttemptsInfo();
+    verify(spy).renderJSON(returnResultCaptor.capture());
+
+    return returnResultCaptor.getValue();
+  }
+
+  private List<TaskAttempt> createMockAttempts() {
+    TaskAttempt mockAttempt1 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_0",
TaskAttemptState.RUNNING,
+        0.33f);
+    TaskAttempt mockAttempt2 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_1",
TaskAttemptState.SUCCEEDED,
+        1.0f);
+    TaskAttempt mockAttempt3 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_2",
TaskAttemptState.FAILED,
+        .8f);
+    TaskAttempt mockAttempt4 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_3",
TaskAttemptState.SUCCEEDED,
+        .8f);
+
+    List <TaskAttempt> attempts = Arrays.asList(mockAttempt1, mockAttempt2, mockAttempt3,
mockAttempt4);
+    return attempts;
+  }
+
+  private TaskAttempt createMockAttempt(String attemptIDStr, TaskAttemptState status, float
progress) {
+    TaskAttempt mockAttempt = mock(TaskAttempt.class);
+
+    doReturn(TezTaskAttemptID.fromString(attemptIDStr)).when(mockAttempt).getID();
+    doReturn(status).when(mockAttempt).getState();
+    doReturn(progress).when(mockAttempt).getProgress();
+
+    TezCounters counters = new TezCounters();
+    counters.addGroup("g1", "g1");
+    counters.addGroup("g2", "g2");
+    counters.addGroup("g3", "g3");
+    counters.addGroup("g4", "g4");
+    counters.findCounter("g1", "g1_c1").setValue(101);
+    counters.findCounter("g1", "g1_c2").setValue(102);
+    counters.findCounter("g2", "g2_c3").setValue(103);
+    counters.findCounter("g2", "g2_c4").setValue(104);
+    counters.findCounter("g3", "g3_c5").setValue(105);
+    counters.findCounter("g3", "g3_c6").setValue(106);
+
+    doReturn(counters).when(mockAttempt).getCounters();
+
+    return mockAttempt;
+  }
+
+  private void verifySingleAttemptResult(TaskAttempt mockTask, Map<String, String>
taskResult) {
+    Assert.assertEquals(3, taskResult.size());
+    Assert.assertEquals(mockTask.getID().toString(), taskResult.get("id"));
+    Assert.assertEquals(mockTask.getState().toString(), taskResult.get("status"));
+    Assert.assertEquals(Float.toString(mockTask.getProgress()), taskResult.get("progress"));
+  }
+
 }


Mime
View raw message