tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-2968. Counter limits exception causes AM to crash. (hitesh)
Date Fri, 04 Dec 2015 20:04:50 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 0a1a29b15 -> ae3329f87


TEZ-2968. Counter limits exception causes AM to crash. (hitesh)

(cherry picked from commit 9109645e5b7b630601a016e62e39f20678c63dde)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
	tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
	tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
	tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ae3329f8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ae3329f8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ae3329f8

Branch: refs/heads/branch-0.7
Commit: ae3329f873a3293cae82610ec1aa8189152d614b
Parents: 0a1a29b
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri Dec 4 09:32:09 2015 -0800
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Fri Dec 4 11:52:55 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/VertexTerminationCause.java |  5 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 59 ++++++++++++++-----
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 14 +++++
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 30 +++++++++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 60 +++++++++++++++++---
 .../apache/tez/dag/app/web/AMWebController.java | 58 +++++++++++++------
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 39 +++++++++++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 47 +++++++++++++++
 9 files changed, 269 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80df681..4cc48b1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-2968. Counter limits exception causes AM to crash.
   TEZ-2960. Tez UI: Move hardcoded url namespace to the configuration file
   TEZ-2947. Tez UI: Timeline, RM & AM requests gets into a consecutive loop in counters page without any delay
   TEZ-2946. Tez UI: At times RM return a huge error message making the yellow error bar to fill the whole screen

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 28712ad..816f85a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -58,7 +58,10 @@ public enum VertexTerminationCause {
   INTERNAL_ERROR(VertexState.ERROR),
   
   /** error when writing recovery log */ 
-  RECOVERY_ERROR(VertexState.FAILED);
+  RECOVERY_ERROR(VertexState.FAILED),
+
+  /** This vertex failed due to counter limits exceeded. */
+  COUNTER_LIMITS_EXCEEDED(VertexState.FAILED);
 
   private VertexState finishedState;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0553a8d..e6841ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.state.OnStateChangedCallback;
 import org.apache.tez.state.StateMachineTez;
 import org.slf4j.Logger;
@@ -196,7 +197,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   private final Configuration dagConf;
   private final DAGPlan jobPlan;
-  
+
+  private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);
+
   Map<String, LocalResource> localResources;
   
   long startDAGCpuTime = 0;
@@ -1167,11 +1170,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       try {
          getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        addDiagnostic("Invalid event " + event.getType() +
-            " on Job " + this.dagId);
+        String message = "Invalid event " + event.getType() + " on Dag " + this.dagId
+            + " at currentState=" + oldState;
+        LOG.error("Can't handle " + message, e);
+        addDiagnostic(message);
         eventHandler.handle(new DAGEvent(this.dagId,
             DAGEventType.INTERNAL_ERROR));
+      } catch (RuntimeException e) {
+        String message = "Uncaught Exception when handling event " + event.getType()
+            + " on Dag " + this.dagId + " at currentState=" + oldState;
+        LOG.error(message, e);
+        addDiagnostic(message);
+        if (!internalErrorTriggered.getAndSet(true)) {
+          // to prevent a recursive loop
+          eventHandler.handle(new DAGEvent(this.dagId,
+              DAGEventType.INTERNAL_ERROR));
+        }
       }
       //notify the eventhandler of state change
       if (oldState != getInternalState()) {
@@ -1240,12 +1254,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         new DAGHistoryEvent(dagId, startEvt));
   }
 
