tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [09/27] tez git commit: TEZ-1773. Add attempt failure cause enum to the attempt failed/killed history record (bikaS)
Date Wed, 10 Dec 2014 03:30:19 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index afc3433..d953fef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -297,8 +297,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -329,8 +329,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.FAILED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -362,8 +362,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -397,8 +397,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -439,8 +439,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -451,8 +451,8 @@ public class TestTaskRecovery {
     // it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure.
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.FAILED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -486,8 +486,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -498,8 +498,8 @@ public class TestTaskRecovery {
     // it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure.
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -537,8 +537,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -577,8 +577,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -658,8 +658,8 @@ public class TestTaskRecovery {
     long taFinishTime = taStartTime + 100L;
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-            taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
-            new TezCounters()));
+            taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+            "", new TezCounters()));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -700,7 +700,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", ""));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.KILLED, "", null));
+          0, TaskAttemptState.KILLED, null, "", null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(0, task.failedAttempts);
@@ -730,7 +730,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", ""));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, "", null));
+          0, TaskAttemptState.FAILED, null, "", null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -760,7 +760,7 @@ public class TestTaskRecovery {
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
           mock(ContainerId.class), mock(NodeId.class), "", "", ""));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, "", null));
+          0, TaskAttemptState.FAILED, null, "", null));
     }
     assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
     assertEquals(maxFailedAttempts - 1, task.failedAttempts);

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 9500c97..687908d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -56,6 +56,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -100,12 +103,15 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.RootInputInitializerManager;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
@@ -114,6 +120,8 @@ import org.apache.tez.dag.app.dag.VertexTerminationCause;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -131,9 +139,12 @@ import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -151,6 +162,7 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.EdgeManagerForTest;
 import org.apache.tez.test.VertexManagerPluginForTest;
