tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jlowe@apache.org
Subject [2/2] tez git commit: TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED (jlowe)
Date Thu, 21 Jan 2016 19:04:40 GMT
TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/238c3ad8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/238c3ad8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/238c3ad8

Branch: refs/heads/branch-0.7
Commit: 238c3ad8fb007447830ce40fa2b4c498fb80a7f5
Parents: 09d8c82
Author: Jason Lowe <jlowe@apache.org>
Authored: Thu Jan 21 19:04:17 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Thu Jan 21 19:04:17 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  4 +-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 73 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/238c3ad8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4457dc4..bd3e64a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
   TEZ-3046. Compilation issue in tez-runtime-internals of branch-0.7
   TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
   TEZ-3037. History URL should be set regardless of which history logging service is enabled.

http://git-wip-us.apache.org/repos/asf/tez/blob/238c3ad8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e5e0a37..f78932b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -261,7 +261,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
             TaskEventType.T_TERMINATE,
             TaskEventType.T_SCHEDULE,
             TaskEventType.T_ADD_SPEC_ATTEMPT,
-            TaskEventType.T_ATTEMPT_KILLED))
+            TaskEventType.T_ATTEMPT_FAILED,
+            TaskEventType.T_ATTEMPT_KILLED,
+            TaskEventType.T_ATTEMPT_SUCCEEDED))
 
     // Transitions from KILLED state
     // Ignorable event: T_ATTEMPT_KILLED

http://git-wip-us.apache.org/repos/asf/tez/blob/238c3ad8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1ee6c4e..8852d93 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
@@ -677,6 +678,78 @@ public class TestTaskImpl {
     assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
   }
 
+  @Test(timeout = 20000)
+  public void testFailedThenSpeculativeFailed() {
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskAttemptListener, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, mock(Vertex.class));
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Fail the first attempt
+    updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Now fail the speculative attempt
+    updateAttemptState(specAttempt, TaskAttemptState.FAILED);
+    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+  }
+
+  @Test(timeout = 20000)
+  public void testFailedThenSpeculativeSucceeded() {
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskAttemptListener, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, mock(Vertex.class));
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Fail the first attempt
+    updateAttemptState(firstAttempt, TaskAttemptState.FAILED);
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Now succeed the speculative attempt
+    updateAttemptState(specAttempt, TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(2, mockTask.getAttemptList().size());
+  }
+
   // TODO Add test to validate the correct commit attempt.
 
 


Mime
View raw message