-  void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
+  void logJobHistoryFinishedEvent(TezCounters counters) throws IOException {
     Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
-    DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
+    DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, clock.getTime(),
+        finishTime, DAGState.SUCCEEDED, "", counters,
+        this.userName, this.dagName, taskStats, this.appContext.getApplicationAttemptId());
+    this.appContext.getHistoryHandler().handleCriticalEvent(
+        new DAGHistoryEvent(dagId, finishEvt));
+  }
+
+  void logJobHistoryUnsuccesfulEvent(DAGState state, TezCounters counters) throws IOException
{
+    Map<String, Integer> taskStats = constructTaskStats(getDAGProgress());
+
+    DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, 0L,
         clock.getTime(), state,
         StringUtils.join(getDiagnostics(), LINE_SEPARATOR),
-        getAllCounters(), this.userName, this.dagName, taskStats,
+        counters, this.userName, this.dagName, taskStats,
         this.appContext.getApplicationAttemptId());
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(dagId, finishEvt));
@@ -1326,7 +1350,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
     } else {
       Preconditions.checkState(dag.getState() == DAGState.TERMINATING
-          || dag.getState() == DAGState.COMMITTING,
+              || dag.getState() == DAGState.COMMITTING,
           "DAG should be in COMMITTING/TERMINATING state, but in " + dag.getState());
       if (!dag.commitFutures.isEmpty() || dag.numCompletedVertices != dag.numVertices) {
         // pending commits are running or still some vertices are not completed
@@ -1366,12 +1390,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     // update cpu time counters before finishing the dag
     updateCpuCounters();
-    
+    TezCounters counters = null;
+    try {
+      counters = getAllCounters();
+    } catch (LimitExceededException e) {
+      addDiagnostic("Counters limit exceeded: " + e.getMessage());
+      finalState = DAGState.FAILED;
+    }
+
     try {
       if (finalState == DAGState.SUCCEEDED) {
-        logJobHistoryFinishedEvent();
+        logJobHistoryFinishedEvent(counters);
       } else {
-        logJobHistoryUnsuccesfulEvent(finalState);
+        logJobHistoryUnsuccesfulEvent(finalState, counters);
       }
     } catch (IOException e) {
       LOG.warn("Failed to persist recovery event for DAG completion"
@@ -1714,7 +1745,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             }
             DAGState endState = DAGState.FAILED;
             try {
-              dag.logJobHistoryUnsuccesfulEvent(endState);
+              dag.logJobHistoryUnsuccesfulEvent(endState, dag.getAllCounters());
             } catch (IOException e) {
               LOG.warn("Failed to persist recovery event for DAG completion"
                   + ", dagId=" + dag.dagId
@@ -2299,11 +2330,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       SingleArcTransition<DAGImpl, DAGEvent> {
     @Override
     public void transition(DAGImpl job, DAGEvent event) {
-      //TODO Is this JH event required.
       LOG.info(job.getID() + " terminating due to internal error");
       // terminate all vertices
-      job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
-          VertexTerminationCause.INTERNAL_ERROR);
+      job.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR);
       job.setFinishTime();
       job.cancelCommits();
       job.finished(DAGState.ERROR);

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5357063..b8f6459 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -788,6 +788,20 @@ public class TaskAttemptImpl implements TaskAttempt,
                 this.attemptId.getTaskID().getVertexID().getDAGId(),
                 DAGEventType.INTERNAL_ERROR)
             );
+      } catch (RuntimeException e) {
+        LOG.error("Uncaught exception when handling event " + event.getType()
+            + " at current state " + oldState + " for "
+            + this.attemptId, e);
+        eventHandler.handle(new DAGEventDiagnosticsUpdate(
+            this.attemptId.getTaskID().getVertexID().getDAGId(),
+            "Uncaught exception when handling event " + event.getType()
+                + " on TaskAttempt " + this.attemptId
+                + " at state " + oldState + ", error=" + e.getMessage()));
+        eventHandler.handle(
+            new DAGEvent(
+                this.attemptId.getTaskID().getVertexID().getDAGId(),
+                DAGEventType.INTERNAL_ERROR)
+        );
       }
       if (oldState != getInternalState()) {
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 46215d0..cc4f046 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -35,6 +35,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -900,9 +901,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state for "
-            + this.taskId, e);
+        LOG.error("Can't handle this event" + event.getType()
+            + " at current state " + oldState + " for task " + this.taskId, e);
         internalError(event.getType());
+      } catch (RuntimeException e) {
+        LOG.error("Uncaught exception when trying handle event " + event.getType()
+            + " at current state " + oldState + " for task " + this.taskId, e);
+        internalErrorUncaughtException(event.getType(), e);
       }
       if (oldState != getInternalState()) {
         if (LOG.isDebugEnabled()) {
@@ -926,6 +931,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         DAGEventType.INTERNAL_ERROR));
   }
 
+  protected void internalErrorUncaughtException(TaskEventType type, Exception e) {
+    eventHandler.handle(new DAGEventDiagnosticsUpdate(
+        this.taskId.getVertexID().getDAGId(), "Uncaught exception when handling  event "
+ type +
+        " on Task " + this.taskId + ", error=" + e.getMessage()));
+    eventHandler.handle(new DAGEvent(this.taskId.getVertexID().getDAGId(),
+        DAGEventType.INTERNAL_ERROR));
+  }
+
+
   private void sendTaskAttemptCompletionEvent(TezTaskAttemptID attemptId,
       TaskAttemptStateInternal attemptState) {
     eventHandler.handle(new VertexEventTaskAttemptCompleted(attemptId, attemptState));
@@ -1563,4 +1577,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       */
     }
   }
+
+  @Private
+  @VisibleForTesting
+  void setCounters(TezCounters counters) {
+    try {
+      writeLock.lock();
+      this.counters = counters;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1419b06..cc1f489 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
@@ -716,6 +717,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   final AtomicBoolean vmIsInitialized = new AtomicBoolean(false);
   final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false);
 
+  private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);
+
   @VisibleForTesting
   Map<Vertex, Edge> sourceVertices;
   private Map<Vertex, Edge> targetVertices;
