tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject [01/50] [abbrv] tez git commit: TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels. Contributed by Harish Jaiprakash. [Forced Update!]
Date Tue, 28 Mar 2017 19:47:42 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-1190 cc1c5cc73 -> d1b08e37e (forced update)


TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at
specific levels. Contributed by Harish Jaiprakash.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c0270cb3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c0270cb3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c0270cb3

Branch: refs/heads/TEZ-1190
Commit: c0270cb30a582ab2b5cbc8442054ce0c2a766c15
Parents: a77d22d
Author: Siddharth Seth <sseth@apache.org>
Authored: Sun Feb 5 18:24:15 2017 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Sun Feb 5 18:24:15 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tez/dag/api/HistoryLogLevel.java |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  12 +-
 .../org/apache/tez/common/TezUtilsInternal.java |  19 ++++
 .../tez/dag/history/HistoryEventHandler.java    | 114 ++++++++++++++++---
 .../tez/dag/history/HistoryEventType.java       |   4 +-
 .../dag/history/TestHistoryEventHandler.java    |  79 ++++++++++---
 7 files changed, 199 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c0270cb3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bf9c9bc..37438d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
   TEZ-3600. Fix flaky test: TestTokenCache
   TEZ-3589. add a unit test for amKeepAlive not being shutdown if an app takes a long time to launch.
   TEZ-3417. Reduce sleep time on AM shutdown to reduce test runtimes
@@ -194,6 +195,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
   TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics
   TEZ-3574. Container reuse won't pickup extra dag level local resource.
   TEZ-3566. Avoid caching fs isntances in TokenCache after a point.

