tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts. (sseth)
Date Tue, 16 Jun 2015 18:12:10 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 36579fa21 -> caab9058a


TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly
handled interrupts. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/caab9058
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/caab9058
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/caab9058

Branch: refs/heads/branch-0.6
Commit: caab9058a25afebfced4f6c9536d4f36fb28f2c0
Parents: 36579fa
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jun 16 11:12:05 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jun 16 11:12:05 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../app/launcher/LocalContainerLauncher.java    | 41 +++++++++++++-------
 .../tez/runtime/task/ContainerReporter.java     |  4 +-
 .../org/apache/tez/runtime/task/TezChild.java   | 29 +++++++++++---
 4 files changed, 53 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/caab9058/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 01aa780..782f5e9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts.
   TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
   TEZ-2534. Error handling summary event when shutting down AM.
   TEZ-2511. Add exitCode to diagnostics when container fails.

http://git-wip-us.apache.org/repos/asf/tez/blob/caab9058/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index bd4996b..c56c93b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -85,9 +85,9 @@ public class LocalContainerLauncher extends AbstractService implements
   private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
   private final String workingDirectory;
 
-  private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>
+  private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
       runningContainers =
-      new ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>();
+      new ConcurrentHashMap<ContainerId, RunningTaskCallback>();
 
   private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
@@ -140,6 +140,7 @@ public class LocalContainerLauncher extends AbstractService implements
   public void serviceStop() throws Exception {
     if (!serviceStopped.compareAndSet(false, true)) {
       LOG.info("Service Already stopped. Ignoring additional stop");
+      return;
     }
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
@@ -172,6 +173,9 @@ public class LocalContainerLauncher extends AbstractService implements
             LOG.error("TezSubTaskRunner interrupted ", e);
           }
           return;
+        } catch (Throwable e) {
+          LOG.error("TezSubTaskRunner failed due to exception", e);
+          throw new RuntimeException(e);
         }
       }
     }
@@ -220,24 +224,29 @@ public class LocalContainerLauncher extends AbstractService implements
       }
       ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
           taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
