hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [37/50] hadoop git commit: YARN-2331. Distinguish shutdown during supervision vs. shutdown for rolling upgrade. Contributed by Jason Lowe
Date Sat, 09 May 2015 00:42:31 GMT
YARN-2331. Distinguish shutdown during supervision vs. shutdown for rolling upgrade. Contributed
by Jason Lowe

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d6b89cb4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d6b89cb4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d6b89cb4

Branch: refs/heads/YARN-2928
Commit: d6b89cb4c60edbfffdd7262b19d75263fda8af4a
Parents: cf22f0b
Author: Xuan <xgong@apache.org>
Authored: Fri May 8 15:10:43 2015 -0700
Committer: Zhijie Shen <zjshen@apache.org>
Committed: Fri May 8 17:39:51 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   4 +
 .../src/main/resources/yarn-default.xml         |   9 ++
 .../containermanager/ContainerManagerImpl.java  |   7 +-
 .../logaggregation/LogAggregationService.java   |   5 +-
 .../TestContainerManagerRecovery.java           | 119 +++++++++++++++----
 6 files changed, 124 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6b89cb4/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 08bd0d3..c54e9ad 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -276,6 +276,9 @@ Release 2.8.0 - UNRELEASED
     yarn.scheduler.capacity.node-locality-delay in code and default xml file.
     (Nijel SF via vinodkv)
 
+    YARN-2331. Distinguish shutdown during supervision vs. shutdown for
+    rolling upgrade. (Jason Lowe via xgong)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6b89cb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ac4a37a..32cc9f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1179,6 +1179,10 @@ public class YarnConfiguration extends Configuration {
 
   public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
 
+  public static final String NM_RECOVERY_SUPERVISED =
+      NM_RECOVERY_PREFIX + "supervised";
+  public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
+
   ////////////////////////////////
   // Web Proxy Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6b89cb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 247406a..5f1a42e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1214,6 +1214,15 @@
     <value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
   </property>
 
+  <property>
+    <description>Whether the nodemanager is running under supervision. A
+      nodemanager that supports recovery and is running under supervision
+      will not try to cleanup containers as it exits with the assumption
+      it will be immediately restarted and recover containers.</description>
+    <name>yarn.nodemanager.recovery.supervised</name>
+    <value>false</value>
+  </property>
+
   <!--Docker configuration-->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6b89cb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index bf05565..4dd9fa6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -532,8 +532,11 @@ public class ContainerManagerImpl extends CompositeService implements
 
     if (this.context.getNMStateStore().canRecover()
         && !this.context.getDecommissioned()) {
-      // do not cleanup apps as they can be recovered on restart
-      return;
+      if (getConfig().getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
+          YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED)) {
+        // do not cleanup apps as they can be recovered on restart
+        return;
+      }
     }
 
     List<ApplicationId> appIds =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6b89cb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 0018d56..dbbfcd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -145,10 +145,13 @@ public class LogAggregationService extends AbstractService implements
    
   private void stopAggregators() {
     threadPool.shutdown();
+    boolean supervised = getConfig().getBoolean(
+        YarnConfiguration.NM_RECOVERY_SUPERVISED,
+        YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
     // if recovery on restart is supported then leave outstanding aggregations
     // to the next restart
     boolean shouldAbort = context.getNMStateStore().canRecover()
-        && !context.getDecommissioned();
+        && !context.getDecommissioned() && supervised;
     // politely ask to finish
     for (AppLogAggregator aggregator : appLogAggregators.values()) {
       if (shouldAbort) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d6b89cb4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index a5f7db6..41f82c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -22,7 +22,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -68,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -82,27 +87,18 @@ public class TestContainerManagerRecovery {
   public void testApplicationRecovery() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
     conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
     NMStateStoreService stateStore = new NMMemoryStateStoreService();
     stateStore.init(conf);
     stateStore.start();
-    Context context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore, conf);
+    Context context = createContext(conf, stateStore);
     ContainerManagerImpl cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
 
-    // simulate registration with RM
-    MasterKey masterKey = new MasterKeyPBImpl();
-    masterKey.setKeyId(123);
-    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
-      .byteValue() }));
-    context.getContainerTokenSecretManager().setMasterKey(masterKey);
-    context.getNMTokenSecretManager().setMasterKey(masterKey);
-
     // add an application by starting a container
     String appUser = "app_user1";
     String modUser = "modify_user1";
@@ -155,9 +151,7 @@ public class TestContainerManagerRecovery {
 
     // reset container manager and verify app recovered with proper acls
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore, conf);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -201,9 +195,7 @@ public class TestContainerManagerRecovery {
 
     // restart and verify app is marked for finishing
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore, conf);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -233,9 +225,7 @@ public class TestContainerManagerRecovery {
 
     // restart and verify app is no longer present after recovery
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore, conf);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -243,6 +233,95 @@ public class TestContainerManagerRecovery {
     cm.stop();
   }
 
+  @Test
+  public void testContainerCleanupOnShutdown() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = Collections.emptyMap();
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+        dob.getLength());
+    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, containerCmds, serviceData,
+        containerTokens, acls);
+    // create the logAggregationContext
+    LogAggregationContext logAggregationContext =
+        LogAggregationContext.newInstance("includePattern", "excludePattern");
+
+    // verify containers are stopped on shutdown without recovery
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+    conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
+    Context context = createContext(conf, new NMNullStateStoreService());
+    ContainerManagerImpl cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+    // verify containers are stopped on shutdown with unsupervised recovery
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+    NMMemoryStateStoreService memStore = new NMMemoryStateStoreService();
+    memStore.init(conf);
+    memStore.start();
+    context = createContext(conf, memStore);
+    cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    memStore.close();
+    verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+    // verify containers are not stopped on shutdown with supervised recovery
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    memStore = new NMMemoryStateStoreService();
+    memStore.init(conf);
+    memStore.start();
+    context = createContext(conf, memStore);
+    cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    memStore.close();
+    verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
+  }
+
+  private NMContext createContext(YarnConfiguration conf,
+      NMStateStoreService stateStore) {
+    NMContext context = new NMContext(new NMContainerTokenSecretManager(
+        conf), new NMTokenSecretManagerInNM(), null,
+        new ApplicationACLsManager(conf), stateStore, conf);
+
+    // simulate registration with RM
+    MasterKey masterKey = new MasterKeyPBImpl();
+    masterKey.setKeyId(123);
+    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
+      .byteValue() }));
+    context.getContainerTokenSecretManager().setMasterKey(masterKey);
+    context.getNMTokenSecretManager().setMasterKey(masterKey);
+    return context;
+  }
+
   private StartContainersResponse startContainer(Context context,
       final ContainerManagerImpl cm, ContainerId cid,
       ContainerLaunchContext clc, LogAggregationContext logAggregationContext)


Mime
View raw message