tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe)
Date Thu, 17 Mar 2016 19:11:42 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 37ae6f561 -> d55cf45f2


TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d55cf45f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d55cf45f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d55cf45f

Branch: refs/heads/branch-0.7
Commit: d55cf45f20e674e00f09fd6bdf66f6a7ba04c499
Parents: 37ae6f5
Author: Jason Lowe <jlowe@apache.org>
Authored: Thu Mar 17 19:08:55 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Thu Mar 17 19:08:55 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../dag/records/TaskAttemptTerminationCause.java   |  3 ++-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      | 10 +++++++++-
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  | 17 +++++++++++------
 .../tez/dag/app/dag/impl/TestTaskRecovery.java     | 17 +++++++++++++++++
 5 files changed, 40 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bb7a56d..676d640 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-2958. Recovered TA, whose commit cannot be recovered, should move to killed state
   TEZ-3105. TezMxBeanResourceCalculator does not work on IBM JDK 7 or 8 causing Tez failures.
   TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch
   TEZ-3140. Reduce AM memory usage during serialization

http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
index c8396de..d0c6798 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -23,6 +23,7 @@ public enum TaskAttemptTerminationCause {
   
   TERMINATED_BY_CLIENT, // Killed by client command
   TERMINATED_AT_SHUTDOWN, // Killed due execution shutdown
+  TERMINATED_AT_RECOVERY, // Killed in recovery, due to can not recover running task attempt
   INTERNAL_PREEMPTION, // Killed by Tez to makes space for higher pri work
   EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work
   TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded
@@ -42,5 +43,5 @@ public enum TaskAttemptTerminationCause {
   CONTAINER_STOPPED, // Container stopped or released by Tez
   NODE_FAILED, // Node for the container failed
   NODE_DISK_ERROR, // Disk failed on the node runnign the task
-  
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index aa1b39c..9b8fd80 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1633,7 +1633,15 @@ public class TaskAttemptImpl implements TaskAttempt,
      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
     @Override
     public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) {
-      if (attempt.leafVertex) {
+      boolean fromRecovery = false;
+      if (event instanceof TaskAttemptEventTerminationCauseEvent) {
+        TaskAttemptEventTerminationCauseEvent termEvent =
+            (TaskAttemptEventTerminationCauseEvent) event;
+        if (termEvent.getTerminationCause() == TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY) {
+          fromRecovery = true;
+        }
+      }
+      if (!fromRecovery && attempt.leafVertex) {
         return TaskAttemptStateInternal.SUCCEEDED;
       }
       // TODO - TEZ-834. This assumes that the outputs were on that node

http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e9ef69f..ad678d7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -35,6 +35,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Maps;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1199,7 +1200,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           if (task.successfulAttempt != null) {
             //Found successful attempt
             //Recover data
-            boolean recoveredData = true;
+            String recoverErrorMsg = null;
             if (task.getVertex().getOutputCommitters() != null
                 && !task.getVertex().getOutputCommitters().isEmpty()) {
               for (Entry<String, OutputCommitter> entry
@@ -1209,28 +1210,32 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
                     + ", output=" + entry.getKey());
                 OutputCommitter committer = entry.getValue();
                 if (!committer.isTaskRecoverySupported()) {
-                  LOG.info("Task recovery not supported by committer"
-                      + ", failing task attempt"
+                  recoverErrorMsg = "Task recovery not supported by committer"
+                      + ", failing task attempt";
+                  LOG.info(recoverErrorMsg
                       + ", taskId=" + task.getTaskId()
                       + ", attemptId=" + task.successfulAttempt
                       + ", output=" + entry.getKey());
-                  recoveredData = false;
                   break;
                 }
                 try {
                   committer.recoverTask(task.getTaskId().getId(),
                       task.appContext.getApplicationAttemptId().getAttemptId()-1);
                 } catch (Exception e) {
+                  recoverErrorMsg = "Task recovery failed by committer: "
+                      + ExceptionUtils.getStackTrace(e);
                   LOG.warn("Task recovery failed by committer"
                       + ", taskId=" + task.getTaskId()
                       + ", attemptId=" + task.successfulAttempt
                       + ", output=" + entry.getKey(), e);
-                  recoveredData = false;
                   break;
                 }
               }
             }
-            if (!recoveredData) {
+            if (recoverErrorMsg != null) {
+              task.eventHandler.handle(
+                  new TaskAttemptEventKillRequest(task.successfulAttempt, recoverErrorMsg,
+                      TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY));
               task.successfulAttempt = null;
             } else {
               LOG.info("Recovered a successful attempt"

http://git-wip-us.apache.org/repos/asf/tez/blob/d55cf45f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 52421ba..fbf815d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -54,6 +54,7 @@ import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
@@ -623,6 +624,7 @@ public class TestTaskRecovery {
     assertEquals(taId, task.successfulAttempt);
 
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
+    dispatcher.await();
     assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
     // new task attempt is scheduled
     assertEquals(2, task.getAttempts().size());
@@ -630,6 +632,13 @@ public class TestTaskRecovery {
     assertEquals(0, task.failedAttempts);
     assertEquals(1, task.getUncompletedAttemptsCount());
     assertEquals(null, task.successfulAttempt);
+    // verify kill event is sent to original task attempt
+    assertEquals(2, taEventHandler.getEvents().size());
+    TaskAttemptEvent taEvent = taEventHandler.getEvents().get(1);
+    assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, taEvent.getType());
+    assertEquals(taId, taEvent.getTaskAttemptID());
+    TaskAttemptEventKillRequest taKillEvent = (TaskAttemptEventKillRequest) taEvent;
+    assertEquals(TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, taKillEvent.getTerminationCause());
   }
 
   /**
@@ -663,6 +672,7 @@ public class TestTaskRecovery {
     assertEquals(taId, task.successfulAttempt);
 
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
+    dispatcher.await();
     assertEquals(TaskStateInternal.RUNNING, task.getInternalState());
     // new task attempt is scheduled
     assertEquals(2, task.getAttempts().size());
@@ -670,6 +680,13 @@ public class TestTaskRecovery {
     assertEquals(0, task.failedAttempts);
     assertEquals(1, task.getUncompletedAttemptsCount());
     assertEquals(null, task.successfulAttempt);
+    // verify kill event is sent to original task attempt
+    assertEquals(2, taEventHandler.getEvents().size());
+    TaskAttemptEvent taEvent = taEventHandler.getEvents().get(1);
+    assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, taEvent.getType());
+    assertEquals(taId, taEvent.getTaskAttemptID());
+    TaskAttemptEventKillRequest taKillEvent = (TaskAttemptEventKillRequest) taEvent;
+     assertEquals(TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, taKillEvent.getTerminationCause());
   }
 
   @Test(timeout = 5000)


Mime
View raw message