Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9A32910770 for ; Mon, 24 Nov 2014 19:15:59 +0000 (UTC) Received: (qmail 6290 invoked by uid 500); 24 Nov 2014 19:15:59 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 6243 invoked by uid 500); 24 Nov 2014 19:15:59 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 6233 invoked by uid 99); 24 Nov 2014 19:15:59 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Nov 2014 19:15:59 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 39775A1740D; Mon, 24 Nov 2014 19:15:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Date: Mon, 24 Nov 2014 19:15:59 -0000 Message-Id: <357c83886ae14330b27cf108cab40a6b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] tez git commit: TEZ-1773. 
Add attempt failure cause enum to the attempt failed/killed history record (bikaS) Repository: tez Updated Branches: refs/heads/master e8294b886 -> 81eef37d9 http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index afc3433..d953fef 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -297,8 +297,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -329,8 +329,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.FAILED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.FAILED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -362,8 +362,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, "", - new TezCounters())); + 
taStartTime, taFinishTime, TaskAttemptState.KILLED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -397,8 +397,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -439,8 +439,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -451,8 +451,8 @@ public class TestTaskRecovery { // it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure. 
recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.FAILED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.FAILED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -486,8 +486,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -498,8 +498,8 @@ public class TestTaskRecovery { // it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure. 
recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.KILLED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -537,8 +537,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -577,8 +577,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, + "", new TezCounters())); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -658,8 +658,8 @@ public class TestTaskRecovery { long taFinishTime = taStartTime + 100L; recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - taStartTime, taFinishTime, TaskAttemptState.KILLED, "", - new TezCounters())); + taStartTime, taFinishTime, TaskAttemptState.KILLED, null, + "", new TezCounters())); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -700,7 +700,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new 
TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.KILLED, "", null)); + 0, TaskAttemptState.KILLED, null, "", null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(0, task.failedAttempts); @@ -730,7 +730,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, "", null)); + 0, TaskAttemptState.FAILED, null, "", null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(maxFailedAttempts, task.failedAttempts); @@ -760,7 +760,7 @@ public class TestTaskRecovery { task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, mock(ContainerId.class), mock(NodeId.class), "", "", "")); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, "", null)); + 0, TaskAttemptState.FAILED, null, "", null)); } assertEquals(maxFailedAttempts - 1, task.getAttempts().size()); assertEquals(maxFailedAttempts - 1, task.failedAttempts); http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 9500c97..687908d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -56,6 +56,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -100,12 +103,15 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ClusterInfo; +import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.RootInputInitializerManager; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; @@ -114,6 +120,8 @@ import org.apache.tez.dag.app.dag.VertexTerminationCause; import org.apache.tez.dag.app.dag.event.DAGEvent; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate; @@ -131,9 +139,12 @@ import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source; import 
org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation; import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; +import org.apache.tez.dag.app.rm.container.AMContainerMap; +import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -151,6 +162,7 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent; +import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.test.EdgeManagerForTest; import org.apache.tez.test.VertexManagerPluginForTest; @@ -2802,7 +2814,113 @@ public class TestVertexImpl { Assert.assertEquals(0, committer.commitCounter); Assert.assertEquals(1, committer.abortCounter); } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testVertexTaskAttemptProcessorFailure() { + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex1"); + + startVertex(v); + TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); + ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + 
when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appContext); + containers.addContainerIfNew(container); + doReturn(containers).when(appContext).getAllContainers(); + + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); + + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( + new TaskAttemptFailedEvent("Failed"), new EventMetaData( + EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID()))))); + dispatcher.await(); + Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause()); + } + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testVertexTaskAttemptInputFailure() { + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex1"); + + startVertex(v); + TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); + ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appContext); + containers.addContainerIfNew(container); + doReturn(containers).when(appContext).getAllContainers(); + + 
ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); + + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( + new TaskAttemptFailedEvent("Failed"), new EventMetaData( + EventProducerConsumerType.INPUT, v.getName(), null, ta.getID()))))); + dispatcher.await(); + Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause()); + } + + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testVertexTaskAttemptOutputFailure() { + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex1"); + + startVertex(v); + TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next(); + ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2)); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appContext); + containers.addContainerIfNew(container); + doReturn(containers).when(appContext).getAllContainers(); + + ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null)); + Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState()); + + dispatcher.getEventHandler().handle( + new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent( + new TaskAttemptFailedEvent("Failed"), new EventMetaData( + EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID()))))); + dispatcher.await(); + 
Assert.assertEquals(VertexState.RUNNING, v.getState()); + Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause()); + } + @Test(timeout = 5000) public void testSourceVertexStartHandling() { LOG.info("Testing testSourceVertexStartHandling"); @@ -2819,21 +2937,6 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testCounters() { - // FIXME need to test counters at vertex level - } - - @Test(timeout = 5000) - public void testDiagnostics() { - // FIXME need to test diagnostics in various cases - } - - @Test(timeout = 5000) - public void testTaskAttemptCompletionEvents() { - // FIXME need to test handling of task attempt events - } - - @Test(timeout = 5000) public void testSourceTaskAttemptCompletionEvents() { LOG.info("Testing testSourceTaskAttemptCompletionEvents"); initAllVertices(VertexState.INITED); http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 4ec1916..d2dece3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -49,6 +49,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.junit.Assert; import org.junit.Before; @@ -179,6 +180,8 @@ public class TestTaskSchedulerEventHandler { 
Assert.assertEquals("Container preempted externally. Container preempted by RM.", completedEvent.getDiagnostics()); Assert.assertTrue(completedEvent.isPreempted()); + Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, + completedEvent.getTerminationCause()); Assert.assertFalse(completedEvent.isDiskFailed()); schedulerHandler.stop(); @@ -186,6 +189,31 @@ public class TestTaskSchedulerEventHandler { } @Test (timeout = 5000) + public void testContainerInternalPreempted() throws IOException { + Configuration conf = new Configuration(false); + schedulerHandler.init(conf); + schedulerHandler.start(); + + ContainerId mockCId = mock(ContainerId.class); + verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any()); + schedulerHandler.preemptContainer(mockCId); + verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId); + Assert.assertEquals(1, mockEventHandler.events.size()); + Event event = mockEventHandler.events.get(0); + Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType()); + AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event; + Assert.assertEquals(mockCId, completedEvent.getContainerId()); + Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics()); + Assert.assertFalse(completedEvent.isPreempted()); + Assert.assertFalse(completedEvent.isDiskFailed()); + Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, + completedEvent.getTerminationCause()); + + schedulerHandler.stop(); + schedulerHandler.close(); + } + + @Test (timeout = 5000) public void testContainerDiskFailed() throws IOException { Configuration conf = new Configuration(false); schedulerHandler.init(conf); @@ -211,6 +239,8 @@ public class TestTaskSchedulerEventHandler { completedEvent.getDiagnostics()); Assert.assertFalse(completedEvent.isPreempted()); Assert.assertTrue(completedEvent.isDiskFailed()); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, + 
completedEvent.getTerminationCause()); schedulerHandler.stop(); schedulerHandler.close(); http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index c0be044..f273896 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -67,17 +67,21 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating; import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed; import org.apache.tez.dag.app.dag.event.TaskAttemptEventType; import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.dag.app.rm.NMCommunicatorEventType; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventHandler; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; @@ -87,9 +91,7 @@ import com.google.common.collect.Maps; public class TestAMContainer { - - - @Test + @Test (timeout=5000) // 
Assign before launch. public void tetSingleSuccessfulTaskFlow() { WrappedContainer wc = new WrappedContainer(); @@ -135,7 +137,7 @@ public class TestAMContainer { assertNull(wc.amContainer.getRunningTaskAttempt()); verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -146,7 +148,7 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) // Assign after launch. public void testSingleSuccessfulTaskFlow2() { WrappedContainer wc = new WrappedContainer(); @@ -191,7 +193,7 @@ public class TestAMContainer { assertNull(wc.amContainer.getRunningTaskAttempt()); verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -202,7 +204,7 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testSingleSuccessfulTaskFlowStopRequest() { WrappedContainer wc = new WrappedContainer(); @@ -225,7 +227,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.STOPPING); wc.verifyNoOutgoingEvents(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -238,7 +240,7 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testSingleSuccessfulTaskFlowFailedNMStopRequest() { WrappedContainer wc = new WrappedContainer(); @@ -264,7 +266,7 @@ public class TestAMContainer { assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() == AMSchedulerEventType.S_CONTAINER_DEALLOCATE); - wc.containerCompleted(false); + wc.containerCompleted(); 
wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -278,7 +280,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testMultipleAllocationsAtIdle() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -303,7 +305,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -317,7 +319,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testAllocationAtRunning() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -343,7 +345,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -357,7 +359,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testMultipleAllocationsAtLaunching() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -382,7 +384,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); // 1 Inform scheduler. 2 TERMINATED to TaskAttempt. 
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -396,7 +398,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerTimedOutAtRunning() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -418,7 +420,7 @@ public class TestAMContainer { NMCommunicatorEventType.CONTAINER_STOP_REQUEST); // TODO Should this be an RM DE-ALLOCATE instead ? - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -432,7 +434,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testLaunchFailure() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -449,22 +451,28 @@ public class TestAMContainer { verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATING, AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + for (Event e : outgoingEvents) { + if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) { + Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, + ((TaskAttemptEventContainerTerminating)e).getTerminationCause()); + } + } - wc.containerCompleted(false); + wc.containerCompleted(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED); - + // Valid transition. Container complete, but not with an error. 
assertFalse(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testContainerCompletedAtAllocated() { WrappedContainer wc = new WrappedContainer(); wc.verifyState(AMContainerState.ALLOCATED); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); @@ -472,7 +480,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) // Verify that incoming NM launched events to COMPLETED containers are // handled. public void testContainerCompletedAtLaunching() { @@ -484,7 +492,7 @@ public class TestAMContainer { wc.assignTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -492,6 +500,8 @@ public class TestAMContainer { outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED); + Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, + ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); assertFalse(wc.amContainer.isInErrorState()); @@ -501,9 +511,71 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } + + @SuppressWarnings("rawtypes") + @Test (timeout=5000) + public void testContainerCompletedAtLaunchingSpecificClusterError() { + WrappedContainer wc = new WrappedContainer(); + List outgoingEvents; + + wc.launchContainer(); + + + wc.assignTaskAttempt(wc.taskAttemptID); + + wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + + outgoingEvents = 
wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, + ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); + assertFalse(wc.amContainer.isInErrorState()); + + // Container launched generated by NM call. + wc.containerLaunched(); + wc.verifyNoOutgoingEvents(); + + assertFalse(wc.amContainer.isInErrorState()); + } + @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) + public void testContainerCompletedAtLaunchingSpecificError() { + WrappedContainer wc = new WrappedContainer(); + List outgoingEvents; + + wc.launchContainer(); + + + wc.assignTaskAttempt(wc.taskAttemptID); + + wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED, + ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); + + assertFalse(wc.amContainer.isInErrorState()); + + // Container launched generated by NM call. 
+ wc.containerLaunched(); + wc.verifyNoOutgoingEvents(); + + assertFalse(wc.amContainer.isInErrorState()); + } + + @SuppressWarnings("rawtypes") + @Test (timeout=5000) public void testContainerCompletedAtIdle() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -514,7 +586,7 @@ public class TestAMContainer { wc.containerLaunched(); wc.verifyState(AMContainerState.IDLE); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -538,7 +610,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerCompletedAtRunning() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -550,7 +622,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -574,7 +646,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerPreemptedAtRunning() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -586,7 +658,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(ContainerExitStatus.PREEMPTED); + wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -594,6 +666,8 @@ public class TestAMContainer { verify(wc.chh).unregister(wc.containerID); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + 
Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, + ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); @@ -608,9 +682,47 @@ public class TestAMContainer { assertFalse(wc.amContainer.isInErrorState()); } + + @SuppressWarnings("rawtypes") + @Test (timeout=5000) + public void testContainerInternallyPreemptedAtRunning() { + WrappedContainer wc = new WrappedContainer(); + List outgoingEvents; + + wc.launchContainer(); + + wc.assignTaskAttempt(wc.taskAttemptID); + wc.containerLaunched(); + wc.pullTaskToRun(); + wc.verifyState(AMContainerState.RUNNING); + + wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION); + wc.verifyState(AMContainerState.COMPLETED); + verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.chh).register(wc.containerID); + verify(wc.chh).unregister(wc.containerID); + + outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION, + ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause()); + verifyUnOrderedOutgoingEventTypes(outgoingEvents, + TaskAttemptEventType.TA_CONTAINER_TERMINATED); + + assertFalse(wc.amContainer.isInErrorState()); + + // Pending task complete. (Ideally, container should be dead at this point + // and this event should not be generated. 
Network timeout on NM-RM heartbeat + // can cause it to be generated) + wc.taskAttemptSucceeded(wc.taskAttemptID); + wc.verifyNoOutgoingEvents(); + wc.verifyHistoryStopEvent(); + + assertFalse(wc.amContainer.isInErrorState()); + } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testContainerDiskFailedAtRunning() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -622,7 +734,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(ContainerExitStatus.DISKS_FAILED); + wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID); verify(wc.tal).unregisterRunningContainer(wc.containerID); @@ -630,6 +742,8 @@ public class TestAMContainer { verify(wc.chh).unregister(wc.containerID); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); + Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR, + ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause()); verifyUnOrderedOutgoingEventTypes(outgoingEvents, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM); @@ -646,7 +760,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testTaskAssignedToCompletedContainer() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -657,7 +771,7 @@ public class TestAMContainer { wc.pullTaskToRun(); wc.taskAttemptSucceeded(wc.taskAttemptID); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2); @@ -677,7 +791,7 @@ public class TestAMContainer { assertTrue(wc.amContainer.isInErrorState()); } - @Test + @Test (timeout=5000) public void testTaskPullAtLaunching() { WrappedContainer wc = new WrappedContainer();
@@ -690,7 +804,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtIdle() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -712,11 +826,11 @@ public class TestAMContainer { for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; - assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed")); } } - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -726,7 +840,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtIdleMultipleAttempts() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -756,13 +870,13 @@ public class TestAMContainer { for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; - assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed")); } } assertFalse(wc.amContainer.isInErrorState()); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyNoOutgoingEvents(); wc.verifyHistoryStopEvent(); @@ -772,7 +886,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtRunningMultipleAttempts() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -801,11 +915,11 @@ public class TestAMContainer { for (Event event : outgoingEvents) { if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) { TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event; - 
assertEquals("nodeFailed", nfEvent.getDiagnosticInfo()); + assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed")); } } - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyHistoryStopEvent(); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -818,7 +932,7 @@ public class TestAMContainer { } @SuppressWarnings("rawtypes") - @Test + @Test (timeout=5000) public void testNodeFailedAtCompletedMultipleSuccessfulTAs() { WrappedContainer wc = new WrappedContainer(); List outgoingEvents; @@ -835,7 +949,7 @@ public class TestAMContainer { wc.taskAttemptSucceeded(taID2); wc.stopRequest(); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); wc.nodeFailed(); @@ -849,7 +963,7 @@ public class TestAMContainer { assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); } - @Test + @Test (timeout=5000) public void testDuplicateCompletedEvents() { WrappedContainer wc = new WrappedContainer(); @@ -865,17 +979,17 @@ public class TestAMContainer { wc.taskAttemptSucceeded(taID2); wc.stopRequest(); wc.nmStopSent(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - wc.containerCompleted(false); + wc.containerCompleted(); wc.verifyNoOutgoingEvents(); wc.verifyHistoryStopEvent(); } - @Test + @Test (timeout=5000) public void testLocalResourceAddition() { WrappedContainer wc = new WrappedContainer(); @@ -926,13 +1040,13 @@ public class TestAMContainer { wc.taskAttemptSucceeded(taID3); // Verify references are cleared after a container completes. 
- wc.containerCompleted(false); + wc.containerCompleted(); assertNull(wc.amContainer.containerLocalResources); assertNull(wc.amContainer.additionalLocalResources); } @SuppressWarnings("unchecked") - @Test + @Test (timeout=5000) public void testCredentialsTransfer() { WrappedContainerMultipleDAGs wc = new WrappedContainerMultipleDAGs(); @@ -1183,15 +1297,15 @@ public class TestAMContainer { AMContainerEventType.C_NM_STOP_FAILED)); } - public void containerCompleted(boolean preempted) { + public void containerCompleted() { reset(eventHandler); - amContainer.handle(new AMContainerEventCompleted(containerID, - (preempted ? ContainerExitStatus.PREEMPTED : ContainerExitStatus.SUCCESS), null)); + amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null, + TaskAttemptTerminationCause.CONTAINER_EXITED)); } - public void containerCompleted(int exitStatus) { + public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) { reset(eventHandler); - amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null)); + amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause)); } public void containerTimedOut() { http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 8913287..cd770a3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events; import static org.junit.Assert.fail; import 
java.nio.ByteBuffer; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -44,6 +45,7 @@ import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.SummaryEvent; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -471,7 +473,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - null, null); + null, null, null); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -491,7 +493,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - "diagnose", new TezCounters()); + TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters()); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -504,6 +506,8 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getState()); Assert.assertEquals(event.getCounters(), deserializedEvent.getCounters()); + Assert.assertEquals(event.getTaskAttemptError(), + deserializedEvent.getTaskAttemptError()); logEvents(event, deserializedEvent); } } 
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index a20c9fe..e0f8c21 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -59,6 +59,7 @@ import org.apache.tez.dag.history.events.VertexInitializedEvent; import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; import org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -155,7 +156,7 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, null, null); + random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java 
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 4b6d648..91346ae 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -376,6 +376,9 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name()); + if (event.getTaskAttemptError() != null) { + atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); + } atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); atsEntity.addOtherInfo(ATSConstants.COUNTERS, DAGUtils.convertCountersToATSMap(event.getCounters())); http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index ce47820..0f2942c 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -63,6 +63,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent; import 
org.apache.tez.dag.history.events.VertexStartedEvent; import org.apache.tez.dag.history.logging.EntityTypes; import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; @@ -157,7 +158,7 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, null, null); + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(),