tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-657. Tez should process the Container exit status - specifically when the RM preempts a container (bikas)
Date Thu, 10 Jul 2014 21:53:01 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 7296793c8 -> 2e7641c69


TEZ-657. Tez should process the Container exit status - specifically when the RM preempts a container (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2e7641c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2e7641c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2e7641c6

Branch: refs/heads/master
Commit: 2e7641c69deb357926a3440c5d236ec65e0593be
Parents: 7296793
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Jul 10 14:52:50 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Jul 10 14:52:50 2014 -0700

----------------------------------------------------------------------
 .../TaskAttemptEventContainerPreempted.java     |  36 -----
 ...AttemptEventContainerTerminatedBySystem.java |  36 +++++
 .../dag/app/dag/event/TaskAttemptEventType.java |   2 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  16 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  20 ++-
 .../dag/app/rm/YarnTaskSchedulerService.java    |   4 -
 .../rm/container/AMContainerEventCompleted.java |  35 ++--
 .../dag/app/rm/container/AMContainerImpl.java   |  34 ++--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |   2 +-
 .../app/rm/TestTaskSchedulerEventHandler.java   | 159 +++++++++++++++++++
 .../dag/app/rm/container/TestAMContainer.java   |  57 +++++--
 11 files changed, 302 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java
deleted file mode 100644
index 1a29010..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class TaskAttemptEventContainerPreempted extends TaskAttemptEvent 
-  implements DiagnosableEvent {
-
-  private final String diagnostics;
-  public TaskAttemptEventContainerPreempted(TezTaskAttemptID id, String diagnostics) {
-    super(id, TaskAttemptEventType.TA_CONTAINER_PREEMPTED);
-    this.diagnostics = diagnostics;
-  }
-
-  @Override
-  public String getDiagnosticInfo() {
-    return diagnostics;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
new file mode 100644
index 0000000..a92aafd
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskAttemptEventContainerTerminatedBySystem extends TaskAttemptEvent 
+  implements DiagnosableEvent {
+
+  private final String diagnostics;
+  public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics) {
+    super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+    this.diagnostics = diagnostics;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagnostics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index db3fd3b..35bd144 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -53,7 +53,7 @@ public enum TaskAttemptEventType {
   TA_CONTAINER_TERMINATED,
 
   // Container has either been preempted or will be preempted
-  TA_CONTAINER_PREEMPTED,
+  TA_CONTAINER_TERMINATED_BY_SYSTEM,
 
   // The node running the task attempt failed.
   TA_NODE_FAILED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ea3b5b7..d23b458 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -231,7 +231,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           new ContainerCompletedBeforeRunningTransition())
       .addTransition(TaskAttemptStateInternal.START_WAIT,
           TaskAttemptStateInternal.KILLED,
-          TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
           new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
 
       .addTransition(TaskAttemptStateInternal.RUNNING,
@@ -279,7 +279,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           new ContainerCompletedWhileRunningTransition())
       .addTransition(TaskAttemptStateInternal.RUNNING,
           TaskAttemptStateInternal.KILLED,
-          TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
           new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
       .addTransition(
           TaskAttemptStateInternal.RUNNING,
@@ -334,7 +334,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           new ContainerCompletedBeforeRunningTransition())
       .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
           TaskAttemptStateInternal.KILLED,
-          TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
           new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
       .addTransition(
           TaskAttemptStateInternal.OUTPUT_CONSUMABLE,
@@ -355,7 +355,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptStateInternal.KILL_IN_PROGRESS,
           TaskAttemptStateInternal.KILL_IN_PROGRESS,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
-              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
@@ -379,7 +379,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptStateInternal.FAIL_IN_PROGRESS,
           TaskAttemptStateInternal.FAIL_IN_PROGRESS,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
-              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
@@ -400,7 +400,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptStateInternal.KILLED,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
               TaskAttemptEventType.TA_SCHEDULE,
-              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
@@ -422,7 +422,7 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptStateInternal.FAILED,
           EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
               TaskAttemptEventType.TA_SCHEDULE,
-              TaskAttemptEventType.TA_CONTAINER_PREEMPTED,
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
               TaskAttemptEventType.TA_STATUS_UPDATE,
               TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
               TaskAttemptEventType.TA_COMMIT_PENDING,
@@ -467,7 +467,7 @@ public class TaskAttemptImpl implements TaskAttempt,
               TaskAttemptEventType.TA_FAIL_REQUEST,
               TaskAttemptEventType.TA_CONTAINER_TERMINATING,
               TaskAttemptEventType.TA_CONTAINER_TERMINATED,
-              TaskAttemptEventType.TA_CONTAINER_PREEMPTED))
+              TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM))
 
         .installTopology();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index ec35f28..ea845b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -31,8 +31,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -393,7 +393,19 @@ public class TaskSchedulerEventHandler extends AbstractService
     // Inform the Containers about completion.
     AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
     if (amContainer != null) {
-      sendEvent(new AMContainerEventCompleted(containerStatus));
+      String message = null;
+      int exitStatus = containerStatus.getExitStatus();
+      if (exitStatus == ContainerExitStatus.PREEMPTED) {
+        message = "Container preempted externally. ";
+      } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
+        message = "Container disk failed. ";
+      } else {
+        message = "Container failed. ";
+      }
+      if (containerStatus.getDiagnostics() != null) {
+        message += containerStatus.getDiagnostics();
+      }
+      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message));
     }
   }
 
