tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2566. Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED (zjffdu)
Date Thu, 18 Jun 2015 08:01:58 GMT
Repository: tez
Updated Branches:
  refs/heads/master d1607816f -> ccecb9830


TEZ-2566. Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED
(zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ccecb983
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ccecb983
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ccecb983

Branch: refs/heads/master
Commit: ccecb9830caf40d23927fcbc4821a80bf1300031
Parents: d160781
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Jun 18 16:01:42 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Thu Jun 18 16:01:42 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  6 +-
 .../tez/dag/app/dag/impl/TestTaskRecovery.java  | 61 ++++++++++++++++++++
 3 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ccecb983/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc8aaf5..fa2cbf4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -443,6 +443,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2566. Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED
   TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts.
   TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
   TEZ-2533. AM deadlock when shutdown

http://git-wip-us.apache.org/repos/asf/tez/blob/ccecb983/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 1251ac4..a5c507e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -621,13 +621,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
                 taskAttemptFinishedEvent.getTaskAttemptID());
             this.attempts.put(taskAttemptFinishedEvent.getTaskAttemptID(),
                 recoveredAttempt);
-            if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)) {
+            // Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED
+            if (!taskAttemptFinishedEvent.getState().equals(TaskAttemptState.KILLED)
+                && !taskAttemptFinishedEvent.getState().equals(TaskAttemptState.FAILED)) {
               throw new TezUncheckedException("Could not find task attempt"
                   + " when trying to recover"
                   + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID()
                   + ", taskAttemptFinishState" + taskAttemptFinishedEvent.getState());
             }
-            return recoveredState;
+            taskAttempt = recoveredAttempt;
           }
           if (getUncompletedAttemptsCount() < 0) {
             throw new TezUncheckedException("Invalid recovery event for attempt finished"

http://git-wip-us.apache.org/repos/asf/tez/blob/ccecb983/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index a16fad2..f43f52c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
@@ -63,6 +64,7 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -276,6 +278,65 @@ public class TestTaskRecovery {
   }
 
   /**
+   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (KILLED) ->
+   * RecoverTranstion
+   */
+  @Test(timeout = 5000)
+  public void testRecovery_OnlyTAFinishedEvent_KILLED() {
+    restoreFromTaskStartEvent();
+    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
+    task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters()));
+    task.handle(new TaskEventRecoverTask(task.getTaskId()));
+    // wait for the second task attempt is scheduled
+    dispatcher.await();
+    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
+    // taskAttempt_1 is recovered to KILLED, and new task attempt is scheduled
+    assertEquals(2, task.getAttempts().size());
+    assertEquals(1, task.getFinishedAttemptsCount());
+    assertEquals(0, task.failedAttempts);
+    assertEquals(null, task.successfulAttempt);
+  }
+
+  /**
+   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (FAILED) ->
+   * RecoverTranstion
+   */
+  @Test(timeout = 5000)
+  public void testRecovery_OnlyTAFinishedEvent_FAILED() {
+    restoreFromTaskStartEvent();
+    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
+    task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters()));
+    task.handle(new TaskEventRecoverTask(task.getTaskId()));
+    // wait for the second task attempt is scheduled
+    dispatcher.await();
+    assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
+    // taskAttempt_1 is recovered to FAILED, and new task attempt is scheduled
+    assertEquals(2, task.getAttempts().size());
+    assertEquals(1, task.getFinishedAttemptsCount());
+    assertEquals(1, task.failedAttempts);
+    assertEquals(null, task.successfulAttempt);
+  }
+
+  /**
+   * restoreFromTaskStartedEvent -> restoreFromTaskAttemptFinishedEvent (SUCCEEDED) ->
+   * RecoverTranstion
+   */
+  @Test(timeout = 5000)
+  public void testRecovery_OnlyTAFinishedEvent_SUCCEEDED() {
+    restoreFromTaskStartEvent();
+    TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
+    try {
+      task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
+          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters()));
+      fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
+    } catch (TezUncheckedException e) {
+      assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
+    }
+  }
+
+  /**
    * restoreFromTaskStartedEvent -> restoreFromTaskAttemptStartedEvent ->
    * RecoverTranstion
    */


Mime
View raw message