@@ -1868,6 +1871,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         addDiagnostic(message);
         eventHandler.handle(new VertexEvent(this.vertexId,
             VertexEventType.V_INTERNAL_ERROR));
+      } catch (RuntimeException e) {
+        String message = "Uncaught Exception when handling event " + event.getType() +
+            " on vertex " + this.vertexName +
+            " with vertexId " + this.vertexId +
+            " at current state " + oldState;
+        LOG.error(message, e);
+        addDiagnostic(message);
+        if (!internalErrorTriggered.getAndSet(true)) {
+          eventHandler.handle(new VertexEvent(this.vertexId,
+              VertexEventType.V_INTERNAL_ERROR));
+        }
       }
 
       if (oldState != getInternalState()) {
@@ -1928,16 +1942,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   void logJobHistoryVertexFinishedEvent() throws IOException {
     this.setFinishTime();
-    logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "");
+    logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "",
+        getAllCounters());
   }
 
   void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
+    TezCounters counters = null;
+    try {
+      counters = getAllCounters();
+    } catch (LimitExceededException e) {
+      // Ignore as failed vertex
+      addDiagnostic("Counters limit exceeded: " + e.getMessage());
+    }
     logJobHistoryVertexCompletedHelper(state, clock.getTime(),
-        StringUtils.join(getDiagnostics(), LINE_SEPARATOR));
+        StringUtils.join(getDiagnostics(), LINE_SEPARATOR), counters);
   }
 
   private void logJobHistoryVertexCompletedHelper(VertexState finalState, long finishTime,
-                                                  String diagnostics) throws IOException
{
+                                                  String diagnostics, TezCounters counters)
throws IOException {
     Map<String, Integer> taskStats = new HashMap<String, Integer>();
     taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, completedTaskCount);
     taskStats.put(ATSConstants.NUM_SUCCEEDED_TASKS, succeededTaskCount);
@@ -1948,7 +1970,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
     VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks,
initTimeRequested,
         initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics,
-        getAllCounters(), getVertexStats(), taskStats);
+        counters, getVertexStats(), taskStats);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }
@@ -2083,7 +2105,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   }
 
   private static VertexState finishWithTerminationCause(VertexImpl vertex) {
-    Preconditions.checkArgument(vertex.getTerminationCause()!= null, "TerminationCause is
not set");
+    Preconditions.checkArgument(vertex.getTerminationCause() != null, "TerminationCause is
not set");
     String diagnosticMsg = "Vertex did not succeed due to " + vertex.getTerminationCause()
         + ", failedTasks:" + vertex.failedTaskCount
         + " killedTasks:" + vertex.killedTaskCount;
@@ -2156,9 +2178,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         break;
       case SUCCEEDED:
         try {
-          logJobHistoryVertexFinishedEvent();
-          eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
-              finalState));
+          try {
+            logJobHistoryVertexFinishedEvent();
+            eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+                finalState));
+          } catch (LimitExceededException e) {
+            LOG.error("Counter limits exceeded for vertex: " + getLogIdentifier(), e);
+            finalState = VertexState.FAILED;
+            addDiagnostic("Counters limit exceeded: " + e.getMessage());
+            trySetTerminationCause(VertexTerminationCause.COUNTER_LIMITS_EXCEEDED);
+            logJobHistoryVertexFailedEvent(finalState);
+            eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+                finalState));
+          }
         } catch (IOException e) {
           LOG.error("Failed to send vertex finished event to recovery", e);
           finalState = VertexState.FAILED;
@@ -4921,4 +4953,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
     }
   }
