tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-128. Add additional unit tests for TaskAttempt.
Date Tue, 14 May 2013 22:38:42 GMT
Updated Branches:
  refs/heads/TEZ-1 4c89e15c6 -> d660948d7


TEZ-128. Add additional unit tests for TaskAttempt.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d660948d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d660948d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d660948d

Branch: refs/heads/TEZ-1
Commit: d660948d7eaa3536a3bcd97e16524aa626b4ad7c
Parents: 4c89e15
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue May 14 15:31:07 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue May 14 15:31:07 2013 -0700

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      |    5 -
 .../tez/dag/app/dag/impl/TestTaskAttempt.java      |  409 ++++++++++++---
 .../apache/tez/dag/app/dag/impl/TestTaskImpl.java  |    2 +
 3 files changed, 325 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d660948d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 2564ffa..264b3ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -116,7 +116,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected EventHandler eventHandler;
   private final TezTaskAttemptID attemptId;
   private final Clock clock;
-//  private final TaskAttemptListener taskAttemptListener;
   private final List<String> diagnostics = new ArrayList<String>();
   private final Lock readLock;
   private final Lock writeLock;
@@ -923,10 +922,6 @@ public class TaskAttemptImpl implements TaskAttempt,
       // Send out events to the Task - indicating TaskAttemptTermination(F/K)
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId, helper
           .getTaskEventType()));