-      runningContainers.put(event.getContainerId(), runningTaskFuture);
-      Futures.addCallback(runningTaskFuture,
-          new RunningTaskCallback(context, event.getContainerId(), tezChild), callbackExecutor);
+      RunningTaskCallback callback = new RunningTaskCallback(context, event.getContainerId());
+      runningContainers.put(event.getContainerId(), callback);
+      Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
     } catch (RejectedExecutionException e) {
       handleLaunchFailed(e, event.getContainerId());
     }
   }
 
   private void stop(NMCommunicatorStopRequestEvent event) {
-    ListenableFuture<TezChild.ContainerExecutionResult> future =
+    // A stop_request will come in when a task completes and reports back or a preemption decision
+    // is made. Currently the LocalTaskScheduler does not support preemption. Also preemption
+    // will not work in local mode till Tez supports task preemption instead of container preemption.
+    RunningTaskCallback callback =
         runningContainers.get(event.getContainerId());
-    if (future == null) {
+    if (callback == null) {
       LOG.info("Ignoring stop request for containerId: " + event.getContainerId());
     } else {
-      LOG.info("Interrupting running/queued container with id: " + event.getContainerId());
-      future.cancel(true);
-      // This will work only if the running task respects Interrupts - which at the moment is
-      // not the case for parts of the Runtime.
+      LOG.info(
+          "Ignoring stop request for containerId " + event.getContainerId() +
+              ". Relying on regular task shutdown for it to end");
+      // Allow the tezChild thread to run it's course. It'll receive a shutdown request from the
+      // AM eventually since the task and container will be unregistered.
+      // This will need to be fixed once interrupting tasks is supported.
     }
     // Send this event to maintain regular control flow. This isn't of much use though.
     context.getEventHandler().handle(
@@ -249,17 +258,16 @@ public class LocalContainerLauncher extends AbstractService implements
 
     private final AppContext appContext;
     private final ContainerId containerId;
-    private final TezChild tezChild;
 
-    RunningTaskCallback(AppContext appContext, ContainerId containerId, TezChild tezChild) {
+    RunningTaskCallback(AppContext appContext, ContainerId containerId) {
       this.appContext = appContext;
       this.containerId = containerId;
-      this.tezChild = tezChild;
     }
 
     @Override
     public void onSuccess(TezChild.ContainerExecutionResult result) {
       runningContainers.remove(containerId);
+      LOG.info("ContainerExecutionResult for: " + containerId + " = " + result);
       if (result.getExitStatus() == TezChild.ContainerExecutionResult.ExitStatus.SUCCESS ||
           result.getExitStatus() ==
               TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) {
@@ -280,8 +288,8 @@ public class LocalContainerLauncher extends AbstractService implements
     @Override
     public void onFailure(Throwable t) {
       runningContainers.remove(containerId);
-      tezChild.shutdown();
       // Ignore CancellationException since that is triggered by the LocalContainerLauncher itself
+      // TezChild would have exited by this time. There's no need to invoke shutdown again.
       if (!(t instanceof CancellationException)) {
         LOG.info("Container: " + containerId + ": Execution Failed: ", t);
         // Inform of failure with exit code 1.
@@ -308,6 +316,9 @@ public class LocalContainerLauncher extends AbstractService implements
       @Override
       public TezChild.ContainerExecutionResult call() throws InterruptedException, TezException,
           IOException {
+        // Reset the interrupt status. Idaelly the thread should not be in an interrupted state.
+        // TezTaskRunner needs to be fixed to ensure this.
+        Thread.interrupted();
         // Inform about the launch request now that the container has been allocated a thread to execute in.
         context.getEventHandler().handle(new AMContainerEventLaunched(containerId));
         ContainerLaunchedEvent lEvt =

http://git-wip-us.apache.org/repos/asf/tez/blob/caab9058/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
index a68c7c1..a0ba2d9 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java
@@ -52,7 +52,7 @@ public class ContainerReporter implements Callable<ContainerTask> {
   @Override
   public ContainerTask call() throws Exception {
     ContainerTask containerTask = null;
-    LOG.info("Attempting to fetch new task");
+    LOG.info("Attempting to fetch new task for container " + containerContext.getContainerIdentifier());
     containerTask = umbilical.getTask(containerContext);
     long getTaskPollStartTime = System.currentTimeMillis();
     nextGetTaskPrintTime = getTaskPollStartTime + LOG_INTERVAL;
@@ -62,7 +62,7 @@ public class ContainerReporter implements Callable<ContainerTask> {
       TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
       containerTask = umbilical.getTask(containerContext);
     }
-    LOG.info("Got TaskUpdate: "
+    LOG.info("Got TaskUpdate for containerId= " + containerContext.getContainerIdentifier() + ": "
         + (System.currentTimeMillis() - getTaskPollStartTime)
         + " ms after starting to poll."
         + " TaskInfo: shouldDie: "

http://git-wip-us.apache.org/repos/asf/tez/blob/caab9058/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 2748c1c..6223c51 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -178,7 +179,7 @@ public class TezChild {
 
     UserGroupInformation childUGI = null;
 
-    while (!executor.isTerminated()) {
+    while (!executor.isTerminated() && !isShutdown.get()) {
       if (taskCount > 0) {
         TezUtilsInternal.updateLoggers("");
       }
@@ -194,7 +195,7 @@ public class TezChild {
             cause, "Execution Exception while fetching new work: " + e.getMessage());
       } catch (InterruptedException e) {
         error = true;
-        LOG.info("Interrupted while waiting for new work");
+        LOG.info("Interrupted while waiting for new work for container " + containerIdString);
         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
             "Interrupted while waiting for new work");
       } finally {
@@ -203,7 +204,7 @@ public class TezChild {
         }
       }
       if (containerTask.shouldDie()) {
-        LOG.info("ContainerTask returned shouldDie=true, Exiting");
+        LOG.info("ContainerTask returned shouldDie=true for container " + containerIdString + ", Exiting");
         shutdown();
         return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
             "Asked to die by the AM");
@@ -230,7 +231,9 @@ public class TezChild {
         try {
           shouldDie = !taskRunner.run();
           if (shouldDie) {
-            LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+            LOG.info(
+                "Got a shouldDie notification via heartbeats for container " + containerIdString +
+                    ". Shutting down");
             shutdown();
             return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
                 "Asked to die by the AM");
@@ -337,8 +340,15 @@ public class TezChild {
   }
 
   public void shutdown() {
+    LOG.info("Shutdown invoked for container " + containerIdString);
     if (!isShutdown.getAndSet(true)) {
-      executor.shutdownNow();
+      LOG.info("Shutting down container " + containerIdString);
+      // It's possible that there's pending tasks on the executor. Those should be cancelled.
+      List<Runnable> pendingRunnables = executor.shutdownNow();
+      for (Runnable r : pendingRunnables) {
+        LOG.info("Cancelling pending runnables during TezChild shutdown for containerId=" + containerIdString);
+        ((FutureTask)r).cancel(false);
+      }
       if (taskReporter != null) {
         taskReporter.shutdown();
       }
@@ -396,6 +406,15 @@ public class TezChild {
     public String getErrorMessage() {
       return this.errorMessage;
     }
+
+    @Override
+    public String toString() {
+      return "ContainerExecutionResult{" +
+          "exitStatus=" + exitStatus +
+          ", throwable=" + throwable +
+          ", errorMessage='" + errorMessage + '\'' +
+          '}';
+    }
   }
 
   public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,


Mime
View raw message