tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-738. Hive query fails with Invalid event: TA_CONTAINER_PREEMPTED at SUCCEEDED (Hitesh Shah via bikas)
Date Fri, 17 Jan 2014 05:48:16 GMT
Updated Branches:
  refs/heads/master 9138f7906 -> 4ba6f07b9


TEZ-738. Hive query fails with Invalid event: TA_CONTAINER_PREEMPTED at SUCCEEDED (Hitesh
Shah via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4ba6f07b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4ba6f07b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4ba6f07b

Branch: refs/heads/master
Commit: 4ba6f07b9284b7c750c02710f0e5193a3d7bef72
Parents: 9138f79
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jan 16 21:48:03 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jan 16 21:48:03 2014 -0800

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   3 +-
 .../app/rm/TaskSchedulerAppCallbackWrapper.java |   4 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 107 +++++++++++++++++--
 3 files changed, 100 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4ba6f07b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 8ecabd7..dc806d7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -234,8 +234,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition())
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition())
         .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
-        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_CONTAINER_TERMINATED))
-
+        .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING,
TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED))
 
         .installTopology();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4ba6f07b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index 2debf06..c690926 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -60,11 +60,11 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
       ExecutorService executorService) {
     this.real = real;
     this.executorService = executorService;
-    this.completionService = craeteAppCallbackCompletionService();
+    this.completionService = createAppCallbackCompletionService();
   }
 
   @VisibleForTesting
-  CompletionService craeteAppCallbackCompletionService() {
+  CompletionService createAppCallbackCompletionService() {
     return new ExecutorCompletionService(this.executorService);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4ba6f07b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index c9448a0..87a43fc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -118,7 +118,7 @@ public class TestTaskAttempt {
   // testMRAppHistory(app);
   // }
 
-  @Test
+  @Test(timeout = 5000)
   public void testLocalityRequest() {
 
     TaskAttemptImpl.ScheduleTaskattemptTransition sta =
@@ -160,7 +160,7 @@ public class TestTaskAttempt {
   }
 
 
-  @Test
+  @Test(timeout = 5000)
   // Tests that an attempt is made to resolve the localized hosts to racks.
   // TODO Move to the client post TEZ-125.
   public void testHostResolveAttempt() throws Exception {
@@ -208,7 +208,7 @@ public class TestTaskAttempt {
     assertEquals(0, expected.size());
   }
 
-  // @Test
+  // @Test(timeout = 5000)
   // // Verifies accounting of slot_milli counters. Time spent in running tasks.
   // // TODO Fix this test to work without MRApp.
   // public void testSlotMillisCounterUpdate() throws Exception {
@@ -295,7 +295,7 @@ public class TestTaskAttempt {
   // report.getTaskAttemptState());
   // }
 
-  @Test
+  @Test(timeout = 5000)
   // Ensure the dag does not go into an error state if a attempt kill is
   // received while STARTING
   public void testLaunchFailedWhileKilling() throws Exception {
@@ -345,7 +345,7 @@ public class TestTaskAttempt {
     assertFalse(eventHandler.internalError);
   }
 
-  @Test
+  @Test(timeout = 5000)
   // Ensure ContainerTerminating and ContainerTerminated is handled correctly by
   // the TaskAttempt
   public void testContainerTerminationWhileRunning() throws Exception {
@@ -437,7 +437,7 @@ public class TestTaskAttempt {
   }
 
 
-  @Test
+  @Test(timeout = 5000)
   // Ensure ContainerTerminated is handled correctly by the TaskAttempt
   public void testContainerTerminatedWhileRunning() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -498,7 +498,7 @@ public class TestTaskAttempt {
     // TODO Ensure TA_TERMINATING after this is ingored.
   }
 
-  @Test
+  @Test(timeout = 5000)
   // Ensure ContainerTerminating and ContainerTerminated is handled correctly by
   // the TaskAttempt
   public void testContainerTerminatedAfterSuccess() throws Exception {
@@ -585,7 +585,94 @@ public class TestTaskAttempt {
     assertEquals(0, taImpl.getDiagnostics().size());
   }
 
-  @Test
+  @Test(timeout = 5000)
+  // Ensure Container Preemption race with task completion is handled correctly by
+  // the TaskAttempt
+  public void testContainerPreemptedAfterSuccess() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+        resource, createFakeContainerContext(), false);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+
+    int expectedEventsAtRunning = 3;
+    verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in the  SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+
+    assertEquals(0, taImpl.getDiagnostics().size());
+
+    int expectedEventsAfterTerminating = expectedEventsAtRunning + 3;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventsAfterTerminating)).handle(arg.capture());
+
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEventsAfterTerminating), TaskEventTAUpdate.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtRunning,
+            expectedEventsAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
+        TaskAttemptEventType.TA_CONTAINER_PREEMPTED));
+    int expectedEventAfterTerminated = expectedEventsAfterTerminating + 0;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
+
+    // Verify that the diagnostic message included in the Terminated event is not
+    // captured - TA already succeeded.
+    assertEquals(0, taImpl.getDiagnostics().size());
+  }
+
+  @Test(timeout = 5000)
   // Ensure node failure on Successful Non-Leaf tasks cause them to be marked as KILLED
   public void testNodeFailedNonLeafVertex() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -676,7 +763,7 @@ public class TestTaskAttempt {
         taImpl.getState());
   }
   
-  @Test
+  @Test(timeout = 5000)
   // Ensure node failure on Successful Leaf tasks do not cause them to be marked as KILLED
   public void testNodeFailedLeafVertex() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -764,7 +851,7 @@ public class TestTaskAttempt {
         taImpl.getState());
   }
 
-  @Test
+  @Test(timeout = 5000)
   // Verifies that multiple TooManyFetchFailures are handled correctly by the
   // TaskAttempt.
   public void testMultipleOutputFailed() throws Exception {


Mime
View raw message