+
+  @Private
+  @VisibleForTesting
+  void setCounters(TezCounters counters) {
+    try {
+      writeLock.lock();
+      this.fullCounters = counters;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index f91bc42..c70fdae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -42,6 +42,7 @@ import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
 import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -511,12 +512,17 @@ public class AMWebController extends Controller {
     dagInfo.put("progress", Float.toString(dag.getCompletedTaskProgress()));
     dagInfo.put("status", dag.getState().toString());
 
-    if (counterNames != null && !counterNames.isEmpty()) {
-      TezCounters counters = dag.getCachedCounters();
-      Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        dagInfo.put("counters", counterMap);
+    try {
+      if (counterNames != null && !counterNames.isEmpty()) {
+        TezCounters counters = dag.getCachedCounters();
+        Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          dagInfo.put("counters", counterMap);
+        }
       }
+    } catch (LimitExceededException e) {
+      // Ignore
+      // TODO: add an error message instead for counter key
     }
     renderJSON(ImmutableMap.of(
         "dag", dagInfo
@@ -576,12 +582,17 @@ public class AMWebController extends Controller {
     vertexInfo.put("killedTaskAttempts",
         Integer.toString(vertexProgress.getKilledTaskAttemptCount()));
 
-    if (counterNames != null && !counterNames.isEmpty()) {
-      TezCounters counters = vertex.getCachedCounters();
-      Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        vertexInfo.put("counters", counterMap);
+    try {
+      if (counterNames != null && !counterNames.isEmpty()) {
+        TezCounters counters = vertex.getCachedCounters();
+        Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          vertexInfo.put("counters", counterMap);
+        }
       }
+    } catch (LimitExceededException e) {
+      // Ignore
+      // TODO: add an error message instead for counter key
     }
 
     return vertexInfo;
@@ -733,11 +744,17 @@ public class AMWebController extends Controller {
       taskInfo.put("progress", Float.toString(t.getProgress()));
       taskInfo.put("status", t.getState().toString());
 
-      TezCounters counters = t.getCounters();
-      Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        taskInfo.put("counters", counterMap);
+      try {
+        TezCounters counters = t.getCounters();
+        Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          taskInfo.put("counters", counterMap);
+        }
+      } catch (LimitExceededException e) {
+        // Ignore
+        // TODO: add an error message instead for counter key
       }
+
       tasksInfo.add(taskInfo);
     }
 
@@ -825,10 +842,15 @@ public class AMWebController extends Controller {
       attemptInfo.put("progress", Float.toString(a.getProgress()));
       attemptInfo.put("status", a.getState().toString());
 
-      TezCounters counters = a.getCounters();
-      Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
-      if (counterMap != null && !counterMap.isEmpty()) {
-        attemptInfo.put("counters", counterMap);
+      try {
+        TezCounters counters = a.getCounters();
+        Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters,
counterNames);
+        if (counterMap != null && !counterMap.isEmpty()) {
+          attemptInfo.put("counters", counterMap);
+        }
+      } catch (LimitExceededException e) {
+        // Ignore
+        // TODO: add an error message instead for counter key
       }
       attemptsInfo.add(attemptInfo);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 49f534b..c7dec36 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -185,6 +187,14 @@ public class TestDAGImpl {
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
   private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
 
+  static {
+    Limits.reset();
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100);
+    Limits.setConfiguration(conf);
+  }
+
   private DAGImpl chooseDAG(TezDAGID curDAGId) {
     if (curDAGId.equals(dagId)) {
       return dag;
@@ -2209,4 +2219,33 @@ public class TestDAGImpl {
       }
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testCounterLimits() {
+    initDAG(mrrDag);
+    dispatcher.await();
+    startDAG(mrrDag);
+    dispatcher.await();
+    for (int i=0; i<3; ++i) {
+      Vertex v = mrrDag.getVertex("vertex"+(i+1));
+      dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+          TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+      TezCounters ctrs = new TezCounters();
+      for (int j = 0; j < 50; ++j) {
+        ctrs.findCounter("g", "c" + i + "_" + j).increment(1);
+      }
+      ((VertexImpl) v).setCounters(ctrs);
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
+    }
+
+    Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
+    Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
+    Assert.assertTrue("Diagnostics should contain counter limits error message",
+        StringUtils.join(mrrDag.getDiagnostics(), ",").contains("Counters limit exceeded"));
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ae3329f8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e6387b3..e49f97d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -49,6 +49,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.protobuf.ByteString;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -239,6 +241,14 @@ public class TestVertexImpl {
   private StateChangeNotifierForTest updateTracker;
   private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
 
+  static {
+    Limits.reset();
+    Configuration conf = new Configuration(false);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100);
+    conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100);
+    Limits.setConfiguration(conf);
+  }
+
   public static class CountingOutputCommitter extends OutputCommitter {
 
     public int initCounter = 0;
@@ -6446,5 +6456,42 @@ public class TestVertexImpl {
     void setContext(InputInitializerContext context);
   }
 
+  @Test(timeout = 5000)
+  public void testCounterLimits() {
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v = vertices.get("vertex2");
+    startVertex(v);
+
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
+
+    for (int i = 0; i < 2; ++i) {
+      TezCounters ctrs = new TezCounters();
+      for (int j = 0; j < 75; ++j) {
+        ctrs.findCounter("g", "c" + i + "_" + j).increment(1);
+      }
+      Task t = v.getTask(i);
+      ((TaskImpl) t).setCounters(ctrs);
+    }
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Assert.assertEquals(1, v.getCompletedTasks());
+    Assert.assertTrue((0.5f) == v.getCompletedTaskProgress());
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(2, v.getCompletedTasks());
+
+    System.out.println(v.getDiagnostics());
+    Assert.assertTrue("Diagnostics should contain counter limits error message",
+        StringUtils.join(v.getDiagnostics(), ",").contains("Counters limit exceeded"));
+
+  }
 
 }


Mime
View raw message