@@ -509,8 +521,8 @@ public class TaskSchedulerEventHandler extends AbstractService
   public void preemptContainer(ContainerId containerId) {
     taskScheduler.deallocateContainer(containerId);
     // Inform the Containers about completion.
-    sendEvent(new AMContainerEventCompleted(ContainerStatus.newInstance(
-        containerId, ContainerState.COMPLETE, "Container Preempted Internally", -1), true));
+    sendEvent(new AMContainerEventCompleted(containerId,
+        ContainerExitStatus.PREEMPTED, "Container preempted internally"));
   }
 
   public void setShouldUnregisterFlag() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 597b24f..f585c39 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -19,7 +19,6 @@
 package org.apache.tez.dag.app.rm;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -41,13 +40,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index a6bcc14..e9649f3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -18,30 +18,35 @@
 
 package org.apache.tez.dag.app.rm.container;
 
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 
 public class AMContainerEventCompleted extends AMContainerEvent {
 
-  private final ContainerStatus containerStatus;
-  private final boolean isPreempted;
+  private final int exitStatus;
+  private final String diagnostics;
 
-  public AMContainerEventCompleted(ContainerStatus containerStatus) {
-    this(containerStatus, false);
+  public AMContainerEventCompleted(ContainerId containerId, 
+      int exitStatus, String diagnostics) {
+    super(containerId, AMContainerEventType.C_COMPLETED);
+    this.exitStatus = exitStatus;
+    this.diagnostics = diagnostics;
+  }
+
+  public boolean isPreempted() {
+    return (exitStatus == ContainerExitStatus.PREEMPTED);
   }
   
-  public AMContainerEventCompleted(ContainerStatus containerStatus, 
-      boolean isPreempted) {
-    super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
-    this.containerStatus = containerStatus;
-    this.isPreempted = isPreempted;
+  public boolean isDiskFailed() {
+    return (exitStatus == ContainerExitStatus.DISKS_FAILED);
   }
-
-  public ContainerStatus getContainerStatus() {
-    return this.containerStatus;
+  
+  public String getDiagnostics() {
+    return diagnostics;
   }
   
-  public boolean isPreempted() {
-    return isPreempted;
+  public int getContainerExitStatus() {
+    return exitStatus;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d36c58..df1b65d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -49,7 +49,7 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerPreempted;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
@@ -540,7 +540,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventCompleted event = (AMContainerEventCompleted)cEvent;
-      String diag = event.getContainerStatus().getDiagnostics();
+      String diag = event.getDiagnostics();
       if (!(diag == null || diag.equals(""))) {
         LOG.info("Container " + container.getContainerId()
             + " exited with diagnostics set to " + diag);
@@ -653,8 +653,8 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       if (container.pendingAttempt != null) {
         String errorMessage = getMessage(container, event);
-        if (event.isPreempted()) {
-          container.sendPreemptedToTaskAttempt(container.pendingAttempt,
+        if (event.isPreempted() || event.isDiskFailed()) {
+          container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
               errorMessage);
         } else {
           container.sendTerminatedToTaskAttempt(container.pendingAttempt,
@@ -667,22 +667,19 @@ public class AMContainerImpl implements AMContainer {
       container.containerLocalResources = null;
       container.additionalLocalResources = null;
       container.unregisterFromTAListener();
-      String diag = event.getContainerStatus().getDiagnostics();
+      String diag = event.getDiagnostics();
       if (!(diag == null || diag.equals(""))) {
         LOG.info("Container " + container.getContainerId()
             + " exited with diagnostics set to " + diag);
       }
-      container.logStopped(event.isPreempted() ?
-            ContainerExitStatus.PREEMPTED
-          : ContainerExitStatus.SUCCESS);
+      container.logStopped(event.getContainerExitStatus());
     }
 
     public String getMessage(AMContainerImpl container,
         AMContainerEventCompleted event) {
       return "Container" + container.getContainerId()
-          + (event.isPreempted() ? " PREEMPTED" : " COMPLETED")
-          + " while trying to launch. Diagnostics: ["
-          + event.getContainerStatus().getDiagnostics() +"]";
+          + " finished while trying to launch. Diagnostics: ["
+          + event.getDiagnostics() +"]";
     }
   }
 
@@ -835,9 +832,8 @@ public class AMContainerImpl implements AMContainer {
     public String getMessage(
         AMContainerImpl container, AMContainerEventCompleted event) {
       return "Container " + container.getContainerId()
-          + (event.isPreempted() ? " PREEMPTED" : " COMPLETED")
-          + " with diagnostics set to ["
-          + event.getContainerStatus().getDiagnostics() + "]";
+          + " finished with diagnostics set to ["
+          + event.getDiagnostics() + "]";
     }
   }
 
@@ -907,8 +903,8 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
-      if (event.isPreempted()) {
-        container.sendPreemptedToTaskAttempt(container.runningAttempt,
+      if (event.isPreempted() || event.isDiskFailed()) {
+        container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
             getMessage(container, event));
       } else {
         container.sendTerminatedToTaskAttempt(container.runningAttempt,
@@ -997,7 +993,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
-      String diag = event.getContainerStatus().getDiagnostics();
+      String diag = event.getDiagnostics();
       for (TezTaskAttemptID taId : container.failedAssignments) {
         container.sendTerminatedToTaskAttempt(taId, diag);
       }
@@ -1124,9 +1120,9 @@ public class AMContainerImpl implements AMContainer {
     sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
   }
   
-  protected void sendPreemptedToTaskAttempt(
+  protected void sendContainerTerminatedBySystemToTaskAttempt(
     TezTaskAttemptID taId, String message) {
-      sendEvent(new TaskAttemptEventContainerPreempted(taId, message));
+      sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message));
   }
 
   protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 626b170..4f5af9a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -660,7 +660,7 @@ public class TestTaskAttempt {
             expectedEventsAfterTerminating), AMSchedulerEventTAEnded.class, 1);
 
     taImpl.handle(new TaskAttemptEvent(taskAttemptID,
-        TaskAttemptEventType.TA_CONTAINER_PREEMPTED));
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM));
     int expectedEventAfterTerminated = expectedEventsAfterTerminating + 0;
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
new file mode 100644
index 0000000..edd9ebf
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.api.client.DAGClientServer;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("rawtypes")
+public class TestTaskSchedulerEventHandler {
+  
+  class TestEventHandler implements EventHandler{
+    List<Event> events = Lists.newLinkedList();
+    @Override
+    public void handle(Event event) {
+      events.add(event);
+    }
+  }
+  
+  class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler {
+
+    public MockTaskSchedulerEventHandler(AppContext appContext,
+        DAGClientServer clientService, EventHandler eventHandler,
+        ContainerSignatureMatcher containerSignatureMatcher) {
+      super(appContext, clientService, eventHandler, containerSignatureMatcher);
+    }
+    
+    @Override
+    protected TaskSchedulerService createTaskScheduler(String host, int port,
+        String trackingUrl, AppContext appContext) {
+      return mockTaskScheduler;
+    }
+    
+  }
+
+  AppContext mockAppContext;
+  DAGClientServer mockClientService;
+  TestEventHandler mockEventHandler;
+  ContainerSignatureMatcher mockSigMatcher;
+  MockTaskSchedulerEventHandler schedulerHandler;
+  TaskSchedulerService mockTaskScheduler;
+  AMContainerMap mockAMContainerMap;
+
+  @Before
+  public void setup() {
+    mockAppContext = mock(AppContext.class);
+    mockClientService = mock(DAGClientServer.class);
+    mockEventHandler = new TestEventHandler();
+    mockSigMatcher = mock(ContainerSignatureMatcher.class);
+    mockTaskScheduler = mock(TaskSchedulerService.class);
+    mockAMContainerMap = mock(AMContainerMap.class);
+    when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
+    when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
+    schedulerHandler = new MockTaskSchedulerEventHandler(
+        mockAppContext, mockClientService, mockEventHandler, mockSigMatcher);
+  }
+  
+  @Test (timeout = 5000)
+  public void testContainerPreempted() throws IOException {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+    
+    String diagnostics = "Container preempted by RM.";
+    TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
+    ContainerStatus mockStatus = mock(ContainerStatus.class);
+    ContainerId mockCId = mock(ContainerId.class);
+    AMContainer mockAMContainer = mock(AMContainer.class);
+    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
+    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
+    when(mockStatus.getContainerId()).thenReturn(mockCId);
+    when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
+    when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED);
+    schedulerHandler.containerCompleted(mockTask, mockStatus);
+    Assert.assertEquals(1, mockEventHandler.events.size());
+    Event event = mockEventHandler.events.get(0);
+    Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
+    Assert.assertEquals(mockCId, completedEvent.getContainerId());
+    Assert.assertEquals("Container preempted externally. Container preempted by RM.", 
+        completedEvent.getDiagnostics());
+    Assert.assertTrue(completedEvent.isPreempted());
+    Assert.assertFalse(completedEvent.isDiskFailed());
+
+    schedulerHandler.stop();
+    schedulerHandler.close();
+  }
+  
+  @Test (timeout = 5000)
+  public void testContainerDiskFailed() throws IOException {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+    
+    String diagnostics = "NM disk failed.";
+    TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
+    ContainerStatus mockStatus = mock(ContainerStatus.class);
+    ContainerId mockCId = mock(ContainerId.class);
+    AMContainer mockAMContainer = mock(AMContainer.class);
+    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
+    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
+    when(mockStatus.getContainerId()).thenReturn(mockCId);
+    when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
+    when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED);
+    schedulerHandler.containerCompleted(mockTask, mockStatus);
+    Assert.assertEquals(1, mockEventHandler.events.size());
+    Event event = mockEventHandler.events.get(0);
+    Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
+    Assert.assertEquals(mockCId, completedEvent.getContainerId());
+    Assert.assertEquals("Container disk failed. NM disk failed.", 
+        completedEvent.getDiagnostics());
+    Assert.assertFalse(completedEvent.isPreempted());
+    Assert.assertTrue(completedEvent.isDiskFailed());
+
+    schedulerHandler.stop();
+    schedulerHandler.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2e7641c6/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 3b3fd37..c0be044 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -41,7 +41,6 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -49,10 +48,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -62,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -75,7 +71,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
@@ -591,7 +586,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted(true);
+    wc.containerCompleted(ContainerExitStatus.PREEMPTED);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -600,7 +595,43 @@ public class TestAMContainer {
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_PREEMPTED);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+
+    assertFalse(wc.amContainer.isInErrorState());
+
+    // Pending task complete. (Ideally, container should be dead at this point
+    // and this event should not be generated. Network timeout on NM-RM heartbeat
+    // can cause it to be generated)
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    wc.verifyNoOutgoingEvents();
+    wc.verifyHistoryStopEvent();
+
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testContainerDiskFailedAtRunning() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.containerLaunched();
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+
+    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED);
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).register(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
 
     assertFalse(wc.amContainer.isInErrorState());
 
@@ -1154,9 +1185,13 @@ public class TestAMContainer {
 
     public void containerCompleted(boolean preempted) {
       reset(eventHandler);
-      ContainerStatus cStatus = ContainerStatus.newInstance(containerID,
-          ContainerState.COMPLETE, "", 100);
-      amContainer.handle(new AMContainerEventCompleted(cStatus, preempted));
+      amContainer.handle(new AMContainerEventCompleted(containerID, 
+          (preempted ? ContainerExitStatus.PREEMPTED : ContainerExitStatus.SUCCESS), null));
+    }
+    
+    public void containerCompleted(int exitStatus) {
+      reset(eventHandler);
+      amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null));
     }
 
     public void containerTimedOut() {


Mime
View raw message