tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [30/51] [abbrv] tez git commit: TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)
Date Thu, 06 Aug 2015 09:26:22 GMT
TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/625220e9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/625220e9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/625220e9

Branch: refs/heads/TEZ-2003
Commit: 625220e97a7e912b1976eba95affc07d0d993dee
Parents: b51e271
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue May 12 14:27:42 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Aug 6 01:26:09 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                                   |  1 +
 .../java/org/apache/tez/dag/api/TaskCommunicator.java  |  4 ++++
 .../tez/runtime/LogicalIOProcessorRuntimeTask.java     | 11 ++++++-----
 .../main/java/org/apache/tez/runtime/RuntimeTask.java  |  2 +-
 .../apache/tez/runtime/task/TaskRunner2Callable.java   | 13 +++++++------
 .../org/apache/tez/runtime/task/TezTaskRunner2.java    | 10 ++++++----
 6 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/625220e9/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 5d2e40a..ed72d6b 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -25,5 +25,6 @@ ALL CHANGES:
   TEZ-2433. Fixes after rebase 05/08
   TEZ-2438. tez-tools version in the branch is incorrect.
   TEZ-2434. Allow tasks to be killed in the Runtime.
+  TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/625220e9/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index cadca0c..2651013 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -48,6 +48,10 @@ public abstract class TaskCommunicator extends AbstractService {
   // TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
 
   // TODO TEZ-2003 Remove reference to TaskAttemptID
+  // TODO TEZ-2003 This needs some information about why the attempt is being unregistered.
+  // e.g. preempted in which case the task may need to be informed. Alternately as a result of
+  // a failed task.
+  // In case of preemption - a killTask API is likely a better bet than trying to overload this method.
   public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
 
   // TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/625220e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 8263b3f..de08e56 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -704,7 +704,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   @Override
-  public synchronized void abortTask() throws Exception {
+  public synchronized void abortTask() {
     if (processor != null) {
       processor.abort();
     }
@@ -803,6 +803,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       LOG.debug("Num of inputs to be closed={}", initializedInputs.size());
       LOG.debug("Num of outputs to be closed={}", initializedOutputs.size());
     }
+
     // Close processor
     if (!processorClosed && processor != null) {
       try {
@@ -820,8 +821,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         Thread.currentThread().interrupt();
       } catch (Throwable e) {
         LOG.warn(
-            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}",
-                e.getClass().getName(), e.getMessage());
+            "Ignoring Exception when closing processor(cleanup). Exception class={}, message={}" +
+                e.getClass().getName(), e.getMessage(), e);
       }
     }
 
@@ -842,7 +843,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       } catch (Throwable e) {
         LOG.warn(
             "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}",
-            srcVertexName, e.getClass().getName(), e.getMessage());
+            srcVertexName, e.getClass().getName(), e.getMessage(), e);
       } finally {
         LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
             .getContext().getTaskVertexName(), srcVertexName, Thread.currentThread().isInterrupted());
@@ -866,7 +867,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       } catch (Throwable e) {
         LOG.warn(
             "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}",
-            destVertexName, e.getClass().getName(), e.getMessage());
+            destVertexName, e.getClass().getName(), e.getMessage(), e);
       } finally {
         LOG.info("Close input for vertex={}, sourceVertex={}, interruptedStatus={}", processor
             .getContext().getTaskVertexName(), destVertexName, Thread.currentThread().isInterrupted());

http://git-wip-us.apache.org/repos/asf/tez/blob/625220e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
index cdfb46a..33c0113 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/RuntimeTask.java
@@ -167,5 +167,5 @@ public abstract class RuntimeTask {
     taskDone.set(true);
   }
 
-  public abstract void abortTask() throws Exception;
+  public abstract void abortTask();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/625220e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
index 7315bbd..ab77635 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java
@@ -63,26 +63,26 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
           if (stopRequested.get() || Thread.currentThread().isInterrupted()) {
             return new TaskRunner2CallableResult(null);
           }
-          LOG.info("Initializing task" + ", taskAttemptId=" + task.getTaskAttemptID());
+          LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID());
           task.initialize();
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
-            LOG.info("Running task, taskAttemptId=" + task.getTaskAttemptID());
+            LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID());
             task.run();
           } else {
-            LOG.info("Stopped before running the processor.");
+            LOG.info("Stopped before running the processor taskAttemptId={}", task.getTaskAttemptID());
             return new TaskRunner2CallableResult(null);
           }
 
           if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) {
-            LOG.info("Closing task, taskAttemptId=" + task.getTaskAttemptID());
+            LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID());
             task.close();
             task.setFrameworkCounters();
           } else {
-            LOG.info("Stopped before closing the processor");
+            LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID());
             return new TaskRunner2CallableResult(null);
           }
-          LOG.info("Task completed, taskAttemptId=" + task.getTaskAttemptID() + ", askedToStop=" + stopRequested.get());
+          LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(), stopRequested.get());
 
 
           return new TaskRunner2CallableResult(null);
@@ -115,6 +115,7 @@ public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.Tas
   public void interruptTask() {
     // Ensure the task is only interrupted once.
     if (!stopRequested.getAndSet(true)) {
+      task.abortTask();
       if (ownThread != null) {
         ownThread.interrupt();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/625220e9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 73e5c76..ffbc6e8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -250,10 +250,12 @@ public class TezTaskRunner2 {
   public void killTask() {
     synchronized (this) {
       if (isRunningState()) {
-        trySettingEndReason(EndReason.KILL_REQUESTED);
-        if (taskRunnerCallable != null) {
-          taskKillStartTime = System.currentTimeMillis();
-          taskRunnerCallable.interruptTask();
+        if (trySettingEndReason(EndReason.KILL_REQUESTED)) {
+          killTaskRequested.set(true);
+          if (taskRunnerCallable != null) {
+            taskKillStartTime = System.currentTimeMillis();
+            taskRunnerCallable.interruptTask();
+          }
         }
       }
     }


Mime
View raw message