hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject hadoop git commit: YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed by Giovanni Matteo Fumarola
Date Mon, 28 Mar 2016 20:25:23 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 0c84f9aee -> 7c81e374d


YARN-4117. End to end unit test with mini YARN cluster for AMRMProxy Service. Contributed
by Giovanni Matteo Fumarola


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7c81e374
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7c81e374
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7c81e374

Branch: refs/heads/branch-2.8
Commit: 7c81e374dac48594d4a0bf5aeef2ba3ff563e34d
Parents: 0c84f9a
Author: Jian He <jianhe@apache.org>
Authored: Sun Mar 27 20:22:12 2016 -0700
Committer: Jian He <jianhe@apache.org>
Committed: Mon Mar 28 13:23:53 2016 -0700

----------------------------------------------------------------------
 .../yarn/client/api/impl/TestAMRMProxy.java     | 413 +++++++++++++++++++
 .../nodemanager/amrmproxy/AMRMProxyService.java |  14 +-
 .../amrmproxy/DefaultRequestInterceptor.java    |   7 +
 .../containermanager/ContainerManagerImpl.java  |  49 ++-
 .../hadoop/yarn/server/MiniYARNCluster.java     |  75 +++-
 5 files changed, 535 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c81e374/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
new file mode 100644
index 0000000..b92538a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
@@ -0,0 +1,413 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAMRMProxy {
+
+  private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class);
+
+  /*
+   * This test validates register, allocate and finish of an application through
+   * the AMRMProxy.
+   */
+  @Test(timeout = 60000)
+  public void testAMRMProxyE2E() throws Exception {
+    MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
+    YarnClient rmClient = null;
+    ApplicationMasterProtocol client;
+
+    try {
+      Configuration conf = new YarnConfiguration();
+      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      cluster.init(conf);
+      cluster.start();
+      final Configuration yarnConf = cluster.getConfig();
+
+      // the client has to connect to AMRMProxy
+
+      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+      rmClient = YarnClient.createYarnClient();
+      rmClient.init(yarnConf);
+      rmClient.start();
+
+      // Submit application
+
+      ApplicationId appId = createApp(rmClient, cluster);
+
+      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+
+      LOG.info("testAMRMProxyE2E - Register Application Master");
+
+      RegisterApplicationMasterResponse responseRegister =
+          client.registerApplicationMaster(RegisterApplicationMasterRequest
+              .newInstance(NetUtils.getHostname(), 1024, ""));
+
+      Assert.assertNotNull(responseRegister);
+      Assert.assertNotNull(responseRegister.getQueue());
+      Assert.assertNotNull(responseRegister.getApplicationACLs());
+      Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+      Assert
+          .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+      Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+      Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+      RMApp rmApp =
+          cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+      Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+      LOG.info("testAMRMProxyE2E - Allocate Resources Application Master");
+
+      AllocateRequest request =
+          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+      AllocateResponse allocResponse = client.allocate(request);
+      Assert.assertNotNull(allocResponse);
+      Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
+
+      request.setAskList(new ArrayList<ResourceRequest>());
+      request.setResponseId(request.getResponseId() + 1);
+
+      Thread.sleep(1000);
+
+      // RM should allocate container within 2 calls to allocate()
+      allocResponse = client.allocate(request);
+      Assert.assertNotNull(allocResponse);
+      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+      LOG.info("testAMRMPRoxy - Finish Application Master");
+
+      FinishApplicationMasterResponse responseFinish =
+          client.finishApplicationMaster(FinishApplicationMasterRequest
+              .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+
+      Assert.assertNotNull(responseFinish);
+
+      Thread.sleep(500);
+      Assert.assertNotEquals(RMAppState.FINISHED, rmApp.getState());
+
+    } finally {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  /*
+   * This test validates the token renewal from the AMRMProxy. The test verifies
+   * that the received token is different from the previous one within 5
+   * requests.
+   */
+  @Test(timeout = 60000)
+  public void testE2ETokenRenewal() throws Exception {
+    MiniYARNCluster cluster =
+        new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
+    YarnClient rmClient = null;
+    ApplicationMasterProtocol client;
+
+    try {
+      Configuration conf = new YarnConfiguration();
+      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1500);
+      conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1500);
+      conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 1500);
+      // RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS should be at least
+      // RM_AM_EXPIRY_INTERVAL_MS * 1.5 *3
+      conf.setInt(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, 6);
+      cluster.init(conf);
+      cluster.start();
+      final Configuration yarnConf = cluster.getConfig();
+      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+      rmClient = YarnClient.createYarnClient();
+      rmClient.init(yarnConf);
+      rmClient.start();
+
+      // Submit
+
+      ApplicationId appId = createApp(rmClient, cluster);
+
+      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+
+      client.registerApplicationMaster(RegisterApplicationMasterRequest
+          .newInstance(NetUtils.getHostname(), 1024, ""));
+
+      LOG.info("testAMRMPRoxy - Allocate Resources Application Master");
+
+      AllocateRequest request =
+          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+      Token lastToken = null;
+      AllocateResponse response = null;
+
+      for (int i = 0; i < 5; i++) {
+
+        response = client.allocate(request);
+        request.setResponseId(request.getResponseId() + 1);
+
+        if (response.getAMRMToken() != null
+            && !response.getAMRMToken().equals(lastToken)) {
+          break;
+        }
+
+        lastToken = response.getAMRMToken();
+
+        // Time slot to be sure the RM renews the token
+        Thread.sleep(1500);
+
+      }
+
+      Assert.assertFalse(response.getAMRMToken().equals(lastToken));
+
+      LOG.info("testAMRMPRoxy - Finish Application Master");
+
+      client.finishApplicationMaster(FinishApplicationMasterRequest
+          .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+
+    } finally {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  /*
+   * This test validates that an AM cannot register directly with the RM using
+   * the token provided by the AMRMProxy.
+   */
+  @Test(timeout = 60000)
+  public void testE2ETokenSwap() throws Exception {
+    MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
+    YarnClient rmClient = null;
+    ApplicationMasterProtocol client;
+
+    try {
+      Configuration conf = new YarnConfiguration();
+      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+      cluster.init(conf);
+      cluster.start();
+
+      // the client will connect to the RM with the token provided by AMRMProxy
+      final Configuration yarnConf = cluster.getConfig();
+      rmClient = YarnClient.createYarnClient();
+      rmClient.init(yarnConf);
+      rmClient.start();
+
+      ApplicationId appId = createApp(rmClient, cluster);
+
+      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+
+      try {
+        client.registerApplicationMaster(RegisterApplicationMasterRequest
+            .newInstance(NetUtils.getHostname(), 1024, ""));
+        Assert.fail();
+      } catch (IOException e) {
+        Assert.assertTrue(
+            e.getMessage().startsWith("Invalid AMRMToken from appattempt_"));
+      }
+
+    } finally {
+      if (rmClient != null) {
+        rmClient.stop();
+      }
+      cluster.stop();
+    }
+  }
+
+  private ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
+      ApplicationId appId, MiniYARNCluster cluster,
+      final Configuration yarnConf)
+          throws IOException, InterruptedException, YarnException {
+
+    UserGroupInformation user = null;
+
+    // Get the AMRMToken from AMRMProxy
+
+    ApplicationReport report = rmClient.getApplicationReport(appId);
+
+    user = UserGroupInformation.createProxyUser(
+        report.getCurrentApplicationAttemptId().toString(),
+        UserGroupInformation.getCurrentUser());
+
+    ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
+        .getNodeManager(0).getNMContext().getContainerManager();
+
+    AMRMProxyTokenSecretManager amrmTokenSecretManager =
+        containerManager.getAMRMProxyService().getSecretManager();
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+        amrmTokenSecretManager
+            .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
+
+    SecurityUtil.setTokenService(token,
+        containerManager.getAMRMProxyService().getBindAddress());
+    user.addToken(token);
+
+    // Start Application Master
+
+    return user
+        .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() throws Exception {
+            return ClientRMProxy.createRMProxy(yarnConf,
+                ApplicationMasterProtocol.class);
+          }
+        });
+  }
+
+  private AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
+    // The test needs AMRMClient to create a real allocate request
+    AMRMClientImpl<ContainerRequest> amClient =
+        new AMRMClientImpl<ContainerRequest>();
+
+    Resource capability = Resource.newInstance(1024, 2);
+    Priority priority = Priority.newInstance(1);
+    List<NodeReport> nodeReports = listNode;
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String[] nodes = new String[] { node };
+
+    ContainerRequest storedContainer1 =
+        new ContainerRequest(capability, nodes, null, priority);
+    amClient.addContainerRequest(storedContainer1);
+    amClient.addContainerRequest(storedContainer1);
+
+    List<ResourceRequest> resourceAsk = new ArrayList<ResourceRequest>();
+    for (ResourceRequest rr : amClient.ask) {
+      resourceAsk.add(rr);
+    }
+
+    ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
+        .newInstance(new ArrayList<String>(), new ArrayList<String>());
+
+    int responseId = 1;
+
+    return AllocateRequest.newInstance(responseId, 0, resourceAsk,
+        new ArrayList<ContainerId>(), resourceBlacklistRequest);
+  }
+
+  private ApplicationId createApp(YarnClient yarnClient,
+      MiniYARNCluster yarnCluster) throws Exception {
+
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+
+    appContext.setApplicationName("Test");
+
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+
+    appContext.setQueue("default");
+
+    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource> emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
+        new HashMap<String, ByteBuffer>(), null,
+        new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+
+    SubmitApplicationRequest appRequest =
+        Records.newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+
+    yarnClient.submitApplication(appContext);
+
+    RMAppAttempt appAttempt = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport
+          .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+        ApplicationAttemptId attemptId =
+            appReport.getCurrentApplicationAttemptId();
+        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (true) {
+          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+            break;
+          }
+        }
+        break;
+      }
+    }
+    Thread.sleep(1000);
+    return appId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c81e374/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index bd6538c..038c697 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -512,6 +513,16 @@ public class AMRMProxyService extends AbstractService implements
     return null;
   }
 
