tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [50/50] [abbrv] tez git commit: TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. (sseth)
Date Wed, 15 Jul 2015 00:26:23 GMT
TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6355116f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6355116f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6355116f

Branch: refs/heads/TEZ-2003
Commit: 6355116f647c049d8d835701159fd914642a71c1
Parents: 468785c
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu May 28 18:29:12 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jul 14 16:24:16 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../apache/tez/runtime/task/TezTaskRunner2.java | 83 ++++++++++++--------
 2 files changed, 53 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6355116f/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index e333832..42c2e1e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -29,5 +29,6 @@ ALL CHANGES:
   TEZ-2465. Retrun the status of a kill request in TaskRunner2.
   TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
   TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons.
+  TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/6355116f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 15629fd..a5fabb5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -124,6 +124,8 @@ public class TezTaskRunner2 {
     try {
       ListenableFuture<TaskRunner2CallableResult> future = null;
       synchronized (this) {
+        // All running state changes must be made within a synchronized block to ensure
+        // kills are issued or the task is not setup.
         if (isRunningState()) {
           // Safe to do this within a synchronized block because we're providing
           // the handler on which the Reporter will communicate back. Assuming
@@ -252,27 +254,34 @@ public class TezTaskRunner2 {
    * @return true if the task kill was honored, false otherwise
    */
   public boolean killTask() {
+    boolean isFirstError = false;
     synchronized (this) {
       if (isRunningState()) {
         if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+          isFirstError = true;
           killTaskRequested.set(true);
-          if (taskRunnerCallable != null) {
-            taskKillStartTime = System.currentTimeMillis();
-            taskRunnerCallable.interruptTask();
-          }
-          return true;
         } else {
-          LOG.info("Ignoring killTask request for {} since end reason is already set to {}",
-              task.getTaskAttemptID(), firstEndReason);
+          logErrorIngored("killTask", null);
         }
       } else {
-        LOG.info("Ignoring killTask request for {} since it is not in a running state",
-            task.getTaskAttemptID());
+        logErrorIngored("killTask", null);
       }
     }
-    return false;
+    if (isFirstError) {
+      logAborting("killTask");
+      killTaskInternal();
+      return true;
+    } else {
+      return false;
+    }
   }
 
+  private void killTaskInternal() {
+    if (taskRunnerCallable != null) {
+      taskKillStartTime = System.currentTimeMillis();
+      taskRunnerCallable.interruptTask();
+    }
+  }
 
   // Checks and changes on these states should happen within a synchronized block,
   // to ensure the first event is the one that is captured and causes specific behaviour.
@@ -310,17 +319,18 @@ public class TezTaskRunner2 {
             errorReporterToAm.set(true);
             oobSignalErrorInProgress = true;
           } else {
-            LOG.info(
-                "Ignoring fatal error since the task has ended for reason: {}. IgnoredError: {} ",
-                firstEndReason, (t == null ? message : t.getMessage()));
+            logErrorIngored("signalFatalError", message);
           }
+        } else {
+          logErrorIngored("signalFatalError", message);
         }
       }
 
       // Informing the TaskReporter here because the running task may not be interruptable.
       // Has to be outside the lock.
       if (isFirstError) {
-        killTask();
+        logAborting("signalFatalError");
+        killTaskInternal();
         try {
           taskReporter.taskFailed(taskAttemptID, t, getTaskDiagnosticsString(t, message), sourceInfo);
         } catch (IOException e) {
@@ -371,19 +381,22 @@ public class TezTaskRunner2 {
           if (trySettingEndReason(EndReason.COMMUNICATION_FAILURE)) {
             registerFirstException(t, null);
             isFirstError = true;
+          } else {
+            logErrorIngored("umbilicalFatalError", null);
           }
           // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
           // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
           // method does not throw an exception, in which case task success is registered with the AM.
           // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter
         } else {
-          LOG.info("Ignoring Communication failure since task with id=" + task.getTaskAttemptID()
-              + " is already complete, is failing or has been asked to terminate");
+          logErrorIngored("umbilicalFatalError", null);
         }
+        // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
+        // However, the task does need to be cleaned up
       }
-      // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
       if (isFirstError) {
-        killTask();
+        logAborting("umbilicalFatalError");
+        killTaskInternal();
       }
     }
 
@@ -395,18 +408,12 @@ public class TezTaskRunner2 {
         isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
         // Respect stopContainerRequested since it can come in at any point, despite a previous failure.
         stopContainerRequested.set(true);
-
-        if (isFirstTerminate) {
-          LOG.info("Attempting to abort {} since a shutdown request was received",
-              task.getTaskAttemptID());
-          if (taskRunnerCallable != null) {
-            taskKillStartTime = System.currentTimeMillis();
-            taskRunnerCallable.interruptTask();
-          }
-        } else {
-          LOG.info("Not acting on shutdown request for {} since the task is not in running state",
-              task.getTaskAttemptID());
-        }
+      }
+      if (isFirstTerminate) {
+        logAborting("shutdownRequested");
+        killTaskInternal();
+      } else {
+        logErrorIngored("shutdownRequested", null);
       }
     }
   }
@@ -451,6 +458,20 @@ public class TezTaskRunner2 {
 
   private void handleFinalStatusUpdateFailure(Throwable t, boolean successReportAttempted) {
     // TODO Ideally differentiate between FAILED/KILLED
-    LOG.warn("Failure while reporting state= {} to AM", (successReportAttempted ? "success" : "failure/killed"), t);
+    LOG.warn("Failure while reporting state= {} to AM",
+        (successReportAttempted ? "success" : "failure/killed"), t);
+  }
+
+  private void logErrorIngored(String ignoredEndReason, String errorMessage) {
+    LOG.info(
+        "Ignoring {} request since the task with id {} has ended for reason: {}. IgnoredError: {} ",
+        ignoredEndReason, task.getTaskAttemptID(),
+        firstEndReason, (firstException == null ? (errorMessage == null ? "" : errorMessage) :
+            firstException.getMessage()));
+  }
+
+  private void logAborting(String abortReason) {
+    LOG.info("Attempting to abort {} due to an invocation of {}", task.getTaskAttemptID(),
+        abortReason);
   }
 }
\ No newline at end of file


Mime
View raw message