hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [49/50] [abbrv] hadoop git commit: YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state. (Hitesh Sharma via asuresh)
Date Thu, 24 Aug 2017 19:36:42 GMT
YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state.
(Hitesh Sharma via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96423b50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96423b50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96423b50

Branch: refs/heads/YARN-5972
Commit: 96423b50e3c9f16fff8f5377859df562b95247be
Parents: 652dd43
Author: Arun Suresh <asuresh@apache.org>
Authored: Fri Dec 9 07:51:03 2016 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Thu Aug 24 12:05:42 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/api/records/ContainerState.java |   7 +-
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../server/nodemanager/ContainerExecutor.java   |  22 +++
 .../container/ContainerEventType.java           |   6 +-
 .../container/ContainerImpl.java                | 170 ++++++++++++++++++-
 .../container/ContainerPauseEvent.java          |  40 +++++
 .../container/ContainerResumeEvent.java         |  39 +++++
 .../container/ContainerState.java               |   3 +-
 .../launcher/ContainerLaunch.java               |  90 +++++++++-
 .../launcher/ContainersLauncher.java            |  32 ++++
 .../launcher/ContainersLauncherEventType.java   |   3 +
 .../scheduler/ContainerSchedulerEventType.java  |   1 +
 .../container/TestContainer.java                |  51 ++++++
 13 files changed, 454 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 696fe06..45e5bd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -33,11 +33,14 @@ public enum ContainerState {
   
   /** Running container */
   RUNNING, 
-  
+
   /** Completed container */
   COMPLETE,
 
   /** Scheduled (awaiting resources) at the NM. */
   @InterfaceStability.Unstable
-  SCHEDULED
+  SCHEDULED,
+
+  /** Paused at the NM. */
+  PAUSED
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 81ebd79..b299c23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -83,6 +83,7 @@ enum ContainerStateProto {
   C_RUNNING = 2;
   C_COMPLETE = 3;
   C_SCHEDULED = 4;
+  C_PAUSED = 5;
 }
 
 message ContainerProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 9767fb9..6e409be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -695,6 +695,28 @@ public abstract class ContainerExecutor implements Configurable {
   }
 
   /**
+   * Pause the container. The default implementation does not support pausing:
+   * it logs a warning and throws. Specific executors can override this.
+   * @param container the Container
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void pauseContainer(Container container) {
+    LOG.warn(container.getContainerId() + " doesn't support pausing.");
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Resume the container from pause state. The default implementation does
+   * not support resuming: it logs a warning and throws. Can be overridden.
+   * @param container the Container
+   * @throws UnsupportedOperationException in this default implementation
+   */
+  public void resumeContainer(Container container) {
+    LOG.warn(container.getContainerId() + " doesn't support resume.");
+    throw new UnsupportedOperationException();
+  }
+
+  /**
    * Get the process-identifier for the container.
    *
    * @param containerID the container ID

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index afea0e6..1475435 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -27,6 +27,8 @@ public enum ContainerEventType {
   CONTAINER_DONE,
   REINITIALIZE_CONTAINER,
   ROLLBACK_REINIT,
+  PAUSE_CONTAINER,
+  RESUME_CONTAINER,
 
   // DownloadManager
   CONTAINER_INITED,
@@ -38,5 +40,7 @@ public enum ContainerEventType {
   CONTAINER_LAUNCHED,
   CONTAINER_EXITED_WITH_SUCCESS,
   CONTAINER_EXITED_WITH_FAILURE,
-  CONTAINER_KILLED_ON_REQUEST
+  CONTAINER_KILLED_ON_REQUEST,
+  CONTAINER_PAUSED,
+  CONTAINER_RESUMED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 8e42133..031ce54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -294,6 +294,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.NEW, ContainerState.DONE,
         ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
+    .addTransition(ContainerState.NEW, ContainerState.DONE,
+            ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
 
     // From LOCALIZING State
     .addTransition(ContainerState.LOCALIZING,
@@ -309,6 +311,8 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER,
         new KillBeforeRunningTransition())
+    .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
+        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
 
     // From LOCALIZATION_FAILED State
     .addTransition(ContainerState.LOCALIZATION_FAILED,
@@ -322,7 +326,8 @@ public class ContainerImpl implements Container {
     // container not launched so kill is a no-op
     .addTransition(ContainerState.LOCALIZATION_FAILED,
         ContainerState.LOCALIZATION_FAILED,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
     // container cleanup triggers a release of all resources
     // regardless of whether they were localized or not
     // LocalizedResource handles release event in all states
@@ -378,6 +383,76 @@ public class ContainerImpl implements Container {
         ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new KilledExternallyTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
+    ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+
+    // From PAUSING State
+    .addTransition(ContainerState.PAUSING, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
+        ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
+    // In case something goes wrong then container will exit from the
+    // PAUSING state
+    .addTransition(ContainerState.PAUSING,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
+    .addTransition(ContainerState.PAUSING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        new KilledExternallyTransition())
+
+    // From PAUSED State
+    .addTransition(ContainerState.PAUSED, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+        ContainerEventType.PAUSE_CONTAINER)
+    .addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
+        ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
+    // In case something goes wrong then container will exit from the
+    // PAUSED state
+    .addTransition(ContainerState.PAUSED,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        new KilledExternallyTransition())
+    .addTransition(ContainerState.PAUSED,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition(true))
+
+    // From RESUMING State
+    .addTransition(ContainerState.RESUMING, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
+        ContainerEventType.CONTAINER_RESUMED)
+    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    // In case something goes wrong then container will exit from the
+    // RESUMING state
+    .addTransition(ContainerState.RESUMING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.RESUMING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        new KilledExternallyTransition())
+    .addTransition(ContainerState.RESUMING,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition(true))
 
     // From REINITIALIZING State
     .addTransition(ContainerState.REINITIALIZING,
@@ -401,6 +476,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
+        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
     .addTransition(ContainerState.REINITIALIZING,
         ContainerState.SCHEDULED,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
@@ -418,6 +495,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
+        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
 
     // From CONTAINER_EXITED_WITH_SUCCESS State
     .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@@ -429,7 +508,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.EXITED_WITH_SUCCESS,
         ContainerState.EXITED_WITH_SUCCESS,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From EXITED_WITH_FAILURE State
     .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -441,7 +521,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.EXITED_WITH_FAILURE,
                    ContainerState.EXITED_WITH_FAILURE,
-                   ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From KILLING State.
     .addTransition(ContainerState.KILLING,
@@ -475,7 +556,8 @@ public class ContainerImpl implements Container {
     // in the container launcher
     .addTransition(ContainerState.KILLING,
         ContainerState.KILLING,
-        ContainerEventType.CONTAINER_LAUNCHED)
+        EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From CONTAINER_CLEANEDUP_AFTER_KILL State.
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -491,11 +573,13 @@ public class ContainerImpl implements Container {
         EnumSet.of(ContainerEventType.KILL_CONTAINER,
             ContainerEventType.RESOURCE_FAILED,
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
-            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
     .addTransition(ContainerState.DONE, ContainerState.DONE,
         ContainerEventType.INIT_CONTAINER)
     .addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -521,6 +605,8 @@ public class ContainerImpl implements Container {
     case LOCALIZING:
     case LOCALIZATION_FAILED:
     case SCHEDULED:
+    case PAUSED:
+    case RESUMING:
       return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
     case RUNNING:
     case RELAUNCHING:
@@ -530,6 +616,7 @@ public class ContainerImpl implements Container {
     case KILLING:
     case CONTAINER_CLEANEDUP_AFTER_KILL:
     case CONTAINER_RESOURCES_CLEANINGUP:
+    case PAUSING:
       return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
     case DONE:
     default:
@@ -1481,6 +1568,26 @@ public class ContainerImpl implements Container {
   }
 
   /**
+   * Transition taken when PAUSE_CONTAINER arrives in a state that cannot be
+   * paused; it requests container cleanup from the launcher instead.
+   * - NEW -> DONE.
+   * - LOCALIZING, REINITIALIZING, RELAUNCHING -> KILLING.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class KillOnPauseTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Kill the process/process-grp
+      container.setIsReInitializing(false);
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.CLEANUP_CONTAINER));
+    }
+  }
+
+  /**
    * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
    * upon receiving CONTAINER_KILLED_ON_REQUEST.
    */
@@ -1670,6 +1777,57 @@ public class ContainerImpl implements Container {
     }
   }
 
+  /**
+   * Transitions upon receiving PAUSE_CONTAINER.
+   * - RUNNING -> PAUSING
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class PauseContainerTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Ask the launcher to pause the process/process-grp (if supported)
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.PAUSE_CONTAINER));
+      ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
+      container.addDiagnostics(pauseEvent.getDiagnostic(), "\n");
+    }
+  }
+
+  /**
+   * Transitions upon receiving CONTAINER_PAUSED (PAUSING -> PAUSED).
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class PausedContainerTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Container was PAUSED so tell the scheduler
+      container.dispatcher.getEventHandler().handle(
+          new ContainerSchedulerEvent(container,
+              ContainerSchedulerEventType.CONTAINER_PAUSED));
+    }
+  }
+
+  /**
+   * Transitions upon receiving RESUME_CONTAINER.
+   * - PAUSED -> RESUMING
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class ResumeContainerTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Ask the launcher to resume the process/process-grp (if supported)
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.RESUME_CONTAINER));
+      ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
+      container.addDiagnostics(resumeEvent.getDiagnostic(), "\n");
+    }
+  }
+
   @Override
   public void handle(ContainerEvent event) {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
new file mode 100644
index 0000000..898304e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * ContainerEvent for ContainerEventType.PAUSE_CONTAINER.
+ */
+public class ContainerPauseEvent extends ContainerEvent {
+  /** Diagnostic text supplied by whoever requested the pause. */
+  private final String diagnostic;
+  /** Creates a PAUSE_CONTAINER event for cId carrying the given diagnostic. */
+  public ContainerPauseEvent(ContainerId cId,
+      String diagnostic) {
+    super(cId, ContainerEventType.PAUSE_CONTAINER);
+    this.diagnostic = diagnostic;
+  }
+  /** Returns the diagnostic text passed to the constructor. */
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
new file mode 100644
index 0000000..d7c9e9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * ContainerEvent for ContainerEventType.RESUME_CONTAINER.
+ */
+public class ContainerResumeEvent extends ContainerEvent {
+  /** Diagnostic text supplied by whoever requested the resume. */
+  private final String diagnostic;
+  /** Creates a RESUME_CONTAINER event for cId carrying the given diagnostic. */
+  public ContainerResumeEvent(ContainerId cId,
+      String diagnostic) {
+    super(cId, ContainerEventType.RESUME_CONTAINER);
+    this.diagnostic = diagnostic;
+  }
+  /** Returns the diagnostic text passed to the constructor. */
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 91d1356..7c3fea8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 public enum ContainerState {
   NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
   REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
-  CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
+  CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
+  PAUSING, PAUSED, RESUMING
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index a0055c5..182214f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ContainerLaunch implements Callable<Integer> {
 
@@ -106,8 +108,10 @@ public class ContainerLaunch implements Callable<Integer> {
   private final Configuration conf;
   private final Context context;
   private final ContainerManagerImpl containerManager;
-  
+
   protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
+  protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);
+
   protected AtomicBoolean completed = new AtomicBoolean(false);
 
   private volatile boolean killedBeforeStart = false;
@@ -778,6 +782,90 @@ public class ContainerLaunch implements Callable<Integer> {
   }
 
   /**
+   * Pause the container.
+   * Asks the executor to pause the container and, on success, notifies the
+   * container with CONTAINER_PAUSED; on failure the container is killed.
+   * @throws IOException in case of errors.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void pauseContainer() throws IOException {
+    ContainerId containerId = container.getContainerId();
+    String containerIdStr = containerId.toString();
+    LOG.info("Pausing the container " + containerIdStr);
+
+    // The pause event is only handled if the container is in the running state
+    // (the container state machine), so we don't check for
+    // shouldLaunchContainer over here
+
+    if (!shouldPauseContainer.compareAndSet(false, true)) {
+      LOG.info("Container " + containerId + " not paused as "
+          + "it is already paused");
+      return;
+    }
+
+    try {
+      // Pause the container
+      exec.pauseContainer(container);
+
+      // pauseContainer is a blocking call, so reaching this point means the
+      // container has been paused; send out the event.
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+          containerId,
+          ContainerEventType.CONTAINER_PAUSED));
+    } catch (Exception e) {
+      String message =
+          "Exception when trying to pause container " + containerIdStr
+              + ": " + StringUtils.stringifyException(e);
+      LOG.info(message);
+      container.handle(new ContainerKillEvent(container.getContainerId(),
+          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+          + " an exception in pausing it."));
+    }
+  }
+
+  /**
+   * Resume the container.
+   * Asks the executor to resume the paused container and, on success, notifies
+   * the container with CONTAINER_RESUMED; on failure the container is killed.
+   * @throws IOException in case of error.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void resumeContainer() throws IOException {
+    ContainerId containerId = container.getContainerId();
+    String containerIdStr = containerId.toString();
+    LOG.info("Resuming the container " + containerIdStr);
+
+    // The resume event is only handled if the container is in a paused state
+    // so we don't check for the launched flag here.
+
+    // Clear the paused flag (set by pauseContainer) so a later pause can
+    // proceed; the swap only succeeds if the container was actually paused.
+    boolean alreadyPaused = shouldPauseContainer.compareAndSet(true, false);
+    if (!alreadyPaused) {
+      LOG.info("Container " + containerIdStr + " not paused."
+          + " No resume necessary");
+      return;
+    }
+
+    try {
+      exec.resumeContainer(container);
+      // resumeContainer is a blocking call, so reaching this point means the
+      // container has been resumed; send out the event.
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+          containerId,
+          ContainerEventType.CONTAINER_RESUMED));
+    } catch (Exception e) {
+      String message =
+          "Exception when trying to resume container " + containerIdStr
+              + ": " + StringUtils.stringifyException(e);
+      LOG.info(message);
+      container.handle(new ContainerKillEvent(container.getContainerId(),
+          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+          + " an exception in resuming it."));
+    }
+  }
+
+  /**
    * Loop through for a time-bounded interval waiting to
    * read the process id from a file generated by a running process.
    * @param pidFilePath File from which to read the process id

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 25909b9..ca69712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -171,6 +173,36 @@ public class ContainersLauncher extends AbstractService
               + " with command " + signalEvent.getCommand());
         }
         break;
+      case PAUSE_CONTAINER:
+        ContainerLaunch launchedContainer = running.get(containerId);
+        if (launchedContainer == null) {
+          // Container not launched. So nothing needs to be done.
+          return;
+        }
+
+        // Pause the container
+        try {
+          launchedContainer.pauseContainer();
+        } catch (Exception e) {
+          LOG.info("Got exception while pausing container: " +
+            StringUtils.stringifyException(e));
+        }
+        break;
+      case RESUME_CONTAINER:
+        ContainerLaunch launchCont = running.get(containerId);
+        if (launchCont == null) {
+          // Container not launched. So nothing needs to be done.
+          return;
+        }
+
+        // Resume the container.
+        try {
+          launchCont.resumeContainer();
+        } catch (Exception e) {
+          LOG.info("Got exception while resuming container: " +
+            StringUtils.stringifyException(e));
+        }
+        break;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 380a032..1054e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -25,4 +25,7 @@ public enum ContainersLauncherEventType {
   CLEANUP_CONTAINER, // The process(grp) itself.
   CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
   SIGNAL_CONTAINER,
+  PAUSE_CONTAINER,
+  RESUME_CONTAINER
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 917eda0..a9cbf74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -27,4 +27,5 @@ public enum ContainerSchedulerEventType {
   UPDATE_CONTAINER,
   // Producer: Node HB response - RM has asked to shed the queue
   SHED_QUEUED_CONTAINERS,
+  CONTAINER_PAUSED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96423b50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 33f4609..8909088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 
 public class TestContainer {
 
@@ -207,6 +208,42 @@ public class TestContainer {
 
   @Test
   @SuppressWarnings("unchecked") // mocked generic
+  public void testContainerPauseAndResume() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      int running = metrics.getRunningContainers();
+      wc.launchContainer();
+      assertEquals(running + 1, metrics.getRunningContainers());
+      reset(wc.localizerBus);
+      wc.pauseContainer();
+      assertEquals(ContainerState.PAUSED,
+          wc.c.getContainerState());
+      wc.resumeContainer();
+      assertEquals(ContainerState.RUNNING,
+          wc.c.getContainerState());
+      wc.containerKilledOnRequest();
+      assertEquals(ContainerState.EXITED_WITH_FAILURE,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+      int failed = metrics.getFailedContainers();
+      wc.containerResourcesCleanup();
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertEquals(failed + 1, metrics.getFailedContainers());
+      assertEquals(running, metrics.getRunningContainers());
+    }
+    finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
   public void testCleanupOnFailure() throws Exception {
     WrappedContainer wc = null;
     try {
@@ -955,6 +992,8 @@ public class TestContainer {
       NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
       when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
       ContainerExecutor executor = mock(ContainerExecutor.class);
+      Mockito.doNothing().when(executor).pauseContainer(any(Container.class));
+      Mockito.doNothing().when(executor).resumeContainer(any(Container.class));
       launcher =
           new ContainersLauncher(context, dispatcher, executor, null, null);
       // create a mock ExecutorService, which will not really launch
@@ -1143,6 +1182,18 @@ public class TestContainer {
       drainDispatcherEvents();
     }
 
+    public void pauseContainer() {
+      c.handle(new ContainerPauseEvent(cId,
+          "PauseRequest"));
+      drainDispatcherEvents();
+    }
+
+    public void resumeContainer() {
+      c.handle(new ContainerResumeEvent(cId,
+          "ResumeRequest"));
+      drainDispatcherEvents();
+    }
+
     public void containerKilledOnRequest() {
       int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
       String diagnosticMsg = "Container completed with exit code " + exitCode;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message