+  @Private
+  public InetSocketAddress getBindAddress() {
+    return this.listenerEndpoint;
+  }
+
+  @Private
+  public AMRMProxyTokenSecretManager getSecretManager() {
+    return this.secretManager;
+  }
+
   /**
    * Private class for handling application stop events.
    *
@@ -546,7 +557,8 @@ public class AMRMProxyService extends AbstractService implements
    * ApplicationAttemptId instances.
    *
    */
-  private static class RequestInterceptorChainWrapper {
+  @Private
+  public static class RequestInterceptorChainWrapper {
     private RequestInterceptor rootInterceptor;
     private ApplicationAttemptId applicationAttemptId;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c81e374/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 2c7939b..4457dd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Extends the AbstractRequestInterceptor class and provides an implementation
  * that simply forwards the AM requests to the cluster resource manager.
@@ -135,4 +137,9 @@ public final class DefaultRequestInterceptor extends
     user.addToken(amrmToken);
     amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
   }
+
+  @VisibleForTesting
+  public void setRMClient(ApplicationMasterProtocol rmClient) {
+    this.rmClient = rmClient;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c81e374/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e3d8f1d..a156dfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -183,7 +183,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private AMRMProxyService amrmProxyService;
-  private boolean amrmProxyEnabled = false;
+  protected boolean amrmProxyEnabled = false;
 
   private long waitForContainersOnShutdownMillis;
 
@@ -247,19 +247,7 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(sharedCacheUploader);
     dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
 
-    amrmProxyEnabled =
-        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
-            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
-
-    if (amrmProxyEnabled) {
-      LOG.info("AMRMProxyService is enabled. "
-          + "All the AM->RM requests will be intercepted by the proxy");
-      this.amrmProxyService =
-          new AMRMProxyService(this.context, this.dispatcher);
-      addService(this.amrmProxyService);
-    } else {
-      LOG.info("AMRMProxyService is disabled");
-    }
+    createAMRMProxyService(conf);
 
     waitForContainersOnShutdownMillis =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
@@ -272,8 +260,20 @@ public class ContainerManagerImpl extends CompositeService implements
     recover();
   }
 
-  public boolean isARMRMProxyEnabled() {
-    return amrmProxyEnabled;
+  protected void createAMRMProxyService(Configuration conf) {
+    this.amrmProxyEnabled =
+        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+    if (amrmProxyEnabled) {
+      LOG.info("AMRMProxyService is enabled. "
+          + "All the AM->RM requests will be intercepted by the proxy");
+      this.setAMRMProxyService(
+          new AMRMProxyService(this.context, this.dispatcher));
+      addService(this.getAMRMProxyService());
+    } else {
+      LOG.info("AMRMProxyService is disabled");
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -796,9 +796,9 @@ public class ContainerManagerImpl extends CompositeService implements
 
           // Initialize the AMRMProxy service instance only if the container is of
           // type AM and if the AMRMProxy service is enabled
-          if (isARMRMProxyEnabled() && containerTokenIdentifier
-              .getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
-            this.amrmProxyService.processApplicationStartRequest(request);
+          if (amrmProxyEnabled && containerTokenIdentifier.getContainerType()
+              .equals(ContainerType.APPLICATION_MASTER)) {
+            this.getAMRMProxyService().processApplicationStartRequest(request);
           }
 
           startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
@@ -1399,4 +1399,15 @@ public class ContainerManagerImpl extends CompositeService implements
   public Map<String, ByteBuffer> getAuxServiceMetaData() {
     return this.auxiliaryServices.getMetaData();
   }
+
+  @Private
+  public AMRMProxyService getAMRMProxyService() {
+    return this.amrmProxyService;
+  }
+
+  @Private
+  protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
+    this.amrmProxyService = amrmProxyService;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7c81e374/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 3d557d3..9f18dcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -35,21 +35,23 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -61,24 +63,31 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -697,6 +706,15 @@ public class MiniYARNCluster extends CompositeService {
         protected void stopRMProxy() { }
       };
     }
+
+    @Override
+    protected ContainerManagerImpl createContainerManager(Context context,
+        ContainerExecutor exec, DeletionService del,
+        NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
+      return new CustomContainerManagerImpl(context, exec, del,
+          nodeStatusUpdater, metrics, dirsHandler);
+    }
   }
 
   /**
@@ -798,4 +816,55 @@ public class MiniYARNCluster extends CompositeService {
   public int getNumOfResourceManager() {
     return this.resourceManagers.length;
   }
+
+  private class CustomContainerManagerImpl extends ContainerManagerImpl {
+
+    public CustomContainerManagerImpl(Context context, ContainerExecutor exec,
+        DeletionService del, NodeStatusUpdater nodeStatusUpdater,
+        NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
+      super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
+    }
+
+    @Override
+    protected void createAMRMProxyService(Configuration conf) {
+      this.amrmProxyEnabled =
+          conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+              YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+      if (this.amrmProxyEnabled) {
+        LOG.info("CustomAMRMProxyService is enabled. "
+            + "All the AM->RM requests will be intercepted by the proxy");
+        AMRMProxyService amrmProxyService =
+            useRpc ? new AMRMProxyService(getContext(), dispatcher)
+                : new ShortCircuitedAMRMProxy(getContext(), dispatcher);
+        this.setAMRMProxyService(amrmProxyService);
+        addService(this.getAMRMProxyService());
+      } else {
+        LOG.info("CustomAMRMProxyService is disabled");
+      }
+    }
+  }
+
+  private class ShortCircuitedAMRMProxy extends AMRMProxyService {
+
+    public ShortCircuitedAMRMProxy(Context context,
+        AsyncDispatcher dispatcher) {
+      super(context, dispatcher);
+    }
+
+    @Override
+    protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
+        String user, Token<AMRMTokenIdentifier> amrmToken,
+        Token<AMRMTokenIdentifier> localToken) {
+      super.initializePipeline(applicationAttemptId, user, amrmToken,
+          localToken);
+      RequestInterceptor rt = getPipelines()
+          .get(applicationAttemptId.getApplicationId()).getRootInterceptor();
+      if (rt instanceof DefaultRequestInterceptor) {
+        ((DefaultRequestInterceptor) rt)
+            .setRMClient(getResourceManager().getApplicationMasterService());
+      }
+    }
+
+  }
 }


Mime
View raw message