Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E9953200D44 for ; Mon, 20 Nov 2017 23:22:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E843E160BE1; Mon, 20 Nov 2017 22:22:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A0B68160BF9 for ; Mon, 20 Nov 2017 23:22:12 +0100 (CET) Received: (qmail 1548 invoked by uid 500); 20 Nov 2017 22:22:11 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 1539 invoked by uid 99); 20 Nov 2017 22:22:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Nov 2017 22:22:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 78B04DF9FD; Mon, 20 Nov 2017 22:22:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Mon, 20 Nov 2017 22:22:10 -0000 Message-Id: <1cd4d9c3c2e64b00af07b44e15e4726c@git.apache.org> In-Reply-To: <39aa34478c2944078e53faf7fd8f4533@git.apache.org> References: <39aa34478c2944078e53faf7fd8f4533@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-6128. Add support for AMRMProxy HA. (Botong Huang via Subru). archived-at: Mon, 20 Nov 2017 22:22:15 -0000 YARN-6128. Add support for AMRMProxy HA. (Botong Huang via Subru). 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed310913 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed310913 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed310913 Branch: refs/heads/branch-2 Commit: ed3109136100a21d971484f242d80f2a7e7d337d Parents: ea8a121 Author: Subru Krishnan Authored: Mon Nov 20 14:21:58 2017 -0800 Committer: Subru Krishnan Committed: Mon Nov 20 14:21:58 2017 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 13 + .../src/main/resources/yarn-default.xml | 21 ++ .../hadoop-yarn-server-common/pom.xml | 5 + .../utils/FederationRegistryClient.java | 338 +++++++++++++++++++ .../yarn/server/uam/UnmanagedAMPoolManager.java | 140 ++++++-- .../server/uam/UnmanagedApplicationManager.java | 212 +++++++----- .../yarn/server/utils/AMRMClientUtils.java | 30 +- .../yarn/server/MockResourceManagerFacade.java | 100 +++--- .../utils/TestFederationRegistryClient.java | 90 +++++ .../uam/TestUnmanagedApplicationManager.java | 100 +++++- .../amrmproxy/AMRMProxyApplicationContext.java | 16 + .../AMRMProxyApplicationContextImpl.java | 35 +- .../nodemanager/amrmproxy/AMRMProxyService.java | 83 ++++- .../amrmproxy/FederationInterceptor.java | 221 +++++++++++- .../containermanager/ContainerManagerImpl.java | 9 +- .../amrmproxy/BaseAMRMProxyTest.java | 12 +- .../amrmproxy/TestAMRMProxyService.java | 21 +- .../amrmproxy/TestFederationInterceptor.java | 126 ++++++- .../TestableFederationInterceptor.java | 29 +- .../hadoop/yarn/server/MiniYARNCluster.java | 6 +- .../src/site/markdown/Federation.md | 11 +- 21 files changed, 1341 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a97dc57..edeec9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1948,6 +1948,9 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = "org.apache.hadoop.yarn.server.nodemanager.amrmproxy." + "DefaultRequestInterceptor"; + public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX + + "amrmproxy.ha.enable"; + public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false; /** * Default platform-agnostic CLASSPATH for YARN applications. A @@ -2790,6 +2793,11 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + public static final String FEDERATION_REGISTRY_BASE_KEY = + FEDERATION_PREFIX + "registry.base-dir"; + public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = + "yarnfederation/"; + // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; @@ -2947,6 +2955,11 @@ public class YarnConfiguration extends Configuration { // Other Configs //////////////////////////////// + public static final String YARN_REGISTRY_CLASS = + YARN_PREFIX + "registry.class"; + public static final String DEFAULT_YARN_REGISTRY_CLASS = + "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService"; + /** * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead. 
* The interval of the yarn client's querying application state after http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 46fb7c7..71dd72a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2815,7 +2815,20 @@ 300 + + The registry base directory for federation. + yarn.federation.registry.base-dir + yarnfederation/ + + + + + The registry implementation to use. + yarn.registry.class + org.apache.hadoop.registry.client.impl.FSRegistryOperationsService + + The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. @@ -2978,6 +2991,14 @@ + Whether AMRMProxy HA is enabled. + + yarn.nodemanager.amrmproxy.ha.enable + false + + + + Setting that controls whether distributed scheduling is enabled. 
yarn.nodemanager.distributed-scheduling.enabled http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 4726553..0af5a9a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -68,6 +68,11 @@ + org.apache.hadoop + hadoop-yarn-registry + + + com.google.guava guava http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java new file mode 100644 index 0000000..6624318 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Helper class that handles reads and writes to Yarn Registry to support UAM HA + * and second attempt. 
+ */ +public class FederationRegistryClient { + private static final Logger LOG = + LoggerFactory.getLogger(FederationRegistryClient.class); + + private RegistryOperations registry; + + private UserGroupInformation user; + + // AppId -> SubClusterId -> UAM token + private Map>> + appSubClusterTokenMap; + + // Structure in registry: <registryBaseDir>/<AppId>/<SubClusterId> -> UAMToken + private String registryBaseDir; + + public FederationRegistryClient(Configuration conf, + RegistryOperations registry, UserGroupInformation user) { + this.registry = registry; + this.user = user; + this.appSubClusterTokenMap = new ConcurrentHashMap<>(); + this.registryBaseDir = + conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY, + YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY); + LOG.info("Using registry {} with base directory: {}", + this.registry.getClass().getName(), this.registryBaseDir); + } + + /** + * Get the list of known applications in the registry. + * + * @return the list of known applications + */ + public List getAllApplications() { + // Suppress the exception here because it is valid that the entry does not + // exist + List applications = null; + try { + applications = listDirRegistry(this.registry, this.user, + getRegistryKey(null, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + if (applications == null) { + // It is valid for listDirRegistry to return null + return new ArrayList<>(); + } + return applications; + } + + /** + * For testing, delete all application records in registry. + */ + @VisibleForTesting + public void cleanAllApplications() { + try { + removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), + true, false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from removeKeyRegistry", e); + } + } + + /** + * Write/update the UAM token for an application and a sub-cluster. 
+ * + * @param subClusterId sub-cluster id of the token + * @param token the UAM token of the application + * @return whether the amrmToken is added or updated to a new value + */ + public boolean writeAMRMTokenForUAM(ApplicationId appId, + String subClusterId, Token token) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + if (subClusterTokenMap == null) { + subClusterTokenMap = new ConcurrentHashMap<>(); + this.appSubClusterTokenMap.put(appId, subClusterTokenMap); + } + + boolean update = !token.equals(subClusterTokenMap.get(subClusterId)); + if (!update) { + LOG.debug("Same amrmToken received from {}, skip writing registry for {}", + subClusterId, appId); + return update; + } + + LOG.info("Writing/Updating amrmToken for {} to registry for {}", + subClusterId, appId); + try { + // First, write the token entry + writeRegistry(this.registry, this.user, + getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true); + + // Then update the subClusterTokenMap + subClusterTokenMap.put(subClusterId, token); + } catch (YarnException | IOException e) { + LOG.error( + "Failed writing AMRMToken to registry for subcluster " + subClusterId, + e); + } + return update; + } + + /** + * Load the information of one application from registry. 
+ * + * @param appId application id + * @return the sub-cluster to UAM token mapping + */ + public Map> + loadStateFromRegistry(ApplicationId appId) { + Map> retMap = new HashMap<>(); + // Suppress the exception here because it is valid that the entry does not + // exist + List subclusters = null; + try { + subclusters = listDirRegistry(this.registry, this.user, + getRegistryKey(appId, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + + if (subclusters == null) { + LOG.info("Application {} does not exist in registry", appId); + return retMap; + } + + // Read the amrmToken for each sub-cluster with an existing UAM + for (String scId : subclusters) { + LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId); + String key = getRegistryKey(appId, scId); + try { + String tokenString = readRegistry(this.registry, this.user, key, true); + if (tokenString == null) { + throw new YarnException("Null string from readRegistry key " + key); + } + Token amrmToken = new Token<>(); + amrmToken.decodeFromUrlString(tokenString); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); + + retMap.put(scId, amrmToken); + } catch (Exception e) { + LOG.error("Failed reading registry key " + key + + ", skipping subcluster " + scId, e); + } + } + + // Override existing map if there + this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap)); + return retMap; + } + + /** + * Remove an application from registry. 
+ * + * @param appId application id + */ + public void removeAppFromRegistry(ApplicationId appId) { + Map> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + LOG.info("Removing all registry entries for {}", appId); + + if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { + return; + } + + // Lastly remove the application directory + String key = getRegistryKey(appId, null); + try { + removeKeyRegistry(this.registry, this.user, key, true, true); + subClusterTokenMap.clear(); + } catch (YarnException e) { + LOG.error("Failed removing registry directory key " + key, e); + } + } + + private String getRegistryKey(ApplicationId appId, String fileName) { + if (appId == null) { + return this.registryBaseDir; + } + if (fileName == null) { + return this.registryBaseDir + appId.toString(); + } + return this.registryBaseDir + appId.toString() + "/" + fileName; + } + + private String readRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + // Use the ugi loaded with app credentials to access registry + String result = ugi.doAs(new PrivilegedAction() { + @Override + public String run() { + try { + ServiceRecord value = registryImpl.resolve(key); + if (value != null) { + return value.description; + } + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry resolve key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry resolve key " + key + " failed"); + } + return result; + } + + private void removeKeyRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean recursive, + final boolean throwIfFails) throws YarnException { + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.delete(key, 
recursive); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry remove key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry remove key " + key + " failed"); + } + } + + /** + * Write registry entry, override if exists. + */ + private void writeRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final String value, + final boolean throwIfFails) throws YarnException { + + final ServiceRecord recordValue = new ServiceRecord(); + recordValue.description = value; + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction() { + @Override + public Boolean run() { + try { + registryImpl.bind(key, recordValue, BindFlags.OVERWRITE); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry write key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry write key " + key + " failed"); + } + } + + /** + * List the sub directories in the given directory. 
+ */ + private List listDirRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + List result = ugi.doAs(new PrivilegedAction>() { + @Override + public List run() { + try { + return registryImpl.list(key); + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry list key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry list key " + key + " failed"); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77..0c01217 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -44,9 +45,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -67,7 +68,7 @@ public class UnmanagedAMPoolManager extends AbstractService { // Map from uamId to UAM instances private Map unmanagedAppMasterMap; - private Map attemptIdMap; + private Map appIdMap; private ExecutorService threadpool; @@ -82,7 +83,7 @@ public class UnmanagedAMPoolManager extends AbstractService { this.threadpool = Executors.newCachedThreadPool(); } this.unmanagedAppMasterMap = new ConcurrentHashMap<>(); - this.attemptIdMap = new ConcurrentHashMap<>(); + this.appIdMap = new ConcurrentHashMap<>(); super.serviceStart(); } @@ -114,7 +115,7 @@ public class UnmanagedAMPoolManager extends AbstractService { public KillApplicationResponse call() throws Exception { try { LOG.info("Force-killing UAM id " + uamId + " for application " - + attemptIdMap.get(uamId)); + + appIdMap.get(uamId)); return unmanagedAppMasterMap.remove(uamId).forceKillApplication(); } catch (Exception e) { LOG.error("Failed to kill unmanaged application master", e); @@ -132,7 +133,7 @@ public class UnmanagedAMPoolManager extends AbstractService { LOG.error("Failed to kill unmanaged application master", e); } } - this.attemptIdMap.clear(); + this.appIdMap.clear(); super.serviceStop(); } @@ -145,13 +146,18 @@ public class 
UnmanagedAMPoolManager extends AbstractService { * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) * @return uamId for the UAM * @throws YarnException if registerApplicationMaster fails * @throws IOException if registerApplicationMaster fails */ public String createAndRegisterNewUAM( RegisterApplicationMasterRequest registerRequest, Configuration conf, - String queueName, String submitter, String appNameSuffix) + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) throws YarnException, IOException { ApplicationId appId = null; ApplicationClientProtocol rmClient; @@ -173,45 +179,93 @@ public class UnmanagedAMPoolManager extends AbstractService { rmClient = null; } - createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + // Launch the UAM in RM + launchUAM(appId.toString(), conf, appId, queueName, submitter, + appNameSuffix, keepContainersAcrossApplicationAttempts); + + // Register the UAM application + registerApplicationMaster(appId.toString(), registerRequest); + + // Returns the appId as uamId return appId.toString(); } /** - * Create a new UAM and register the application, using the provided uamId and - * appId. + * Launch a new UAM, using the provided uamId and appId. 
* - * @param uamId identifier for the UAM - * @param registerRequest RegisterApplicationMasterRequest + * @param uamId uam Id * @param conf configuration for this UAM * @param appId application id for the UAM * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM - * @return RegisterApplicationMasterResponse - * @throws YarnException if registerApplicationMaster fails - * @throws IOException if registerApplicationMaster fails + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) + * @return UAM token + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, - RegisterApplicationMasterRequest registerRequest, Configuration conf, + public Token launchUAM(String uamId, Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) + throws YarnException, IOException { + + if (this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " already exists"); + } + UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); + // Put the UAM into map first before initializing it to avoid additional UAM + // for the same uamId being created concurrently + this.unmanagedAppMasterMap.put(uamId, uam); + + Token amrmToken = null; + try { + LOG.info("Launching UAM id {} for application {}", uamId, appId); + amrmToken = uam.launchUAM(); + } catch (Exception e) { + // Add the map earlier and remove here if register failed because we want + // to make sure there is only one uam instance per uamId at any given time + this.unmanagedAppMasterMap.remove(uamId); + throw e; + } 
+ + this.appIdMap.put(uamId, uam.getAppId()); + return amrmToken; + } + + /** + * Re-attach to an existing UAM, using the provided uamIdentifier. + * + * @param uamId uam Id + * @param conf configuration for this UAM + * @param appId application id for the UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @param uamToken UAM token + * @throws YarnException if fails + * @throws IOException if fails + */ + public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) throws YarnException, IOException { + String appNameSuffix, Token uamToken) + throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } UnmanagedApplicationManager uam = - createUAM(conf, appId, queueName, submitter, appNameSuffix); + createUAM(conf, appId, queueName, submitter, appNameSuffix, true); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - RegisterApplicationMasterResponse response = null; try { - LOG.info("Creating and registering UAM id {} for application {}", uamId, - appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + LOG.info("Reattaching UAM id {} for application {}", uamId, appId); + uam.reAttachUAM(uamToken); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -219,8 +273,7 @@ public class UnmanagedAMPoolManager extends AbstractService { throw e; } - this.attemptIdMap.put(uamId, uam.getAttemptId()); - return response; + this.appIdMap.put(uamId, uam.getAppId()); } /** @@ -231,20 +284,42 @@ public class UnmanagedAMPoolManager extends 
AbstractService { * @param queueName queue of the application * @param submitter submitter name of the application * @param appNameSuffix application name suffix + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * @return the UAM instance */ @VisibleForTesting protected UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { return new UnmanagedApplicationManager(conf, appId, queueName, submitter, - appNameSuffix); + appNameSuffix, keepContainersAcrossApplicationAttempts); + } + + /** + * Register application master for the UAM. + * + * @param uamId uam Id + * @param registerRequest RegisterApplicationMasterRequest + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String uamId, RegisterApplicationMasterRequest registerRequest) + throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Registering UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); + return this.unmanagedAppMasterMap.get(uamId) + .registerApplicationMaster(registerRequest); } /** * AllocateAsync to an UAM. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request AllocateRequest * @param callback callback for response * @throws YarnException if allocate fails @@ -262,7 +337,7 @@ public class UnmanagedAMPoolManager extends AbstractService { /** * Finish an UAM/application. 
* - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request FinishApplicationMasterRequest * @return FinishApplicationMasterResponse * @throws YarnException if finishApplicationMaster call fails @@ -274,14 +349,15 @@ public class UnmanagedAMPoolManager extends AbstractService { if (!this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " does not exist"); } - LOG.info("Finishing application for UAM id {} ", uamId); + LOG.info("Finishing UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); FinishApplicationMasterResponse response = this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); if (response.getIsUnregistered()) { // Only remove the UAM when the unregister finished this.unmanagedAppMasterMap.remove(uamId); - this.attemptIdMap.remove(uamId); + this.appIdMap.remove(uamId); LOG.info("UAM id {} is unregistered", uamId); } return response; @@ -301,7 +377,7 @@ public class UnmanagedAMPoolManager extends AbstractService { /** * Return whether an UAM exists. 
* - * @param uamId identifier for the UAM + * @param uamId uam Id * @return UAM exists or not */ public boolean hasUAMId(String uamId) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 6531a75..3f4a110 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -90,7 +92,6 @@ public class UnmanagedApplicationManager { private AMRequestHandlerThread handlerThread; private ApplicationMasterProtocol rmProxy; private ApplicationId applicationId; - private ApplicationAttemptId attemptId; private String 
submitter; private String appNameSuffix; private Configuration conf; @@ -101,9 +102,31 @@ public class UnmanagedApplicationManager { private ApplicationClientProtocol rmClient; private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; + private boolean keepContainersAcrossApplicationAttempts; + /* + * This flag is used as an indication that this method launchUAM/reAttachUAM + * is called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the flag before + * calling the blocking call to RM. + */ + private boolean connectionInitiated; + + /** + * Constructor. + * + * @param conf configuration + * @param appId application Id to use for this UAM + * @param queueName the queue of the UAM + * @param submitter user name of the app + * @param appNameSuffix the app name suffix to use + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. See {@link ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean)} + */ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, - String queueName, String submitter, String appNameSuffix) { + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) { Preconditions.checkNotNull(conf, "Configuration cannot be null"); Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); Preconditions.checkNotNull(submitter, "App submitter cannot be null"); @@ -116,6 +139,7 @@ public class UnmanagedApplicationManager { this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); this.rmProxy = null; + this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( @@ -123,45 +147,84 @@ public class UnmanagedApplicationManager { 
YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, YarnConfiguration. DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + this.keepContainersAcrossApplicationAttempts = + keepContainersAcrossApplicationAttempts; + } + + /** + * Launch a new UAM in the resource manager. + * + * @return identifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails + */ + public Token launchUAM() + throws YarnException, IOException { + this.connectionInitiated = true; + + // Blocking call to RM + Token amrmToken = + initializeUnmanagedAM(this.applicationId); + + // Creates the UAM connection + createUAMProxy(amrmToken); + return amrmToken; + } + + /** + * Re-attach to an existing UAM in the resource manager. + * + * @param amrmToken the UAM token + * @throws IOException if re-attach fails + * @throws YarnException if re-attach fails + */ + public void reAttachUAM(Token amrmToken) + throws IOException, YarnException { + this.connectionInitiated = true; + + // Creates the UAM connection + createUAMProxy(amrmToken); + } + + protected void createUAMProxy(Token amrmToken) + throws IOException { + this.userUgi = UserGroupInformation.createProxyUser( + this.applicationId.toString(), UserGroupInformation.getCurrentUser()); + this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, + this.userUgi, amrmToken); } /** * Registers this {@link UnmanagedApplicationManager} with the resource * manager. 
* - * @param request the register request - * @return the register response + * @param request RegisterApplicationMasterRequest + * @return register response * @throws YarnException if register fails * @throws IOException if register fails */ - public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( + public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - // This need to be done first in this method, because it is used as an - // indication that this method is called (and perhaps blocked due to RM - // connection and not finished yet) + // Save the register request for re-register later this.registerRequest = request; - // attemptId will be available after this call - UnmanagedAMIdentifier identifier = - initializeUnmanagedAM(this.applicationId); - - try { - this.userUgi = UserGroupInformation.createProxyUser( - identifier.getAttemptId().toString(), - UserGroupInformation.getCurrentUser()); - } catch (IOException e) { - LOG.error("Exception while trying to get current user", e); - throw new YarnRuntimeException(e); - } - - this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, - this.userUgi, identifier.getToken()); - - LOG.info("Registering the Unmanaged application master {}", this.attemptId); + // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM. 
+ // We do not expect application already registered exception here + LOG.info("Registering the Unmanaged application master {}", + this.applicationId); RegisterApplicationMasterResponse response = this.rmProxy.registerApplicationMaster(this.registerRequest); + for (Container container : response.getContainersFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing running container " + + container.getId()); + } + for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing NM token for node " + + nmToken.getNodeId()); + } + // Only when register succeed that we start the heartbeat thread this.handlerThread.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); @@ -187,11 +250,11 @@ public class UnmanagedApplicationManager { this.handlerThread.shutdown(); if (this.rmProxy == null) { - if (this.registerRequest != null) { - // This is possible if the async registerApplicationMaster is still + if (this.connectionInitiated) { + // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. LOG.warn("Unmanaged AM still not successfully launched/registered yet." 
- + " Stopping the UAM client thread anyways."); + + " Stopping the UAM heartbeat thread anyways."); return FinishApplicationMasterResponse.newInstance(false); } else { throw new YarnException("finishApplicationMaster should not " @@ -199,7 +262,7 @@ public class UnmanagedApplicationManager { } } return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, - this.registerRequest, this.attemptId); + this.registerRequest, this.applicationId); } /** @@ -212,7 +275,7 @@ public class UnmanagedApplicationManager { public KillApplicationResponse forceKillApplication() throws IOException, YarnException { KillApplicationRequest request = - KillApplicationRequest.newInstance(this.attemptId.getApplicationId()); + KillApplicationRequest.newInstance(this.applicationId); this.handlerThread.shutdown(); @@ -240,29 +303,29 @@ public class UnmanagedApplicationManager { LOG.debug("Interrupted while waiting to put on response queue", ex); } // Two possible cases why the UAM is not successfully registered yet: - // 1. registerApplicationMaster is not called at all. Should throw here. - // 2. registerApplicationMaster is called but hasn't successfully returned. + // 1. launchUAM is not called at all. Should throw here. + // 2. launchUAM is called but hasn't successfully returned. // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. if (this.rmProxy == null) { - if (this.registerRequest != null) { + if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); } else { throw new YarnException( - "AllocateAsync should not be called before createAndRegister"); + "AllocateAsync should not be called before launchUAM"); } } } /** - * Returns the application attempt id of the UAM. + * Returns the application id of the UAM. 
* - * @return attempt id of the UAM + * @return application id of the UAM */ - public ApplicationAttemptId getAttemptId() { - return this.attemptId; + public ApplicationId getAppId() { + return this.applicationId; } /** @@ -287,15 +350,15 @@ public class UnmanagedApplicationManager { * Launch and initialize an unmanaged AM. First, it creates a new application * on the RM and negotiates a new attempt id. Then it waits for the RM * application attempt state to reach YarnApplicationAttemptState.LAUNCHED - * after which it returns the AM-RM token and the attemptId. + * after which it returns the AM-RM token. * * @param appId application id - * @return the UAM identifier + * @return the UAM token * @throws IOException if initialize fails * @throws YarnException if initialize fails */ - protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) - throws IOException, YarnException { + protected Token initializeUnmanagedAM( + ApplicationId appId) throws IOException, YarnException { try { UserGroupInformation appSubmitter = UserGroupInformation.createRemoteUser(this.submitter); @@ -306,13 +369,12 @@ public class UnmanagedApplicationManager { submitUnmanagedApp(appId); // Monitor the application attempt to wait for launch state - ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId, + monitorCurrentAppAttempt(appId, EnumSet.of(YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING, YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED), YarnApplicationAttemptState.LAUNCHED); - this.attemptId = attemptReport.getApplicationAttemptId(); - return getUAMIdentifier(); + return getUAMToken(); } finally { this.rmClient = null; } @@ -343,6 +405,8 @@ public class UnmanagedApplicationManager { submitRequest.setApplicationSubmissionContext(context); context.setUnmanagedAM(true); + context.setKeepContainersAcrossApplicationAttempts( + this.keepContainersAcrossApplicationAttempts); LOG.info("Submitting unmanaged 
application {}", appId); this.rmClient.submitApplication(submitRequest); @@ -374,8 +438,10 @@ public class UnmanagedApplicationManager { if (appStates.contains(state)) { if (state != YarnApplicationState.ACCEPTED) { throw new YarnRuntimeException( - "Received non-accepted application state: " + state - + ". Application " + appId + " not the first attempt?"); + "Received non-accepted application state: " + state + " for " + + appId + ". This is likely because this is not the first " + + "app attempt in home sub-cluster, and AMRMProxy HA " + + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled."); } appAttemptId = getApplicationReport(appId).getCurrentApplicationAttemptId(); @@ -415,25 +481,25 @@ public class UnmanagedApplicationManager { } /** - * Gets the identifier of the unmanaged AM. + * Gets the amrmToken of the unmanaged AM. * - * @return the identifier of the unmanaged AM. + * @return the amrmToken of the unmanaged AM. * @throws IOException if getApplicationReport fails * @throws YarnException if getApplicationReport fails */ - protected UnmanagedAMIdentifier getUAMIdentifier() + protected Token getUAMToken() throws IOException, YarnException { Token token = null; org.apache.hadoop.yarn.api.records.Token amrmToken = - getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken(); + getApplicationReport(this.applicationId).getAMRMToken(); if (amrmToken != null) { token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); } else { LOG.warn( "AMRMToken not found in the application report for application: {}", - this.attemptId.getApplicationId()); + this.applicationId); } - return new UnmanagedAMIdentifier(this.attemptId, token); + return token; } private ApplicationReport getApplicationReport(ApplicationId appId) @@ -445,29 +511,6 @@ public class UnmanagedApplicationManager { } /** - * Data structure that encapsulates the application attempt identifier and the - * AMRMTokenIdentifier. Make it public because clients with HA need it. 
- */ - public static class UnmanagedAMIdentifier { - private ApplicationAttemptId attemptId; - private Token token; - - public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, - Token token) { - this.attemptId = attemptId; - this.token = token; - } - - public ApplicationAttemptId getAttemptId() { - return this.attemptId; - } - - public Token getToken() { - return this.token; - } - } - - /** * Data structure that encapsulates AllocateRequest and AsyncCallback * instance. */ @@ -549,8 +592,10 @@ public class UnmanagedApplicationManager { } request.setResponseId(lastResponseId); + AllocateResponse response = AMRMClientUtils.allocateWithReRegister( - request, rmProxy, registerRequest, attemptId); + request, rmProxy, registerRequest, applicationId); + if (response == null) { throw new YarnException("Null allocateResponse from allocate"); } @@ -578,18 +623,17 @@ public class UnmanagedApplicationManager { LOG.debug("Interrupted while waiting for queue", ex); } } catch (IOException ex) { - LOG.warn( - "IO Error occurred while processing heart beat for " + attemptId, - ex); + LOG.warn("IO Error occurred while processing heart beat for " + + applicationId, ex); } catch (Throwable ex) { LOG.warn( - "Error occurred while processing heart beat for " + attemptId, + "Error occurred while processing heart beat for " + applicationId, ex); } } LOG.info("UnmanagedApplicationManager has been stopped for {}. 
" - + "AMRequestHandlerThread thread is exiting", attemptId); + + "AMRequestHandlerThread thread is exiting", applicationId); } } @@ -600,8 +644,8 @@ public class UnmanagedApplicationManager { implements UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("Heartbeat thread {} for application attempt {} crashed!", - t.getName(), attemptId, e); + LOG.error("Heartbeat thread {} for application {} crashed!", + t.getName(), applicationId, e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java index 9f15d90..e1f08e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.ClientRMProxy; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -63,16 +63,16 @@ public final class AMRMClientUtils { /** * Handle ApplicationNotRegistered exception and re-register. * - * @param attemptId app attemptId + * @param appId application Id * @param rmProxy RM proxy instance * @param registerRequest the AM re-register request * @throws YarnException if re-register fails */ public static void handleNotRegisteredExceptionAndReRegister( - ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy, + ApplicationId appId, ApplicationMasterProtocol rmProxy, RegisterApplicationMasterRequest registerRequest) throws YarnException { LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", attemptId); + + " Trying to re-register.", appId); try { rmProxy.registerApplicationMaster(registerRequest); } catch (Exception e) { @@ -93,25 +93,24 @@ public final class AMRMClientUtils { * @param request allocate request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return allocate response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static AllocateResponse allocateWithReRegister(AllocateRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.allocate(request); } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // reset responseId after re-register request.setResponseId(0); // retry allocate - return 
allocateWithReRegister(request, rmProxy, registerRequest, - attemptId); + return allocateWithReRegister(request, rmProxy, registerRequest, appId); } } @@ -123,23 +122,22 @@ public final class AMRMClientUtils { * @param request finishApplicationMaster request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return finishApplicationMaster response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static FinishApplicationMasterResponse finishAMWithReRegister( FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.finishApplicationMaster(request); } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, - attemptId); + return finishAMWithReRegister(request, rmProxy, registerRequest, appId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index c49d6e8..c509994 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -171,10 +172,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, LoggerFactory.getLogger(MockResourceManagerFacade.class); private HashSet applicationMap = new HashSet<>(); - private HashMap> applicationContainerIdMap = - new HashMap>(); - private HashMap allocatedContainerMap = - new HashMap(); + private HashSet keepContainerOnUams = new HashSet<>(); + private HashMap> + applicationContainerIdMap = new HashMap<>(); private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; private int subClusterId; @@ -215,7 +215,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, this.isRunning = mode; } - private static String getAppIdentifier() throws IOException { + private static ApplicationAttemptId getAppIdentifier() throws IOException { AMRMTokenIdentifier result = null; UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); Set tokenIds = remoteUgi.getTokenIdentifiers(); @@ -225,7 +225,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, break; } } - return result != null ? 
result.getApplicationAttemptId().toString() : ""; + return result != null ? result.getApplicationAttemptId() + : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); } private void validateRunning() throws ConnectException { @@ -240,19 +241,32 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throws YarnException, IOException { validateRunning(); - - String amrmToken = getAppIdentifier(); - LOG.info("Registering application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Registering application attempt: " + attemptId); shouldReRegisterNext = false; + List containersFromPreviousAttempt = null; + synchronized (applicationContainerIdMap) { - if (applicationContainerIdMap.containsKey(amrmToken)) { - throw new InvalidApplicationMasterRequestException( - AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + if (applicationContainerIdMap.containsKey(attemptId)) { + if (keepContainerOnUams.contains(attemptId.getApplicationId())) { + // For UAM with the keepContainersFromPreviousAttempt flag, return all + // running containers + containersFromPreviousAttempt = new ArrayList<>(); + for (ContainerId containerId : applicationContainerIdMap + .get(attemptId)) { + containersFromPreviousAttempt.add(Container.newInstance(containerId, + null, null, null, null, null)); + } + } else { + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + } + } else { + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(attemptId, new ArrayList()); } - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, new ArrayList()); } // Make sure we wait for certain test cases last in the method @@ -272,7 +286,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } return RegisterApplicationMasterResponse.newInstance(null, null, null, 
null, - null, request.getHost(), null); + containersFromPreviousAttempt, request.getHost(), null); } @Override @@ -282,8 +296,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - String amrmToken = getAppIdentifier(); - LOG.info("Finishing application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Finishing application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -293,12 +307,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.remove(amrmToken); - for (ContainerId c : ids) { - allocatedContainerMap.remove(c); - } + Assert.assertTrue("The application id is NOT registered: " + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + applicationContainerIdMap.remove(attemptId); } return FinishApplicationMasterResponse.newInstance( @@ -328,8 +339,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + "askList and releaseList in the same heartbeat"); } - String amrmToken = getAppIdentifier(); - LOG.info("Allocate from application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Allocate from application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -361,16 +372,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // will need it in future Assert.assertTrue( "The application id is Not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = 
applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); ids.add(containerId); - this.allocatedContainerMap.put(containerId, container); } } } } + List completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -378,9 +389,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, Assert .assertTrue( "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List ids = applicationContainerIdMap.get(attemptId); for (ContainerId id : request.getReleaseList()) { boolean found = false; @@ -396,18 +407,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. 
This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -418,8 +419,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, - new ArrayList(), containerList, + return AllocateResponse.newInstance(0, completedList, containerList, new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList(), newAMRMToken, new ArrayList(), null); @@ -438,6 +438,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], "")); response.setApplicationReport(report); return response; } @@ -481,6 +482,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } LOG.info("Application submitted: " + appId); applicationMap.add(appId); + + if (request.getApplicationSubmissionContext().getUnmanagedAM() + || request.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + keepContainerOnUams.add(appId); + } return SubmitApplicationResponse.newInstance(); } @@ -497,6 +504,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throw new ApplicationNotFoundException( "Trying to kill an 
absent application: " + appId); } + keepContainerOnUams.remove(appId); } LOG.info("Force killing application: " + appId); return KillApplicationResponse.newInstance(true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java new file mode 100644 index 0000000..42be851 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for FederationRegistryClient. + */ +public class TestFederationRegistryClient { + private Configuration conf; + private UserGroupInformation user; + private RegistryOperations registry; + private FederationRegistryClient registryClient; + + @Before + public void setup() throws Exception { + this.conf = new YarnConfiguration(); + + this.registry = new FSRegistryOperationsService(); + this.registry.init(this.conf); + this.registry.start(); + + this.user = UserGroupInformation.getCurrentUser(); + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + this.registryClient.cleanAllApplications(); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } + + @After + public void breakDown() { + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + registry.stop(); + } + + @Test + public void testBasicCase() { + ApplicationId appId = ApplicationId.newInstance(0, 0); + String scId1 = "subcluster1"; + String scId2 = "subcluster2"; + + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + this.registryClient.writeAMRMTokenForUAM(appId, scId2, + new Token()); + // Duplicate entry, should overwrite + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token()); + + Assert.assertEquals(1, 
this.registryClient.getAllApplications().size()); + Assert.assertEquals(2, + this.registryClient.loadStateFromRegistry(appId).size()); + + this.registryClient.removeAppFromRegistry(appId); + + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + Assert.assertEquals(0, + this.registryClient.loadStateFromRegistry(appId).size()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf7..5848d3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -65,7 +65,7 @@ public class TestUnmanagedApplicationManager { ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); uam = new TestableUnmanagedApplicationManager(conf, - attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); } protected void waitForCallBackCountAndCheckZeroPending( @@ -88,7 +88,8 @@ public class TestUnmanagedApplicationManager { public void testBasicUsage() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( 
RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, @@ -102,11 +103,48 @@ public class TestUnmanagedApplicationManager { attemptId); } + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. + */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); + uam.setRMProxy(rmProxy); + + reAttachUAM(null, attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 2); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + @Test(timeout = 5000) public void testReRegister() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.setShouldReRegisterNext(); @@ -137,7 +175,8 @@ public class TestUnmanagedApplicationManager { @Override public void run() { try { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), 
attemptId); } catch (Exception e) { @@ -221,7 +260,8 @@ public class TestUnmanagedApplicationManager { @Test public void testForceKill() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.forceKillApplication(); @@ -241,19 +281,40 @@ public class TestUnmanagedApplicationManager { return ugi; } - protected RegisterApplicationMasterResponse - createAndRegisterApplicationMaster( - final RegisterApplicationMasterRequest request, - ApplicationAttemptId appAttemptId) - throws YarnException, IOException, InterruptedException { + protected Token launchUAM( + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + return getUGIWithToken(appAttemptId) + .doAs(new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + return uam.launchUAM(); + } + }); + } + + protected void reAttachUAM(final Token uamToken, + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { + @Override + public Token run() throws Exception { + uam.reAttachUAM(uamToken); + return null; + } + }); + } + + protected RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( new PrivilegedExceptionAction() { @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { - RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); - return response; + return uam.registerApplicationMaster(request); } }); } @@ -311,8 +372,9 @@ public class TestUnmanagedApplicationManager { public TestableUnmanagedApplicationManager(Configuration conf, 
ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { - super(conf, appId, queueName, submitter, appNameSuffix); + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + super(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts); } @SuppressWarnings("unchecked") @@ -330,6 +392,14 @@ public class TestUnmanagedApplicationManager { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade proxy) { + this.rmProxy = proxy; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java index c355a8b..92afcb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import 
org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -67,4 +69,18 @@ public interface AMRMProxyApplicationContext { */ Context getNMCotext(); + /** + * Gets the credentials of this application. + * + * @return the credentials. + */ + Credentials getCredentials(); + + /** + * Gets the registry client. + * + * @return the registry. + */ + RegistryOperations getRegistryClient(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed310913/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 9938b37..8a02095 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -42,6 +44,8 @@ public class AMRMProxyApplicationContextImpl implements private Integer localTokenKeyId; private Token amrmToken; private Token localToken; + private Credentials credentials; + private RegistryOperations registry; /** * Create an instance of the AMRMProxyApplicationContext. @@ -52,17 +56,23 @@ public class AMRMProxyApplicationContextImpl implements * @param user user name of the application * @param amrmToken amrmToken issued by RM * @param localToken amrmToken issued by AMRMProxy + * @param credentials application credentials + * @param registry Yarn Registry client */ - public AMRMProxyApplicationContextImpl(Context nmContext, - Configuration conf, ApplicationAttemptId applicationAttemptId, - String user, Token amrmToken, - Token localToken) { + @SuppressWarnings("checkstyle:parameternumber") + public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf, + ApplicationAttemptId applicationAttemptId, String user, + Token amrmToken, + Token localToken, Credentials credentials, + RegistryOperations registry) { this.nmContext = nmContext; this.conf = conf; this.applicationAttemptId = applicationAttemptId; this.user = user; this.amrmToken = amrmToken; this.localToken = localToken; + this.credentials = credentials; + this.registry = registry; } @Override @@ -88,11 +98,14 @@ public class AMRMProxyApplicationContextImpl implements /** * Sets the application's AMRMToken. 
* - * @param amrmToken amrmToken issued by RM + * @param amrmToken the new amrmToken from RM + * @return whether the saved token is updated to a different value */ - public synchronized void setAMRMToken( + public synchronized boolean setAMRMToken( Token amrmToken) { + Token oldValue = this.amrmToken; this.amrmToken = amrmToken; + return !this.amrmToken.equals(oldValue); } @Override @@ -134,4 +147,14 @@ public class AMRMProxyApplicationContextImpl implements public Context getNMCotext() { return nmContext; } + + @Override + public Credentials getCredentials() { + return this.credentials; + } + + @Override + public RegistryOperations getRegistryClient() { + return this.registry; + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org