tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject [04/11] tez git commit: TEZ-3791. Failed/Killed task can throw InvalidStateTransitonException when a new attempt is launched (Kuhu Shukla via jeagles)
Date Fri, 14 Jul 2017 18:05:30 GMT
TEZ-3791. Failed/Killed task can throw InvalidStateTransitonException when a new attempt is launched (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/87b60237
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/87b60237
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/87b60237

Branch: refs/heads/branch-0.9.0
Commit: 87b6023717e8bff3fb76dcc17bc64852439b1123
Parents: 6d26a5c
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Tue Jul 11 15:29:09 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Tue Jul 11 15:29:09 2017 -0500

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |   6 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 108 +++++++++++++++++++
 2 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/87b60237/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index c8e911e..bed4141 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -268,7 +268,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(
             TaskEventType.T_TERMINATE,
             TaskEventType.T_SCHEDULE,
-            TaskEventType.T_ADD_SPEC_ATTEMPT))
+            TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_LAUNCHED))
 
     // Transitions from KILLED state
     // Ignorable event: T_ATTEMPT_KILLED
@@ -289,6 +290,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_TERMINATE,
             TaskEventType.T_SCHEDULE,
             TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_LAUNCHED,
+            TaskEventType.T_ATTEMPT_SUCCEEDED,
+            TaskEventType.T_ATTEMPT_FAILED,
             TaskEventType.T_ATTEMPT_KILLED))
 
     // create the topology tables

http://git-wip-us.apache.org/repos/asf/tez/blob/87b60237/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index e5d564e..e03e282 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -78,6 +78,7 @@ import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -1081,6 +1082,113 @@ public class TestTaskImpl {
         expectedIncompleteAttempts, mockTask.getUncompletedAttemptsCount());
   }
 
+  @Test (timeout = 30000)
+  public void testFailedTaskTransitionWithLaunchedAttempt() throws InterruptedException {
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, vertex);
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstMockTaskAttempt.getID());
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(secondMockTaskAttempt.getID());
+
+    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
+    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
+
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
+            TaskAttemptEventType.TA_FAILED,TaskFailureType.NON_FATAL, "test",
+            TaskAttemptTerminationCause.NO_PROGRESS));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
+            TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
+            TaskAttemptTerminationCause.NO_PROGRESS));
+
+    firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
+        firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
+    secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
+        secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
+    mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
+        mock(TaskAttemptEvent.class)));
+    mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
+        mock(TaskAttemptEvent.class)));
+    assertTrue("Attempts should have failed!",
+        firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED
+            && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED);
+    assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
+    assertTrue("Task should have failed!", mockTask.getState() == TaskState.FAILED);
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt();
+    mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID()));
+  }
+
+  @Test (timeout = 30000)
+  public void testKilledTaskTransitionWithLaunchedAttempt() throws InterruptedException {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstMockTaskAttempt.getID());
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(secondMockTaskAttempt.getID());
+
+    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
+    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
+
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
+    mockTask.handle(new TaskEventTermination(mockTask.getTaskId(),
+        TaskAttemptTerminationCause.FRAMEWORK_ERROR, "test"));
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),"test",
+            TaskAttemptTerminationCause.FRAMEWORK_ERROR));
+    mockTask.handle(new TaskEventTAKilled(secondMockTaskAttempt.getID(),
+        new TaskAttemptEvent(secondMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED)));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),"test",
+            TaskAttemptTerminationCause.FRAMEWORK_ERROR));
+    mockTask.handle(new TaskEventTAKilled(firstMockTaskAttempt.getID(),
+        new TaskAttemptEvent(firstMockTaskAttempt.getID(), TaskAttemptEventType.TA_KILLED)));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptKilled(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),"test",
+            TaskAttemptTerminationCause.FRAMEWORK_ERROR));
+    assertEquals("Task should have been killed!", mockTask.getInternalState(), TaskStateInternal.KILLED);
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl thirdMockTaskAttempt = mockTask.getLastAttempt();
+    mockTask.handle(createTaskTALauncherEvent(thirdMockTaskAttempt.getID()));
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl fourthMockTaskAttempt = mockTask.getLastAttempt();
+    mockTask.handle(createTaskTASucceededEvent(fourthMockTaskAttempt.getID()));
+    MockTaskAttemptImpl fifthMockTaskAttempt = mockTask.getLastAttempt();
+    mockTask.handle(createTaskTAFailedEvent(fifthMockTaskAttempt.getID()));
+  }
+
   // TODO Add test to validate the correct commit attempt.
 
 


Mime
View raw message