-
-      if (event instanceof DiagnosableEvent) {
-        ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d660948d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index a18bd3b..369bced 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -20,13 +20,18 @@ package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -61,10 +66,14 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -72,6 +81,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttempt {
@@ -102,88 +112,96 @@ public class TestTaskAttempt {
   // testMRAppHistory(app);
   // }
 
-  // @Test
-  // // Verifies that the launch request is based on the hosts.
-  // // TODO Move to the client.
-  // // TODO Add a test that verifies that the LocationHint is used as it should
-  // be.
-  // public void testSingleRackRequest() throws Exception {
-  // TaskAttemptImpl.ScheduleTaskattemptTransition sta =
-  // new TaskAttemptImpl.ScheduleTaskattemptTransition();
-  //
-  // EventHandler eventHandler = mock(EventHandler.class);
-  // String[] hosts = new String[3];
-  // hosts[0] = "host1";
-  // hosts[1] = "host2";
-  // hosts[2] = "host3";
-  // TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0,
-  // 128 * 1024 * 1024l);
-  //
-  // TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImpl2ForTest(
-  // eventHandler, splitInfo);
-  // TaskAttemptEventSchedule mockTAEvent =
-  // mock(TaskAttemptEventSchedule.class);
-  // doReturn(false).when(mockTAEvent).isRescheduled();
-  //
-  // sta.transition(mockTaskAttempt, mockTAEvent);
-  //
-  // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-  // verify(eventHandler, times(2)).handle(arg.capture());
-  // if (!(arg.getAllValues().get(1) instanceof
-  // AMSchedulerTALaunchRequestEvent)) {
-  // Assert.fail("Second Event not of type ContainerRequestEvent");
-  // }
-  // AMSchedulerTALaunchRequestEvent tlrE = (AMSchedulerTALaunchRequestEvent)
-  // arg
-  // .getAllValues().get(1);
-  // String[] requestedRacks = tlrE.getRacks();
-  // // Only a single occurrence of /DefaultRack
-  // assertEquals(1, requestedRacks.length);
-  // }
+  @Test
+  public void testLocalityRequest() {
 
-  // @Test
-  // // Tests that an attempt is made to resolve the localized hosts to racks.
-  // // TODO Move to the client.
-  // public void testHostResolveAttempt() throws Exception {
-  // TaskAttemptImpl.ScheduleTaskattemptTransition sta =
-  // new TaskAttemptImpl.ScheduleTaskattemptTransition();
-  //
-  // EventHandler eventHandler = mock(EventHandler.class);
-  // String hosts[] = new String[] {"192.168.1.1", "host2", "host3"};
-  // String resolved[] = new String[] {"host1", "host2", "host3"};
-  // TaskSplitMetaInfo splitInfo =
-  // new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
-  //
-  // TaskAttemptImpl mockTaskAttempt =
-  // createMapTaskAttemptImpl2ForTest(eventHandler, splitInfo);
-  // TaskAttemptImpl spyTa = spy(mockTaskAttempt);
-  // when(spyTa.resolveHosts(hosts)).thenReturn(resolved);
-  //
-  // TaskAttemptEventSchedule mockTAEvent =
-  // mock(TaskAttemptEventSchedule.class);
-  // doReturn(false).when(mockTAEvent).isRescheduled();
-  //
-  // sta.transition(spyTa, mockTAEvent);
-  // verify(spyTa).resolveHosts(hosts);
-  // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-  // verify(eventHandler, times(2)).handle(arg.capture());
-  // if (!(arg.getAllValues().get(1) instanceof
-  // AMSchedulerTALaunchRequestEvent)) {
-  // Assert.fail("Second Event not of type ContainerRequestEvent");
-  // }
-  // Map<String, Boolean> expected = new HashMap<String, Boolean>();
-  // expected.put("host1", true);
-  // expected.put("host2", true);
-  // expected.put("host3", true);
-  // AMSchedulerTALaunchRequestEvent cre =
-  // (AMSchedulerTALaunchRequestEvent) arg.getAllValues().get(1);
-  // String[] requestedHosts = cre.getHosts();
-  // for (String h : requestedHosts) {
-  // expected.remove(h);
-  // }
-  // assertEquals(0, expected.size());
-  // }
-  //
+    TaskAttemptImpl.ScheduleTaskattemptTransition sta =
+        new TaskAttemptImpl.ScheduleTaskattemptTransition();
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    String[] hosts = new String[3];
+    hosts[0] = "host1";
+    hosts[1] = "host2";
+    hosts[2] = "host3";
+    TaskLocationHint locationHint = new TaskLocationHint(hosts, null);
+
+    TezTaskID taskID = new TezTaskID(
+        new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        mock(TaskAttemptListener.class), 1, new TezConfiguration(),
+        mock(Token.class), new Credentials(), new SystemClock(),
+        mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+        MAP_PROCESSOR_NAME, locationHint, BuilderUtils.newResource(1024, 1),
+        new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+        "", false);
+
+    TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
+
+    sta.transition(taImpl, sEvent);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(1)).handle(arg.capture());
+    if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) {
+      fail("Second event not of type "
+          + AMSchedulerEventTALaunchRequest.class.getName());
+    }
+    // TODO Move the Rack request check to the client after TEZ-125 is fixed.
+    AMSchedulerEventTALaunchRequest lre = (AMSchedulerEventTALaunchRequest) arg
+        .getAllValues().get(0);
+    String[] requestedRacks = lre.getRacks();
+    assertEquals(1, requestedRacks.length);
+    assertEquals(3, lre.getHosts().length);
+    for (int i = 0; i < 3; i++) {
+      assertEquals("host" + (i + 1), lre.getHosts()[i]);
+    }
+  }
+
+
+  @Test
+  // Tests that an attempt is made to resolve the localized hosts to racks.
+  // TODO Move to the client post TEZ-125.
+  public void testHostResolveAttempt() throws Exception {
+    TaskAttemptImpl.ScheduleTaskattemptTransition sta =
+        new TaskAttemptImpl.ScheduleTaskattemptTransition();
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    String hosts[] = new String[] { "192.168.1.1", "host2", "host3" };
+    String resolved[] = new String[] { "host1", "host2", "host3" };
+    TaskLocationHint locationHint = new TaskLocationHint(hosts, null);
+
+    TezTaskID taskID = new TezTaskID(
+        new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        mock(TaskAttemptListener.class), 1, new TezConfiguration(),
+        mock(Token.class), new Credentials(), new SystemClock(),
+        mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+        MAP_PROCESSOR_NAME, locationHint, BuilderUtils.newResource(1024, 1),
+        new HashMap<String, LocalResource>(), new HashMap<String, String>(),
+        "", false);
+    TaskAttemptImpl spyTa = spy(taImpl);
+    when(spyTa.resolveHosts(hosts)).thenReturn(resolved);
+
+    TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class);
+
+    sta.transition(spyTa, mockTAEvent);
+    verify(spyTa).resolveHosts(hosts);
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(1)).handle(arg.capture());
+    if (!(arg.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) {
+      fail("Second Event not of type ContainerRequestEvent");
+    }
+    Map<String, Boolean> expected = new HashMap<String, Boolean>();
+    expected.put("host1", true);
+    expected.put("host2", true);
+    expected.put("host3", true);
+    AMSchedulerEventTALaunchRequest cre = (AMSchedulerEventTALaunchRequest) arg
+        .getAllValues().get(0);
+    String[] requestedHosts = cre.getHosts();
+    for (String h : requestedHosts) {
+      expected.remove(h);
+    }
+    assertEquals(0, expected.size());
+  }
 
   // @Test
   // // Verifies accounting of slot_milli counters. Time spent in running tasks.
@@ -325,10 +343,105 @@ public class TestTaskAttempt {
     // null));
     assertFalse(eventHandler.internalError);
   }