http://git-wip-us.apache.org/repos/asf/tez/blob/c0270cb3/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
index 5eb4785..96d74f9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/HistoryLogLevel.java
@@ -34,6 +34,7 @@ public enum HistoryLogLevel {
   DAG,
   VERTEX,
   TASK,
+  TASK_ATTEMPT,
   ALL;
 
   public static final HistoryLogLevel DEFAULT = ALL;

http://git-wip-us.apache.org/repos/asf/tez/blob/c0270cb3/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index df0605c..fd71b35 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -20,7 +20,6 @@ package org.apache.tez.dag.api;
 
 import java.lang.reflect.Field;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1263,6 +1262,17 @@ public class TezConfiguration extends Configuration {
       TEZ_PREFIX + "history.logging.log.level";
 
   /**
+   * List of comma separated enum values. Specifies the list of task attempt termination causes,
+   * which have to be suppressed from being logged to ATS. The valid filters are defined in the
+   * enum TaskAttemptTerminationCause. The filters are applied only if tez.history.logging.log.level
+   * is set to TASK_ATTEMPT.
+   */
+  @ConfigurationScope(Scope.DAG)
+  @ConfigurationProperty
+  public static final String TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS =
+      TEZ_PREFIX + "history.logging.taskattempt-filters";
+
+  /**
    * Comma separated list of Integers. These are the values that were set for the config value
    * for {@value #TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP}. The older values are required so
    * that the groupIds generated previously will continue to be generated by the plugin. If an older

http://git-wip-us.apache.org/repos/asf/tez/blob/c0270cb3/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 7b19293..5ba2972 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -25,8 +25,10 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.charset.Charset;
 import java.util.BitSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -41,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.log4j.Appender;
 import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
@@ -338,6 +341,22 @@ public class TezUtilsInternal {
     }
   }
 
+  public static <T extends Enum<T>> Set<T> getEnums(Configuration conf, String confName,
+      Class<T> enumType, String defaultValues) {
+    String[] names = conf.getStrings(confName);
+    if (names == null) {
+      names = StringUtils.getStrings(defaultValues);
+    }
+    if (names == null) {
+      return null;
+    }
+    Set<T> enums = new HashSet<>();
+    for (String name : names) {
+      enums.add(Enum.valueOf(enumType, name));
+    }
+    return enums;
+  }
+
   @Private
   public static void setHadoopCallerContext(HadoopShim hadoopShim, TezTaskAttemptID attemptID) {
     hadoopShim.setHadoopCallerContext("tez_ta:" + attemptID.toString());

http://git-wip-us.apache.org/repos/asf/tez/blob/c0270cb3/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 042d022..79d1fc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.history;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
@@ -27,13 +28,18 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.HistoryLogLevel;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
 import org.apache.tez.dag.history.recovery.RecoveryService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class HistoryEventHandler extends CompositeService {
 
@@ -45,8 +51,13 @@ public class HistoryEventHandler extends CompositeService {
   private HistoryLoggingService historyLoggingService;
 
   private HistoryLogLevel amHistoryLogLevel;
-  private Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel =
-      new ConcurrentHashMap<TezDAGID, HistoryLogLevel>();
+  private final Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel = new ConcurrentHashMap<>();
+  private Set<TaskAttemptTerminationCause> amTaskAttemptFilters;
+  private final Map<TezDAGID, Set<TaskAttemptTerminationCause>> dagIdToTaskAttemptFilters =
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<TezTaskAttemptID, DAGHistoryEvent> suppressedEvents =
+      new ConcurrentHashMap<>();
 
   public HistoryEventHandler(AppContext context) {
     super(HistoryEventHandler.class.getName());
@@ -80,6 +91,11 @@ public class HistoryEventHandler extends CompositeService {
     }
 
     amHistoryLogLevel = HistoryLogLevel.getLogLevel(context.getAMConf(), HistoryLogLevel.DEFAULT);
+    amTaskAttemptFilters = TezUtilsInternal.getEnums(
+        context.getAMConf(),
+        TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+        TaskAttemptTerminationCause.class,
+        null);
 
     super.serviceInit(conf);
   }
@@ -108,15 +124,20 @@ public class HistoryEventHandler extends CompositeService {
     if(dagId != null) {
       dagIdStr = dagId.toString();
     }
+    HistoryEvent historyEvent = event.getHistoryEvent();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Handling history event"
-          + ", eventType=" + event.getHistoryEvent().getEventType());
+          + ", eventType=" + historyEvent.getEventType());
     }
-    if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
+    if (recoveryEnabled && historyEvent.isRecoveryEvent()) {
       recoveryService.handle(event);
     }
-    if (event.getHistoryEvent().isHistoryEvent() && shouldLogEvent(event)) {
+    if (historyEvent.isHistoryEvent() && shouldLogEvent(event)) {
+      DAGHistoryEvent suppressedEvent = getSupressedEvent(historyEvent);
+      if (suppressedEvent != null) {
+        historyLoggingService.handle(suppressedEvent);
+      }
       historyLoggingService.handle(event);
     }
 
@@ -140,23 +161,86 @@ public class HistoryEventHandler extends CompositeService {
     }
 
     HistoryEvent historyEvent = event.getHistoryEvent();
-    if (historyEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) {
-      dagLogLevel = HistoryLogLevel.getLogLevel(((DAGSubmittedEvent)historyEvent).getConf(),
-          amHistoryLogLevel);
+    HistoryEventType eventType = historyEvent.getEventType();
+    if (eventType == HistoryEventType.DAG_SUBMITTED) {
+      Configuration dagConf = ((DAGSubmittedEvent)historyEvent).getConf();
+      dagLogLevel = HistoryLogLevel.getLogLevel(dagConf, amHistoryLogLevel);
       dagIdToLogLevel.put(dagId, dagLogLevel);
-    } else if (historyEvent.getEventType() == HistoryEventType.DAG_RECOVERED) {
+      maybeUpdateDagTaskAttemptFilters(dagId, dagLogLevel, dagConf);
+    } else if (eventType == HistoryEventType.DAG_RECOVERED) {
       if (context.getCurrentDAG() != null) {
-        dagLogLevel = HistoryLogLevel.getLogLevel(context.getCurrentDAG().getConf(),
-            amHistoryLogLevel);
+        Configuration dagConf = context.getCurrentDAG().getConf();
+        dagLogLevel = HistoryLogLevel.getLogLevel(dagConf, amHistoryLogLevel);
         dagIdToLogLevel.put(dagId, dagLogLevel);
+        maybeUpdateDagTaskAttemptFilters(dagId, dagLogLevel, dagConf);
+      }
+    } else if (eventType == HistoryEventType.DAG_FINISHED) {
+      dagIdToLogLevel.remove(dagId);
+      dagIdToTaskAttemptFilters.remove(dagId);
+      suppressedEvents.clear();
+    }
+
+    if (dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel())) {
+      return shouldLogTaskAttemptEvents(event, dagLogLevel);
+    }
+    return false;
+  }
+
+  // If the log level is set to TASK_ATTEMPT and filters are configured, then we should suppress
+  // the start event and publish it only when TaskAttemptFinishedEvent is received after
+  // matching against the filter.
+  // Note: if the AM is killed before we get the TaskAttemptFinishedEvent, we'll lose this event.
+  private boolean shouldLogTaskAttemptEvents(DAGHistoryEvent event, HistoryLogLevel dagLogLevel) {
+    HistoryEvent historyEvent = event.getHistoryEvent();
+    HistoryEventType eventType = historyEvent.getEventType();
+    if (dagLogLevel == HistoryLogLevel.TASK_ATTEMPT &&
+        (eventType == HistoryEventType.TASK_ATTEMPT_STARTED ||
+         eventType == HistoryEventType.TASK_ATTEMPT_FINISHED)) {
+      TezDAGID dagId = event.getDagID();
+      Set<TaskAttemptTerminationCause> filters = null;
+      if (dagId != null) {
+        filters = dagIdToTaskAttemptFilters.get(dagId);
+      }
+      if (filters == null) {
+        filters = amTaskAttemptFilters;
+      }
+      if (filters == null) {
+        return true;
       }
-    } else if (historyEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
-      if (dagIdToLogLevel.containsKey(dagId)) {
-        dagIdToLogLevel.remove(dagId);
+      if (eventType == HistoryEventType.TASK_ATTEMPT_STARTED) {
+        suppressedEvents.put(((TaskAttemptStartedEvent)historyEvent).getTaskAttemptID(), event);
+        return false;
+      } else { // TaskAttemptFinishedEvent
+        TaskAttemptFinishedEvent finishedEvent = (TaskAttemptFinishedEvent)historyEvent;
+        if (filters.contains(finishedEvent.getTaskAttemptError())) {
+          suppressedEvents.remove(finishedEvent.getTaskAttemptID());
+          return false;
+        }
       }
     }
+    return true;
+  }
 
-    return dagLogLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel());
+  private void maybeUpdateDagTaskAttemptFilters(TezDAGID dagId, HistoryLogLevel dagLogLevel,
+      Configuration dagConf) {
+    if (dagLogLevel == HistoryLogLevel.TASK_ATTEMPT) {
+      Set<TaskAttemptTerminationCause> filters = TezUtilsInternal.getEnums(
+          dagConf,
+          TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+          TaskAttemptTerminationCause.class,
+          null);
+      if (filters != null) {
+        dagIdToTaskAttemptFilters.put(dagId, filters);
+      }
+    }
+  }
+
+  private DAGHistoryEvent getSupressedEvent(HistoryEvent historyEvent) {
+    if (historyEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
+      TaskAttemptFinishedEvent finishedEvent = (TaskAttemptFinishedEvent)historyEvent;
+      return suppressedEvents.remove(finishedEvent.getTaskAttemptID());
+    }
+    return null;
   }
 
   public void handle(DAGHistoryEvent event) {

http://git-wip-us.apache.org/repos/asf/tez/blob/c0270cb3/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index a41d0e6..a536fdf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -36,8 +36,8 @@ public enum HistoryEventType {
   VERTEX_FINISHED(HistoryLogLevel.VERTEX),
   TASK_STARTED(HistoryLogLevel.TASK),
   TASK_FINISHED(HistoryLogLevel.TASK),
-  TASK_ATTEMPT_STARTED(HistoryLogLevel.ALL),
-  TASK_ATTEMPT_FINISHED(HistoryLogLevel.ALL),
+  TASK_ATTEMPT_STARTED(HistoryLogLevel.TASK_ATTEMPT),
+  TASK_ATTEMPT_FINISHED(HistoryLogLevel.TASK_ATTEMPT),
   CONTAINER_LAUNCHED(HistoryLogLevel.ALL),
   CONTAINER_STOPPED(HistoryLogLevel.ALL),
   DAG_COMMIT_STARTED(HistoryLogLevel.DAG),

http://git-wip-us.apache.org/repos/asf/tez/blob/c0270cb3/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
index c8a076d..4c0fe3f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java
@@ -32,23 +32,29 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.dag.api.HistoryLogLevel;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.HadoopShim;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestHistoryEventHandler {
@@ -56,42 +62,69 @@ public class TestHistoryEventHandler {
   private static ApplicationId appId = ApplicationId.newInstance(1000l, 1);
   private static ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
   private static String user = "TEST_USER";
+  private Configuration baseConfig;
+
+  @Before
+  public void setupConfig() {
+    baseConfig = new Configuration(false);
+  }
 
   @Test
   public void testAll() {
-    testLogLevel(null, 6);
+    testLogLevel(null, 11);
     testLogLevel(HistoryLogLevel.NONE, 0);
     testLogLevel(HistoryLogLevel.AM, 1);
     testLogLevel(HistoryLogLevel.DAG, 3);
     testLogLevel(HistoryLogLevel.VERTEX, 4);
     testLogLevel(HistoryLogLevel.TASK, 5);
-    testLogLevel(HistoryLogLevel.ALL, 6);
+    testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 9);
+    testLogLevel(HistoryLogLevel.ALL, 11);
+  }
+
+  @Test
+  public void testTaskAttemptFilters() {
+    baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+        "EXTERNAL_PREEMPTION,INTERRUPTED_BY_USER");
+    testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 5);
+    testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 5);
+
+    baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS,
+        "EXTERNAL_PREEMPTION");
+    testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 7);
+    testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 7);
+
+    baseConfig.set(TezConfiguration.TEZ_HISTORY_LOGGING_TASKATTEMPT_FILTERS, "INTERNAL_PREEMPTION");
+    testLogLevel(HistoryLogLevel.TASK_ATTEMPT, 9);
+    testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 9);
   }
 
   @Test
   public void testWithDAGRecovery() {
-    testLogLevelWithRecovery(null, 6);
+    testLogLevelWithRecovery(null, 11);
     testLogLevelWithRecovery(HistoryLogLevel.AM, 1);
     testLogLevelWithRecovery(HistoryLogLevel.DAG, 3);
     testLogLevelWithRecovery(HistoryLogLevel.VERTEX, 4);
     testLogLevelWithRecovery(HistoryLogLevel.TASK, 5);
-    testLogLevelWithRecovery(HistoryLogLevel.ALL, 6);
+    testLogLevelWithRecovery(HistoryLogLevel.TASK_ATTEMPT, 9);
+    testLogLevelWithRecovery(HistoryLogLevel.ALL, 11);
   }
 
   @Test
   public void testMultipleDag() {
-    testLogLevel(null, HistoryLogLevel.NONE, 7);
-    testLogLevel(null, HistoryLogLevel.AM, 7);
-    testLogLevel(null, HistoryLogLevel.DAG, 9);
-    testLogLevel(null, HistoryLogLevel.VERTEX, 10);
-    testLogLevel(null, HistoryLogLevel.TASK, 11);
-    testLogLevel(null, HistoryLogLevel.ALL, 12);
+    testLogLevel(null, HistoryLogLevel.NONE, 14);
+    testLogLevel(null, HistoryLogLevel.AM, 14);
+    testLogLevel(null, HistoryLogLevel.DAG, 16);
+    testLogLevel(null, HistoryLogLevel.VERTEX, 17);
+    testLogLevel(null, HistoryLogLevel.TASK, 18);
+    testLogLevel(null, HistoryLogLevel.TASK_ATTEMPT, 22);
+    testLogLevel(null, HistoryLogLevel.ALL, 22);
     testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.NONE, 5);
     testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.AM, 5);
     testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.DAG, 7);
     testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.VERTEX, 8);
     testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK, 9);
-    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 10);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.TASK_ATTEMPT, 13);
+    testLogLevel(HistoryLogLevel.VERTEX, HistoryLogLevel.ALL, 13);
     testLogLevel(HistoryLogLevel.NONE, HistoryLogLevel.NONE, 0);
   }
 
@@ -153,7 +186,7 @@ public class TestHistoryEventHandler {
   }
 
   private HistoryEventHandler createHandler(HistoryLogLevel logLevel) {
-    Configuration conf = new Configuration(false);
+    Configuration conf = new Configuration(baseConfig);
     conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
     conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
         InMemoryHistoryLoggingService.class.getName());
@@ -181,6 +214,7 @@ public class TestHistoryEventHandler {
 
     long time = System.currentTimeMillis();
     Configuration conf = new Configuration(inConf);
+
     historyEvents.add(new DAGHistoryEvent(null,
         new AMStartedEvent(attemptId, time, user)));
     historyEvents.add(new DAGHistoryEvent(dagId,
@@ -189,16 +223,33 @@ public class TestHistoryEventHandler {
     TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
     historyEvents.add(new DAGHistoryEvent(dagId,
         new VertexStartedEvent(vertexID, time, time)));
+    ContainerId containerId = ContainerId.newContainerId(attemptId, dagId.getId());
     TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
     historyEvents.add(new DAGHistoryEvent(dagId,
         new TaskStartedEvent(tezTaskID, "test", time, time)));
+    historyEvents.add(
+        new DAGHistoryEvent(new ContainerLaunchedEvent(containerId, time, attemptId)));
     historyEvents.add(new DAGHistoryEvent(dagId,
         new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
-            ContainerId.newContainerId(attemptId, 1), NodeId.newInstance("localhost", 8765), null,
-            null, null)));
+            containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptFinishedEvent(TezTaskAttemptID.getInstance(tezTaskID, 1), "test", time,
+            time + 1, TaskAttemptState.KILLED, null,
+            TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, "", null, null, null, time, null, time,
+            containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(tezTaskID, 2), "test", time,
+            containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
+    historyEvents.add(new DAGHistoryEvent(dagId,
+        new TaskAttemptFinishedEvent(TezTaskAttemptID.getInstance(tezTaskID, 2), "test", time + 2,
+            time + 3, TaskAttemptState.KILLED, null,
+            TaskAttemptTerminationCause.INTERRUPTED_BY_USER, "", null, null, null, time, null,
+            time + 2, containerId, NodeId.newInstance("localhost", 8765), null, null, null)));
     historyEvents.add(new DAGHistoryEvent(dagId,
         new DAGFinishedEvent(dagId, time, time, DAGState.SUCCEEDED, null, null, user, "test", null,
             attemptId, DAGPlan.getDefaultInstance())));
+    historyEvents.add(
+        new DAGHistoryEvent(new ContainerStoppedEvent(containerId, time + 4, 0, attemptId)));
     return historyEvents;
   }
 }


Mime
View raw message