@@ -2802,7 +2814,113 @@ public class TestVertexImpl {
     Assert.assertEquals(0, committer.commitCounter);
     Assert.assertEquals(1, committer.abortCounter);
   }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexTaskAttemptProcessorFailure() {
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v = vertices.get("vertex1");
+
+    startVertex(v);
+    TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+    ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+    
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appContext);
+    containers.addContainerIfNew(container);
+    doReturn(containers).when(appContext).getAllContainers();
+
+    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+    Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+            new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+                EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID())))));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause());
+  }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexTaskAttemptInputFailure() {
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v = vertices.get("vertex1");
+
+    startVertex(v);
+    TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+    ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+    
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appContext);
+    containers.addContainerIfNew(container);
+    doReturn(containers).when(appContext).getAllContainers();
+
+    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+    Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+            new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+                EventProducerConsumerType.INPUT, v.getName(), null, ta.getID())))));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause());
+  }
+
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexTaskAttemptOutputFailure() {
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v = vertices.get("vertex1");
+
+    startVertex(v);
+    TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+    ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+    
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appContext);
+    containers.addContainerIfNew(container);
+    doReturn(containers).when(appContext).getAllContainers();
+
+    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+    Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+    dispatcher.getEventHandler().handle(
+        new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+            new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+                EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID())))));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause());
+  }
+  
   @Test(timeout = 5000)
   public void testSourceVertexStartHandling() {
     LOG.info("Testing testSourceVertexStartHandling");
@@ -2819,21 +2937,6 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testCounters() {
-    // FIXME need to test counters at vertex level
-  }
-
-  @Test(timeout = 5000)
-  public void testDiagnostics() {
-    // FIXME need to test diagnostics in various cases
-  }
-
-  @Test(timeout = 5000)
-  public void testTaskAttemptCompletionEvents() {
-    // FIXME need to test handling of task attempt events
-  }
-
-  @Test(timeout = 5000)
   public void testSourceTaskAttemptCompletionEvents() {
     LOG.info("Testing testSourceTaskAttemptCompletionEvents");
     initAllVertices(VertexState.INITED);

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 4ec1916..d2dece3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.junit.Assert;
 import org.junit.Before;
@@ -179,6 +180,8 @@ public class TestTaskSchedulerEventHandler {
     Assert.assertEquals("Container preempted externally. Container preempted by RM.", 
         completedEvent.getDiagnostics());
     Assert.assertTrue(completedEvent.isPreempted());
+    Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
+        completedEvent.getTerminationCause());
     Assert.assertFalse(completedEvent.isDiskFailed());
 
     schedulerHandler.stop();
@@ -186,6 +189,31 @@ public class TestTaskSchedulerEventHandler {
   }
   
   @Test (timeout = 5000)
+  public void testContainerInternalPreempted() throws IOException {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+    
+    ContainerId mockCId = mock(ContainerId.class);
+    verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
+    schedulerHandler.preemptContainer(mockCId);
+    verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
+    Assert.assertEquals(1, mockEventHandler.events.size());
+    Event event = mockEventHandler.events.get(0);
+    Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
+    Assert.assertEquals(mockCId, completedEvent.getContainerId());
+    Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
+    Assert.assertFalse(completedEvent.isPreempted());
+    Assert.assertFalse(completedEvent.isDiskFailed());
+    Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+        completedEvent.getTerminationCause());
+
+    schedulerHandler.stop();
+    schedulerHandler.close();
+  }
+  
+  @Test (timeout = 5000)
   public void testContainerDiskFailed() throws IOException {
     Configuration conf = new Configuration(false);
     schedulerHandler.init(conf);
@@ -211,6 +239,8 @@ public class TestTaskSchedulerEventHandler {
         completedEvent.getDiagnostics());
     Assert.assertFalse(completedEvent.isPreempted());
     Assert.assertTrue(completedEvent.isDiskFailed());
+    Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+        completedEvent.getTerminationCause());
 
     schedulerHandler.stop();
     schedulerHandler.close();

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index c0be044..f273896 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -67,17 +67,21 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -87,9 +91,7 @@ import com.google.common.collect.Maps;
 
 
 public class TestAMContainer {
-
-
-  @Test
+  @Test (timeout=5000)
   // Assign before launch.
   public void tetSingleSuccessfulTaskFlow() {
     WrappedContainer wc = new WrappedContainer();
@@ -135,7 +137,7 @@ public class TestAMContainer {
     assertNull(wc.amContainer.getRunningTaskAttempt());
     verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
@@ -146,7 +148,7 @@ public class TestAMContainer {
     assertFalse(wc.amContainer.isInErrorState());
   }
 
-  @Test
+  @Test (timeout=5000)
   // Assign after launch.
   public void testSingleSuccessfulTaskFlow2() {
     WrappedContainer wc = new WrappedContainer();
@@ -191,7 +193,7 @@ public class TestAMContainer {
     assertNull(wc.amContainer.getRunningTaskAttempt());
     verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
@@ -202,7 +204,7 @@ public class TestAMContainer {
     assertFalse(wc.amContainer.isInErrorState());
   }
 
-  @Test
+  @Test (timeout=5000)
   public void testSingleSuccessfulTaskFlowStopRequest() {
     WrappedContainer wc = new WrappedContainer();
 
@@ -225,7 +227,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.STOPPING);
     wc.verifyNoOutgoingEvents();
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
@@ -238,7 +240,7 @@ public class TestAMContainer {
     assertFalse(wc.amContainer.isInErrorState());
   }
 
-  @Test
+  @Test (timeout=5000)
   public void testSingleSuccessfulTaskFlowFailedNMStopRequest() {
     WrappedContainer wc = new WrappedContainer();
 
@@ -264,7 +266,7 @@ public class TestAMContainer {
     assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
@@ -278,7 +280,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testMultipleAllocationsAtIdle() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -303,7 +305,7 @@ public class TestAMContainer {
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -317,7 +319,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testAllocationAtRunning() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -343,7 +345,7 @@ public class TestAMContainer {
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -357,7 +359,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testMultipleAllocationsAtLaunching() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -382,7 +384,7 @@ public class TestAMContainer {
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -396,7 +398,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testContainerTimedOutAtRunning() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -418,7 +420,7 @@ public class TestAMContainer {
         NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
     // TODO Should this be an RM DE-ALLOCATE instead ?
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -432,7 +434,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testLaunchFailure() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -449,22 +451,28 @@ public class TestAMContainer {
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+    for (Event e : outgoingEvents) {
+      if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) {
+        Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
+            ((TaskAttemptEventContainerTerminating)e).getTerminationCause());        
+      }
+    }
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-
+    
     // Valid transition. Container complete, but not with an error.
     assertFalse(wc.amContainer.isInErrorState());
   }
 
-  @Test
+  @Test (timeout=5000)
   public void testContainerCompletedAtAllocated() {
     WrappedContainer wc = new WrappedContainer();
     wc.verifyState(AMContainerState.ALLOCATED);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
 
@@ -472,7 +480,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   // Verify that incoming NM launched events to COMPLETED containers are
   // handled.
   public void testContainerCompletedAtLaunching() {
@@ -484,7 +492,7 @@ public class TestAMContainer {
 
     wc.assignTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -492,6 +500,8 @@ public class TestAMContainer {
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
+        ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
 
     assertFalse(wc.amContainer.isInErrorState());
 
@@ -501,9 +511,71 @@ public class TestAMContainer {
 
     assertFalse(wc.amContainer.isInErrorState());
   }
+  
+  @SuppressWarnings("rawtypes")
+  @Test (timeout=5000)
+  public void testContainerCompletedAtLaunchingSpecificClusterError() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+
+
+    wc.assignTaskAttempt(wc.taskAttemptID);
+
+    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+    Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+        ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
 
+    assertFalse(wc.amContainer.isInErrorState());
+
+    // Container launched generated by NM call.
+    wc.containerLaunched();
+    wc.verifyNoOutgoingEvents();
+
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
+  public void testContainerCompletedAtLaunchingSpecificError() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+
+
+    wc.assignTaskAttempt(wc.taskAttemptID);
+
+    wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+    Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED,
+        ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
+
+    assertFalse(wc.amContainer.isInErrorState());
+
+    // Container launched generated by NM call.
+    wc.containerLaunched();
+    wc.verifyNoOutgoingEvents();
+
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test (timeout=5000)
   public void testContainerCompletedAtIdle() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -514,7 +586,7 @@ public class TestAMContainer {
     wc.containerLaunched();
     wc.verifyState(AMContainerState.IDLE);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -538,7 +610,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testContainerCompletedAtRunning() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -550,7 +622,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -574,7 +646,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testContainerPreemptedAtRunning() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -586,7 +658,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted(ContainerExitStatus.PREEMPTED);
+    wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -594,6 +666,8 @@ public class TestAMContainer {
     verify(wc.chh).unregister(wc.containerID);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
+        ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
 
@@ -608,9 +682,47 @@ public class TestAMContainer {
 
     assertFalse(wc.amContainer.isInErrorState());
   }
+
+  @SuppressWarnings("rawtypes")
+  @Test (timeout=5000)
+  public void testContainerInternallyPreemptedAtRunning() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.containerLaunched();
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+
+    wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).register(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+        ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+
+    assertFalse(wc.amContainer.isInErrorState());
+
+    // Pending task complete. (Ideally, container should be dead at this point
+    // and this event should not be generated. Network timeout on NM-RM heartbeat
+    // can cause it to be generated)
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    wc.verifyNoOutgoingEvents();
+    wc.verifyHistoryStopEvent();
+
+    assertFalse(wc.amContainer.isInErrorState());
+  }
   
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testContainerDiskFailedAtRunning() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -622,7 +734,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED);
+    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -630,6 +742,8 @@ public class TestAMContainer {
     verify(wc.chh).unregister(wc.containerID);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+        ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
 
@@ -646,7 +760,7 @@ public class TestAMContainer {
   }
   
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testTaskAssignedToCompletedContainer() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -657,7 +771,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
 
     TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
@@ -677,7 +791,7 @@ public class TestAMContainer {
     assertTrue(wc.amContainer.isInErrorState());
   }
 
-  @Test
+  @Test (timeout=5000)
   public void testTaskPullAtLaunching() {
     WrappedContainer wc = new WrappedContainer();
 
@@ -690,7 +804,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testNodeFailedAtIdle() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -712,11 +826,11 @@ public class TestAMContainer {
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
         TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
-        assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+        assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
       }
     }
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -726,7 +840,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testNodeFailedAtIdleMultipleAttempts() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -756,13 +870,13 @@ public class TestAMContainer {
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
         TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
-        assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+        assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
       }
     }
 
     assertFalse(wc.amContainer.isInErrorState());
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyNoOutgoingEvents();
     wc.verifyHistoryStopEvent();
 
@@ -772,7 +886,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testNodeFailedAtRunningMultipleAttempts() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -801,11 +915,11 @@ public class TestAMContainer {
     for (Event event : outgoingEvents) {
       if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
         TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
-        assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+        assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
       }
     }
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -818,7 +932,7 @@ public class TestAMContainer {
   }
 
   @SuppressWarnings("rawtypes")
-  @Test
+  @Test (timeout=5000)
   public void testNodeFailedAtCompletedMultipleSuccessfulTAs() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -835,7 +949,7 @@ public class TestAMContainer {
     wc.taskAttemptSucceeded(taID2);
     wc.stopRequest();
     wc.nmStopSent();
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
 
     wc.nodeFailed();
@@ -849,7 +963,7 @@ public class TestAMContainer {
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
   }
 
-  @Test
+  @Test (timeout=5000)
   public void testDuplicateCompletedEvents() {
     WrappedContainer wc = new WrappedContainer();
 
@@ -865,17 +979,17 @@ public class TestAMContainer {
     wc.taskAttemptSucceeded(taID2);
     wc.stopRequest();
     wc.nmStopSent();
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
 
     wc.verifyNoOutgoingEvents();
 
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     wc.verifyNoOutgoingEvents();
     wc.verifyHistoryStopEvent();
   }
   
-  @Test
+  @Test (timeout=5000)
   public void testLocalResourceAddition() {
     WrappedContainer wc = new WrappedContainer();
 
@@ -926,13 +1040,13 @@ public class TestAMContainer {
     wc.taskAttemptSucceeded(taID3);
 
     // Verify references are cleared after a container completes.
-    wc.containerCompleted(false);
+    wc.containerCompleted();
     assertNull(wc.amContainer.containerLocalResources);
     assertNull(wc.amContainer.additionalLocalResources);
   }
 
   @SuppressWarnings("unchecked")
-  @Test
+  @Test (timeout=5000)
   public void testCredentialsTransfer() {
     WrappedContainerMultipleDAGs wc = new WrappedContainerMultipleDAGs();
 
@@ -1183,15 +1297,15 @@ public class TestAMContainer {
           AMContainerEventType.C_NM_STOP_FAILED));
     }
 
-    public void containerCompleted(boolean preempted) {
+    public void containerCompleted() {
       reset(eventHandler);
-      amContainer.handle(new AMContainerEventCompleted(containerID, 
-          (preempted ? ContainerExitStatus.PREEMPTED : ContainerExitStatus.SUCCESS), null));
+      amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null,
+          TaskAttemptTerminationCause.CONTAINER_EXITED));
     }
     
-    public void containerCompleted(int exitStatus) {
+    public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) {
       reset(eventHandler);
-      amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null));
+      amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause));
     }
 
     public void containerTimedOut() {

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 8913287..cd770a3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
 import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +45,7 @@ import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -471,7 +473,7 @@ public class TestHistoryEventsProtoConversion {
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          null, null);
+          null, null, null);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -491,7 +493,7 @@ public class TestHistoryEventsProtoConversion {
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          "diagnose", new TezCounters());
+          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters());
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -504,6 +506,8 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getState());
       Assert.assertEquals(event.getCounters(),
           deserializedEvent.getCounters());
+      Assert.assertEquals(event.getTaskAttemptError(),
+          deserializedEvent.getTaskAttemptError());
       logEvents(event, deserializedEvent);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index a20c9fe..e0f8c21 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -155,7 +156,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.FAILED, null, null);
+              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 4b6d648..91346ae 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -376,6 +376,9 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
     atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
     atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+    if (event.getTaskAttemptError() != null) {
+      atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+    }
     atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     atsEntity.addOtherInfo(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToATSMap(event.getCounters()));

http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index ce47820..0f2942c 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -63,6 +63,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -157,7 +158,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.FAILED, null, null);
+              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),


Mime
View raw message