+  
+  @Test
+  // Ensure ContainerTerminating and ContainerTerminated are handled correctly
+  // by the TaskAttempt
+  public void testContainerTerminationWhileRunning() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    TezDAGID dagID = new TezDAGID(appId, 1);
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezTaskID taskID = new TezTaskID(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
 
-  // TODO Add a similar test for TERMINATING.
-  // Ensure ContainerTerminated is handled correctly by the TaskAttempt
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new String[] { "127.0.0.1" }, null);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    Map<String, String> environment = new HashMap<String, String>();
+    String javaOpts = "";
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
+        MAP_PROCESSOR_NAME, locationHint, resource, localResources,
+        environment, javaOpts, false);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null, -1));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+
+    int expectedEventsAtRunning = 3;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+
+    taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
+        "Terminating"));
+    assertFalse(
+        "InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
+        eventHandler.internalError);
+    
+    assertEquals("Task attempt is not in the  FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+
+    assertEquals(1, taImpl.getDiagnostics().size());
+    assertEquals("Terminating", taImpl.getDiagnostics().get(0));
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+    
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
+        "Terminated"));
+    int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
+
+    assertEquals(2, taImpl.getDiagnostics().size());
+    assertEquals("Terminated", taImpl.getDiagnostics().get(1));
+  }
+
+  
   @Test
+  // Ensure ContainerTerminated is handled correctly by the TaskAttempt
   public void testContainerTerminatedWhileRunning() throws Exception {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
     ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
@@ -382,11 +495,14 @@ public class TestTaskAttempt {
         null, -1));
     assertEquals("Task attempt is not in running state", taImpl.getState(),
         TaskAttemptState.RUNNING);
-    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null));
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated"));
     assertFalse(
         "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
         eventHandler.internalError);
-    // TODO Verify diagnostics
+    
+    assertEquals("Terminated", taImpl.getDiagnostics().get(0));
+    
+    // TODO Ensure TA_TERMINATING after this is ignored.
   }
 
   @Test
@@ -456,6 +572,98 @@ public class TestTaskAttempt {
   }
 
   @Test
+  // Ensure ContainerTerminated is handled correctly by the TaskAttempt after
+  // the attempt has already succeeded
+  public void testContainerTerminatedAfterSuccess() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    TezDAGID dagID = new TezDAGID(appId, 1);
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezTaskID taskID = new TezTaskID(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new String[] { "127.0.0.1" }, null);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    Map<String, String> environment = new HashMap<String, String>();
+    String javaOpts = "";
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
+        MAP_PROCESSOR_NAME, locationHint, resource, localResources,
+        environment, javaOpts, false);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null, -1));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+
+    int expectedEventsAtRunning = 3;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in the  SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+
+    assertEquals(0, taImpl.getDiagnostics().size());
+
+    int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+    
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
+        "Terminated"));
+    int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
+
+    // Verify that the diagnostic message included in the Terminated event is not
+    // captured - TA already succeeded.
+    assertEquals(0, taImpl.getDiagnostics().size());
+  }
+
+  
+  @Test
   // Verifies that multiple TooManyFetchFailures are handled correctly by the
   // TaskAttempt.
   public void testMultipleTooManyFetchFailures() throws Exception {
@@ -467,7 +675,8 @@ public class TestTaskAttempt {
     TezTaskID taskID = new TezTaskID(vertexID, 1);
     TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
 
-    MockEventHandler eventHandler = new MockEventHandler();
+    MockEventHandler mockEh = new MockEventHandler();
+    MockEventHandler eventHandler = spy(mockEh);
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     when(taListener.getAddress()).thenReturn(
         new InetSocketAddress("localhost", 0));
@@ -513,8 +722,20 @@ public class TestTaskAttempt {
         TaskAttemptEventType.TA_DONE));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
+    
+    int expectedEventsTillSucceeded = 6;
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
+    verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
+    
     taImpl.handle(new TaskAttemptEvent(taskAttemptID,
         TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+    int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 3;
+    verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(arg.capture());
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsTillSucceeded,
+            expectedEventsAfterFetchFailure), TaskEventTAUpdate.class, 1);
+
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
     taImpl.handle(new TaskAttemptEvent(taskAttemptID,
@@ -524,8 +745,24 @@ public class TestTaskAttempt {
     assertFalse(
         "InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES",
         eventHandler.internalError);
+    // No new events.
+    verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
+        arg.capture());
   }
 
+  private void verifyEventType(List<Event> events,
+      Class<? extends Event> eventClass, int expectedOccurences) {
+    int count = 0;
+    for (Event e : events) {
+      if (eventClass.isInstance(e)) {
+        count++;
+      }
+    }
+    assertEquals(
+        "Mismatch in num occurences of event: " + eventClass.getCanonicalName(),
+        expectedOccurences, count);
+  }
+  
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d660948d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 516ea5e..100f2aa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -363,6 +363,8 @@ public class TestTaskImpl {
     assertTaskSucceededState();
 
   }
+  
+  // TODO Add test to validate the correct commit attempt.
 
   @SuppressWarnings("rawtypes")
   private class MockTaskImpl extends TaskImpl {


Mime
View raw message