Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1B040200D19 for ; Fri, 22 Sep 2017 02:58:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1995A1609E8; Fri, 22 Sep 2017 00:58:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B96551609E1 for ; Fri, 22 Sep 2017 02:58:39 +0200 (CEST) Received: (qmail 48620 invoked by uid 500); 22 Sep 2017 00:58:27 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 45714 invoked by uid 99); 22 Sep 2017 00:58:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Sep 2017 00:58:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0A428F5A97; Fri, 22 Sep 2017 00:58:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: curino@apache.org To: common-commits@hadoop.apache.org Date: Fri, 22 Sep 2017 00:58:50 -0000 Message-Id: <631e7304641f496a87d8ffa4a7584700@git.apache.org> In-Reply-To: <8fd9b2cc5dce435c97511d939aac78bc@git.apache.org> References: <8fd9b2cc5dce435c97511d939aac78bc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/50] [abbrv] hadoop git commit: YARN-5531. UnmanagedAM pool manager for federating application across clusters. (Botong Huang via Subru). archived-at: Fri, 22 Sep 2017 00:58:42 -0000 YARN-5531. UnmanagedAM pool manager for federating application across clusters. (Botong Huang via Subru). 
(cherry picked from commit 73bb2102ce4b82b3a3bed91319f7c8f067ddc3e8) (cherry picked from commit 859aa1f9d621d07693825e610bdc0149f7a2770a) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9476d86c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9476d86c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9476d86c Branch: refs/heads/branch-2 Commit: 9476d86ce869b51fc7524ae58dd53862bc2d7d72 Parents: 7f00f93 Author: Subru Krishnan Authored: Fri May 26 16:23:38 2017 -0700 Committer: Carlo Curino Committed: Thu Sep 21 16:47:43 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/yarn/util/AsyncCallback.java | 35 ++ .../failover/FederationProxyProviderUtil.java | 114 ++-- .../yarn/server/uam/UnmanagedAMPoolManager.java | 311 ++++++++++ .../server/uam/UnmanagedApplicationManager.java | 607 +++++++++++++++++++ .../hadoop/yarn/server/uam/package-info.java | 18 + .../yarn/server/utils/AMRMClientUtils.java | 189 ++++++ .../server/utils/YarnServerSecurityUtils.java | 41 +- .../yarn/server/MockResourceManagerFacade.java | 10 +- .../uam/TestUnmanagedApplicationManager.java | 335 ++++++++++ .../amrmproxy/DefaultRequestInterceptor.java | 30 +- .../ApplicationMasterService.java | 12 +- .../TestApplicationMasterLauncher.java | 6 +- 12 files changed, 1590 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java new file mode 100644 
index 0000000..b4f75c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AsyncCallback.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +/** + * Generic interface that can be used for calling back when a corresponding + * asynchronous operation completes. + * + * @param <T> parameter type for the callback + */ +public interface AsyncCallback<T> { + /** + * This method is called back when the corresponding asynchronous operation + * completes. 
+ * + * @param response response of the callback + */ + void callback(T response); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java index 18f1338..3931f2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java @@ -19,22 +19,20 @@ package org.apache.hadoop.yarn.server.federation.failover; import java.io.IOException; -import java.security.PrivilegedExceptionAction; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; import org.apache.hadoop.yarn.conf.HAUtil; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +49,15 @@ public final class FederationProxyProviderUtil { public static final Logger LOG = LoggerFactory.getLogger(FederationProxyProviderUtil.class); + // Disable constructor + private FederationProxyProviderUtil() { + } + /** - * Create a proxy for the specified protocol. For non-HA, this is a direct - * connection to the ResourceManager address. When HA is enabled, the proxy - * handles the failover between the ResourceManagers as well. + * Create a proxy for the specified protocol in the context of Federation. For + * non-HA, this is a direct connection to the ResourceManager address. When HA + * is enabled, the proxy handles the failover between the ResourceManagers as + * well. * * @param configuration Configuration to generate {@link ClientRMProxy} * @param protocol Protocol for the proxy @@ -67,15 +70,16 @@ public final class FederationProxyProviderUtil { @Public @Unstable public static T createRMProxy(Configuration configuration, - final Class protocol, SubClusterId subClusterId, - UserGroupInformation user) throws IOException { + Class protocol, SubClusterId subClusterId, UserGroupInformation user) + throws IOException { return createRMProxy(configuration, protocol, subClusterId, user, null); } /** - * Create a proxy for the specified protocol. For non-HA, this is a direct - * connection to the ResourceManager address. When HA is enabled, the proxy - * handles the failover between the ResourceManagers as well. + * Create a proxy for the specified protocol in the context of Federation. For + * non-HA, this is a direct connection to the ResourceManager address. 
When HA + * is enabled, the proxy handles the failover between the ResourceManagers as + * well. * * @param configuration Configuration to generate {@link ClientRMProxy} * @param protocol Protocol for the proxy @@ -88,65 +92,35 @@ public final class FederationProxyProviderUtil { */ @Public @Unstable - @SuppressWarnings("unchecked") - public static T createRMProxy(final Configuration configuration, + public static T createRMProxy(Configuration configuration, final Class protocol, SubClusterId subClusterId, - UserGroupInformation user, final Token token) throws IOException { - try { - final YarnConfiguration conf = new YarnConfiguration(configuration); - updateConf(conf, subClusterId); - if (token != null) { - LOG.info( - "Creating RMProxy with a token: {} to subcluster: {}" - + " for protocol: {}", - token, subClusterId, protocol.getSimpleName()); - user.addToken(token); - setAuthModeInConf(conf); - } else { - LOG.info("Creating RMProxy without a token to subcluster: {}" - + " for protocol: {}", subClusterId, protocol.getSimpleName()); - } - final T proxyConnection = user.doAs(new PrivilegedExceptionAction() { - @Override - public T run() throws Exception { - return ClientRMProxy.createRMProxy(conf, protocol); - } - }); - - return proxyConnection; - } catch (IOException e) { - String message = - "Error while creating of RM application master service proxy for" - + " appAttemptId: " + user; - LOG.info(message); - throw new YarnRuntimeException(message, e); - } catch (InterruptedException e) { - throw new YarnRuntimeException(e); - } + UserGroupInformation user, Token token) + throws IOException { + final YarnConfiguration config = new YarnConfiguration(configuration); + updateConfForFederation(config, subClusterId.getId()); + return AMRMClientUtils.createRMProxy(config, protocol, user, token); } - private static void setAuthModeInConf(Configuration conf) { - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, - 
SaslRpcServer.AuthMethod.TOKEN.toString()); - } - - // updating the conf with the refreshed RM addresses as proxy creations - // are based out of conf - private static void updateConf(Configuration conf, - SubClusterId subClusterId) { - conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); - // In a Federation setting, we will connect to not just the local cluster RM - // but also multiple external RMs. The membership information of all the RMs - // that are currently - // participating in Federation is available in the central - // FederationStateStore. - // So we will: - // 1. obtain the RM service addresses from FederationStateStore using the - // FederationRMFailoverProxyProvider. - // 2. disable traditional HA as that depends on local configuration lookup - // for RMs using indexes. - // 3. we will enable federation failover IF traditional HA is enabled so - // that the appropriate failover RetryPolicy is initialized. + /** + * Updating the conf with Federation as long as certain subclusterId. + * + * @param conf configuration + * @param subClusterId subclusterId for the conf + */ + public static void updateConfForFederation(Configuration conf, + String subClusterId) { + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId); + /* + * In a Federation setting, we will connect to not just the local cluster RM + * but also multiple external RMs. The membership information of all the RMs + * that are currently participating in Federation is available in the + * central FederationStateStore. So we will: 1. obtain the RM service + * addresses from FederationStateStore using the + * FederationRMFailoverProxyProvider. 2. disable traditional HA as that + * depends on local configuration lookup for RMs using indexes. 3. we will + * enable federation failover IF traditional HA is enabled so that the + * appropriate failover RetryPolicy is initialized. 
+ */ conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class); @@ -156,8 +130,4 @@ public final class FederationProxyProviderUtil { } } - // disable instantiation - private FederationProxyProviderUtil() { - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java new file mode 100644 index 0000000..08aee77 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.uam; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; +import org.apache.hadoop.yarn.util.AsyncCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; 
+ +/** + * A service that manages a pool of UAM managers in + * {@link UnmanagedApplicationManager}. + */ +@Public +@Unstable +public class UnmanagedAMPoolManager extends AbstractService { + public static final Logger LOG = + LoggerFactory.getLogger(UnmanagedAMPoolManager.class); + + // Map from uamId to UAM instances + private Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap; + + private Map<String, ApplicationAttemptId> attemptIdMap; + + private ExecutorService threadpool; + + public UnmanagedAMPoolManager(ExecutorService threadpool) { + super(UnmanagedAMPoolManager.class.getName()); + this.threadpool = threadpool; + } + + @Override + protected void serviceStart() throws Exception { + if (this.threadpool == null) { + this.threadpool = Executors.newCachedThreadPool(); + } + this.unmanagedAppMasterMap = new ConcurrentHashMap<>(); + this.attemptIdMap = new ConcurrentHashMap<>(); + super.serviceStart(); + } + + /** + * Normally we should finish all applications before stop. If there are still + * UAMs running, force kill all of them. Do parallel kill because of + * performance reasons. + * + * TODO: move waiting for the kill to finish into a separate thread, without + * blocking the serviceStop. 
+ */ + @Override + protected void serviceStop() throws Exception { + ExecutorCompletionService<KillApplicationResponse> completionService = + new ExecutorCompletionService<>(this.threadpool); + if (this.unmanagedAppMasterMap.isEmpty()) { + return; + } + + // Save a local copy of the key set so that it won't change with the map + Set<String> addressList = + new HashSet<>(this.unmanagedAppMasterMap.keySet()); + LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", + addressList.size()); + + for (final String uamId : addressList) { + completionService.submit(new Callable<KillApplicationResponse>() { + @Override + public KillApplicationResponse call() throws Exception { + try { + LOG.info("Force-killing UAM id " + uamId + " for application " + + attemptIdMap.get(uamId)); + return unmanagedAppMasterMap.remove(uamId).forceKillApplication(); + } catch (Exception e) { + LOG.error("Failed to kill unmanaged application master", e); + return null; + } + } + }); + } + + for (int i = 0; i < addressList.size(); ++i) { + try { + Future<KillApplicationResponse> future = completionService.take(); + future.get(); + } catch (Exception e) { + LOG.error("Failed to kill unmanaged application master", e); + } + } + this.attemptIdMap.clear(); + super.serviceStop(); + } + + /** + * Create a new UAM and register the application, without specifying uamId and + * appId. We will ask for an appId from RM and use it as the uamId. 
+ * + * @param registerRequest RegisterApplicationMasterRequest + * @param conf configuration for this UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @return uamId for the UAM + * @throws YarnException if registerApplicationMaster fails + * @throws IOException if registerApplicationMaster fails + */ + public String createAndRegisterNewUAM( + RegisterApplicationMasterRequest registerRequest, Configuration conf, + String queueName, String submitter, String appNameSuffix) + throws YarnException, IOException { + ApplicationId appId = null; + ApplicationClientProtocol rmClient; + try { + UserGroupInformation appSubmitter = + UserGroupInformation.createRemoteUser(submitter); + rmClient = AMRMClientUtils.createRMProxy(conf, + ApplicationClientProtocol.class, appSubmitter, null); + + // Get a new appId from RM + GetNewApplicationResponse response = + rmClient.getNewApplication(GetNewApplicationRequest.newInstance()); + if (response == null) { + throw new YarnException("getNewApplication got null response"); + } + appId = response.getApplicationId(); + LOG.info("Received new application ID {} from RM", appId); + } finally { + rmClient = null; + } + + createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, + queueName, submitter, appNameSuffix); + return appId.toString(); + } + + /** + * Create a new UAM and register the application, using the provided uamId and + * appId. 
+ * + * @param uamId identifier for the UAM + * @param registerRequest RegisterApplicationMasterRequest + * @param conf configuration for this UAM + * @param appId application id for the UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @return RegisterApplicationMasterResponse + * @throws YarnException if registerApplicationMaster fails + * @throws IOException if registerApplicationMaster fails + */ + public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, + RegisterApplicationMasterRequest registerRequest, Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix) throws YarnException, IOException { + + if (this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " already exists"); + } + UnmanagedApplicationManager uam = + createUAM(conf, appId, queueName, submitter, appNameSuffix); + // Put the UAM into map first before initializing it to avoid additional UAM + // for the same uamId being created concurrently + this.unmanagedAppMasterMap.put(uamId, uam); + + RegisterApplicationMasterResponse response = null; + try { + LOG.info("Creating and registering UAM id {} for application {}", uamId, + appId); + response = uam.createAndRegisterApplicationMaster(registerRequest); + } catch (Exception e) { + // Add the map earlier and remove here if register failed because we want + // to make sure there is only one uam instance per uamId at any given time + this.unmanagedAppMasterMap.remove(uamId); + throw e; + } + + this.attemptIdMap.put(uamId, uam.getAttemptId()); + return response; + } + + /** + * Creates the UAM instance. Pull out to make unit test easy. 
+ * + * @param conf Configuration + * @param appId application id + * @param queueName queue of the application + * @param submitter submitter name of the application + * @param appNameSuffix application name suffix + * @return the UAM instance + */ + @VisibleForTesting + protected UnmanagedApplicationManager createUAM(Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix) { + return new UnmanagedApplicationManager(conf, appId, queueName, submitter, + appNameSuffix); + } + + /** + * AllocateAsync to an UAM. + * + * @param uamId identifier for the UAM + * @param request AllocateRequest + * @param callback callback for response + * @throws YarnException if allocate fails + * @throws IOException if allocate fails + */ + public void allocateAsync(String uamId, AllocateRequest request, + AsyncCallback<AllocateResponse> callback) + throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + this.unmanagedAppMasterMap.get(uamId).allocateAsync(request, callback); + } + + /** + * Finish an UAM/application. 
+ * + * @param uamId identifier for the UAM + * @param request FinishApplicationMasterRequest + * @return FinishApplicationMasterResponse + * @throws YarnException if finishApplicationMaster call fails + * @throws IOException if finishApplicationMaster call fails + */ + public FinishApplicationMasterResponse finishApplicationMaster(String uamId, + FinishApplicationMasterRequest request) + throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Finishing application for UAM id {} ", uamId); + FinishApplicationMasterResponse response = + this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); + + if (response.getIsUnregistered()) { + // Only remove the UAM when the unregister finished + this.unmanagedAppMasterMap.remove(uamId); + this.attemptIdMap.remove(uamId); + LOG.info("UAM id {} is unregistered", uamId); + } + return response; + } + + /** + * Get the id of all running UAMs. + * + * @return uamId set + */ + public Set<String> getAllUAMIds() { + // Return a clone of the current id set for concurrency reasons, so that the + // returned map won't change with the actual map + return new HashSet<String>(this.unmanagedAppMasterMap.keySet()); + } + + /** + * Return whether an UAM exists. 
+ * + * @param uamId identifier for the UAM + * @return UAM exists or not + */ + public boolean hasUAMId(String uamId) { + return this.unmanagedAppMasterMap.containsKey(uamId); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java new file mode 100644 index 0000000..60a9a27 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -0,0 +1,607 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.uam; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.EnumSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import 
org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * UnmanagedApplicationManager is used to register unmanaged application and + * negotiate for resources from resource managers. An unmanagedAM is an AM that + * is not launched and managed by the RM. Allocate calls are handled + * asynchronously using {@link AsyncCallback}. 
+ */ +@Public +@Unstable +public class UnmanagedApplicationManager { + private static final Logger LOG = + LoggerFactory.getLogger(UnmanagedApplicationManager.class); + private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000; + private static final String APP_NAME = "UnmanagedAM"; + private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name"; + + private BlockingQueue requestQueue; + private AMRequestHandlerThread handlerThread; + private ApplicationMasterProtocol rmProxy; + private ApplicationId applicationId; + private ApplicationAttemptId attemptId; + private String submitter; + private String appNameSuffix; + private Configuration conf; + private String queueName; + private UserGroupInformation userUgi; + private RegisterApplicationMasterRequest registerRequest; + private int lastResponseId; + private ApplicationClientProtocol rmClient; + private long asyncApiPollIntervalMillis; + private RecordFactory recordFactory; + + public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, + String queueName, String submitter, String appNameSuffix) { + Preconditions.checkNotNull(conf, "Configuration cannot be null"); + Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); + Preconditions.checkNotNull(submitter, "App submitter cannot be null"); + + this.conf = conf; + this.applicationId = appId; + this.queueName = queueName; + this.submitter = submitter; + this.appNameSuffix = appNameSuffix; + this.handlerThread = new AMRequestHandlerThread(); + this.requestQueue = new LinkedBlockingQueue<>(); + this.rmProxy = null; + this.registerRequest = null; + this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); + this.asyncApiPollIntervalMillis = conf.getLong( + YarnConfiguration. + YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + YarnConfiguration. 
+ DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + } + + /** + * Registers this {@link UnmanagedApplicationManager} with the resource + * manager. + * + * @param request the register request + * @return the register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + // This need to be done first in this method, because it is used as an + // indication that this method is called (and perhaps blocked due to RM + // connection and not finished yet) + this.registerRequest = request; + + // attemptId will be available after this call + UnmanagedAMIdentifier identifier = + initializeUnmanagedAM(this.applicationId); + + try { + this.userUgi = UserGroupInformation.createProxyUser( + identifier.getAttemptId().toString(), + UserGroupInformation.getCurrentUser()); + } catch (IOException e) { + LOG.error("Exception while trying to get current user", e); + throw new YarnRuntimeException(e); + } + + this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, + this.userUgi, identifier.getToken()); + + LOG.info("Registering the Unmanaged application master {}", this.attemptId); + RegisterApplicationMasterResponse response = + this.rmProxy.registerApplicationMaster(this.registerRequest); + + // Only when register succeed that we start the heartbeat thread + this.handlerThread.setUncaughtExceptionHandler( + new HeartBeatThreadUncaughtExceptionHandler()); + this.handlerThread.setDaemon(true); + this.handlerThread.start(); + + this.lastResponseId = 0; + return response; + } + + /** + * Unregisters from the resource manager and stops the request handler thread. 
+ * + * @param request the finishApplicationMaster request + * @return the response + * @throws YarnException if finishAM call fails + * @throws IOException if finishAM call fails + */ + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + + this.handlerThread.shutdown(); + + if (this.rmProxy == null) { + if (this.registerRequest != null) { + // This is possible if the async registerApplicationMaster is still + // blocked and retrying. Return a dummy response in this case. + LOG.warn("Unmanaged AM still not successfully launched/registered yet." + + " Stopping the UAM client thread anyways."); + return FinishApplicationMasterResponse.newInstance(false); + } else { + throw new YarnException("finishApplicationMaster should not " + + "be called before createAndRegister"); + } + } + return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, + this.registerRequest, this.attemptId); + } + + /** + * Force kill the UAM. + * + * @return kill response + * @throws IOException if fails to create rmProxy + * @throws YarnException if force kill fails + */ + public KillApplicationResponse forceKillApplication() + throws IOException, YarnException { + KillApplicationRequest request = + KillApplicationRequest.newInstance(this.attemptId.getApplicationId()); + + this.handlerThread.shutdown(); + + if (this.rmClient == null) { + this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, + UserGroupInformation.createRemoteUser(this.submitter), null); + } + return this.rmClient.forceKillApplication(request); + } + + /** + * Sends the specified heart beat request to the resource manager and invokes + * the callback asynchronously with the response. 
+ * + * @param request the allocate request + * @param callback the callback method for the request + * @throws YarnException if registerAM is not called yet + */ + public void allocateAsync(AllocateRequest request, + AsyncCallback callback) throws YarnException { + try { + this.requestQueue.put(new AsyncAllocateRequestInfo(request, callback)); + } catch (InterruptedException ex) { + // Should not happen as we have MAX_INT queue length + LOG.debug("Interrupted while waiting to put on response queue", ex); + } + // Two possible cases why the UAM is not successfully registered yet: + // 1. registerApplicationMaster is not called at all. Should throw here. + // 2. registerApplicationMaster is called but hasn't successfully returned. + // + // In case 2, we have already save the allocate request above, so if the + // registration succeed later, no request is lost. + if (this.rmProxy == null) { + if (this.registerRequest != null) { + LOG.info("Unmanaged AM still not successfully launched/registered yet." + + " Saving the allocate request and send later."); + } else { + throw new YarnException( + "AllocateAsync should not be called before createAndRegister"); + } + } + } + + /** + * Returns the application attempt id of the UAM. + * + * @return attempt id of the UAM + */ + public ApplicationAttemptId getAttemptId() { + return this.attemptId; + } + + /** + * Returns RM proxy for the specified protocol type. Unit test cases can + * override this method and return mock proxy instances. 
+ * + * @param protocol protocal of the proxy + * @param config configuration + * @param user ugi for the proxy connection + * @param token token for the connection + * @param type of the proxy + * @return the proxy instance + * @throws IOException if fails to create the proxy + */ + protected T createRMProxy(Class protocol, Configuration config, + UserGroupInformation user, Token token) + throws IOException { + return AMRMClientUtils.createRMProxy(config, protocol, user, token); + } + + /** + * Launch and initialize an unmanaged AM. First, it creates a new application + * on the RM and negotiates a new attempt id. Then it waits for the RM + * application attempt state to reach YarnApplicationAttemptState.LAUNCHED + * after which it returns the AM-RM token and the attemptId. + * + * @param appId application id + * @return the UAM identifier + * @throws IOException if initialize fails + * @throws YarnException if initialize fails + */ + protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) + throws IOException, YarnException { + try { + UserGroupInformation appSubmitter = + UserGroupInformation.createRemoteUser(this.submitter); + this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, + appSubmitter, null); + + // Submit the application + submitUnmanagedApp(appId); + + // Monitor the application attempt to wait for launch state + ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId, + EnumSet.of(YarnApplicationState.ACCEPTED, + YarnApplicationState.RUNNING, YarnApplicationState.KILLED, + YarnApplicationState.FAILED, YarnApplicationState.FINISHED), + YarnApplicationAttemptState.LAUNCHED); + this.attemptId = attemptReport.getApplicationAttemptId(); + return getUAMIdentifier(); + } finally { + this.rmClient = null; + } + } + + private void submitUnmanagedApp(ApplicationId appId) + throws YarnException, IOException { + SubmitApplicationRequest submitRequest = + 
this.recordFactory.newRecordInstance(SubmitApplicationRequest.class); + + ApplicationSubmissionContext context = this.recordFactory + .newRecordInstance(ApplicationSubmissionContext.class); + + context.setApplicationId(appId); + context.setApplicationName(APP_NAME + "-" + appNameSuffix); + if (StringUtils.isBlank(this.queueName)) { + context.setQueue(this.conf.get(DEFAULT_QUEUE_CONFIG, + YarnConfiguration.DEFAULT_QUEUE_NAME)); + } else { + context.setQueue(this.queueName); + } + + ContainerLaunchContext amContainer = + this.recordFactory.newRecordInstance(ContainerLaunchContext.class); + Resource resource = BuilderUtils.newResource(1024, 1); + context.setResource(resource); + context.setAMContainerSpec(amContainer); + submitRequest.setApplicationSubmissionContext(context); + + context.setUnmanagedAM(true); + + LOG.info("Submitting unmanaged application {}", appId); + this.rmClient.submitApplication(submitRequest); + } + + /** + * Monitor the submitted application and attempt until it reaches certain + * states. 
+ * + * @param appId Application Id of application to be monitored + * @param appStates acceptable application state + * @param attemptState acceptable application attempt state + * @return the application report + * @throws YarnException if getApplicationReport fails + * @throws IOException if getApplicationReport fails + */ + private ApplicationAttemptReport monitorCurrentAppAttempt(ApplicationId appId, + Set appStates, + YarnApplicationAttemptState attemptState) + throws YarnException, IOException { + + long startTime = System.currentTimeMillis(); + ApplicationAttemptId appAttemptId = null; + while (true) { + if (appAttemptId == null) { + // Get application report for the appId we are interested in + ApplicationReport report = getApplicationReport(appId); + YarnApplicationState state = report.getYarnApplicationState(); + if (appStates.contains(state)) { + if (state != YarnApplicationState.ACCEPTED) { + throw new YarnRuntimeException( + "Received non-accepted application state: " + state + + ". 
Application " + appId + " not the first attempt?"); + } + appAttemptId = + getApplicationReport(appId).getCurrentApplicationAttemptId(); + } else { + LOG.info("Current application state of {} is {}, will retry later.", + appId, state); + } + } + + if (appAttemptId != null) { + GetApplicationAttemptReportRequest req = this.recordFactory + .newRecordInstance(GetApplicationAttemptReportRequest.class); + req.setApplicationAttemptId(appAttemptId); + ApplicationAttemptReport attemptReport = this.rmClient + .getApplicationAttemptReport(req).getApplicationAttemptReport(); + if (attemptState + .equals(attemptReport.getYarnApplicationAttemptState())) { + return attemptReport; + } + LOG.info("Current attempt state of " + appAttemptId + " is " + + attemptReport.getYarnApplicationAttemptState() + + ", waiting for current attempt to reach " + attemptState); + } + + try { + Thread.sleep(this.asyncApiPollIntervalMillis); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for current attempt of " + appId + + " to reach " + attemptState); + } + + if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) { + throw new RuntimeException("Timeout for waiting current attempt of " + + appId + " to reach " + attemptState); + } + } + } + + /** + * Gets the identifier of the unmanaged AM. + * + * @return the identifier of the unmanaged AM. 
+ * @throws IOException if getApplicationReport fails + * @throws YarnException if getApplicationReport fails + */ + protected UnmanagedAMIdentifier getUAMIdentifier() + throws IOException, YarnException { + Token token = null; + org.apache.hadoop.yarn.api.records.Token amrmToken = + getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken(); + if (amrmToken != null) { + token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); + } else { + LOG.warn( + "AMRMToken not found in the application report for application: {}", + this.attemptId.getApplicationId()); + } + return new UnmanagedAMIdentifier(this.attemptId, token); + } + + private ApplicationReport getApplicationReport(ApplicationId appId) + throws YarnException, IOException { + GetApplicationReportRequest request = + this.recordFactory.newRecordInstance(GetApplicationReportRequest.class); + request.setApplicationId(appId); + return this.rmClient.getApplicationReport(request).getApplicationReport(); + } + + /** + * Data structure that encapsulates the application attempt identifier and the + * AMRMTokenIdentifier. Make it public because clients with HA need it. + */ + public static class UnmanagedAMIdentifier { + private ApplicationAttemptId attemptId; + private Token token; + + public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, + Token token) { + this.attemptId = attemptId; + this.token = token; + } + + public ApplicationAttemptId getAttemptId() { + return this.attemptId; + } + + public Token getToken() { + return this.token; + } + } + + /** + * Data structure that encapsulates AllocateRequest and AsyncCallback + * instance. 
+ */ + public static class AsyncAllocateRequestInfo { + private AllocateRequest request; + private AsyncCallback callback; + + public AsyncAllocateRequestInfo(AllocateRequest request, + AsyncCallback callback) { + Preconditions.checkArgument(request != null, + "AllocateRequest cannot be null"); + Preconditions.checkArgument(callback != null, "Callback cannot be null"); + + this.request = request; + this.callback = callback; + } + + public AsyncCallback getCallback() { + return this.callback; + } + + public AllocateRequest getRequest() { + return this.request; + } + } + + @VisibleForTesting + public int getRequestQueueSize() { + return this.requestQueue.size(); + } + + /** + * Extends Thread and provides an implementation that is used for processing + * the AM heart beat request asynchronously and sending back the response + * using the callback method registered with the system. + */ + public class AMRequestHandlerThread extends Thread { + + // Indication flag for the thread to keep running + private volatile boolean keepRunning; + + public AMRequestHandlerThread() { + super("UnmanagedApplicationManager Heartbeat Handler Thread"); + this.keepRunning = true; + } + + /** + * Shutdown the thread. + */ + public void shutdown() { + this.keepRunning = false; + this.interrupt(); + } + + @Override + public void run() { + while (keepRunning) { + AsyncAllocateRequestInfo requestInfo; + try { + requestInfo = requestQueue.take(); + if (requestInfo == null) { + throw new YarnException( + "Null requestInfo taken from request queue"); + } + if (!keepRunning) { + break; + } + + // change the response id before forwarding the allocate request as we + // could have different values for each UAM + AllocateRequest request = requestInfo.getRequest(); + if (request == null) { + throw new YarnException("Null allocateRequest from requestInfo"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" + + ((request.getAskList() == null) ? 
" empty" + : request.getAskList().size())); + } + + request.setResponseId(lastResponseId); + AllocateResponse response = AMRMClientUtils.allocateWithReRegister( + request, rmProxy, registerRequest, attemptId); + if (response == null) { + throw new YarnException("Null allocateResponse from allocate"); + } + + lastResponseId = response.getResponseId(); + // update token if RM has reissued/renewed + if (response.getAMRMToken() != null) { + LOG.debug("Received new AMRMToken"); + YarnServerSecurityUtils.updateAMRMToken(response.getAMRMToken(), + userUgi, conf); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Received Heartbeat reply from RM. Allocated Containers:" + + ((response.getAllocatedContainers() == null) ? " empty" + : response.getAllocatedContainers().size())); + } + + if (requestInfo.getCallback() == null) { + throw new YarnException("Null callback from requestInfo"); + } + requestInfo.getCallback().callback(response); + } catch (InterruptedException ex) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted while waiting for queue", ex); + } + } catch (IOException ex) { + LOG.warn( + "IO Error occurred while processing heart beat for " + attemptId, + ex); + } catch (Throwable ex) { + LOG.warn( + "Error occurred while processing heart beat for " + attemptId, + ex); + } + } + + LOG.info("UnmanagedApplicationManager has been stopped for {}. " + + "AMRequestHandlerThread thread is exiting", attemptId); + } + } + + /** + * Uncaught exception handler for the background heartbeat thread. 
+ */ + protected class HeartBeatThreadUncaughtExceptionHandler + implements UncaughtExceptionHandler { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Heartbeat thread {} for application attempt {} crashed!", + t.getName(), attemptId, e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java new file mode 100644 index 0000000..0e78094 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.yarn.server.uam; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java new file mode 100644 index 0000000..7993bd8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.utils; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for AMRMClient. 
+ */ +@Private +public final class AMRMClientUtils { + private static final Logger LOG = + LoggerFactory.getLogger(AMRMClientUtils.class); + + public static final String APP_ALREADY_REGISTERED_MESSAGE = + "Application Master is already registered : "; + + private AMRMClientUtils() { + } + + /** + * Handle ApplicationNotRegistered exception and re-register. + * + * @param attemptId app attemptId + * @param rmProxy RM proxy instance + * @param registerRequest the AM re-register request + * @throws YarnException if re-register fails + */ + public static void handleNotRegisteredExceptionAndReRegister( + ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy, + RegisterApplicationMasterRequest registerRequest) throws YarnException { + LOG.info("App attempt {} not registered, most likely due to RM failover. " + + " Trying to re-register.", attemptId); + try { + rmProxy.registerApplicationMaster(registerRequest); + } catch (Exception e) { + if (e instanceof InvalidApplicationMasterRequestException + && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) { + LOG.info("Concurrent thread successfully registered, moving on."); + } else { + LOG.error("Error trying to re-register AM", e); + throw new YarnException(e); + } + } + } + + /** + * Helper method for client calling ApplicationMasterProtocol.allocate that + * handles re-register if RM fails over. 
+ * + * @param request allocate request + * @param rmProxy RM proxy + * @param registerRequest the register request for re-register + * @param attemptId application attempt id + * @return allocate response + * @throws YarnException if RM call fails + * @throws IOException if RM call fails + */ + public static AllocateResponse allocateWithReRegister(AllocateRequest request, + ApplicationMasterProtocol rmProxy, + RegisterApplicationMasterRequest registerRequest, + ApplicationAttemptId attemptId) throws YarnException, IOException { + try { + return rmProxy.allocate(request); + } catch (ApplicationMasterNotRegisteredException e) { + handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + registerRequest); + // reset responseId after re-register + request.setResponseId(0); + // retry allocate + return allocateWithReRegister(request, rmProxy, registerRequest, + attemptId); + } + } + + /** + * Helper method for client calling + * ApplicationMasterProtocol.finishApplicationMaster that handles re-register + * if RM fails over. 
+ * + * @param request finishApplicationMaster request + * @param rmProxy RM proxy + * @param registerRequest the register request for re-register + * @param attemptId application attempt id + * @return finishApplicationMaster response + * @throws YarnException if RM call fails + * @throws IOException if RM call fails + */ + public static FinishApplicationMasterResponse finishAMWithReRegister( + FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, + RegisterApplicationMasterRequest registerRequest, + ApplicationAttemptId attemptId) throws YarnException, IOException { + try { + return rmProxy.finishApplicationMaster(request); + } catch (ApplicationMasterNotRegisteredException ex) { + handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + registerRequest); + // retry finishAM after re-register + return finishAMWithReRegister(request, rmProxy, registerRequest, + attemptId); + } + } + + /** + * Create a proxy for the specified protocol. + * + * @param configuration Configuration to generate {@link ClientRMProxy} + * @param protocol Protocol for the proxy + * @param user the user on whose behalf the proxy is being created + * @param token the auth token to use for connection + * @param Type information of the proxy + * @return Proxy to the RM + * @throws IOException on failure + */ + @Public + @Unstable + public static T createRMProxy(final Configuration configuration, + final Class protocol, UserGroupInformation user, + final Token token) throws IOException { + try { + String rmClusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + LOG.info("Creating RMProxy to RM {} for protocol {} for user {}", + rmClusterId, protocol.getSimpleName(), user); + if (token != null) { + token.setService(ClientRMProxy.getAMRMTokenService(configuration)); + user.addToken(token); + setAuthModeInConf(configuration); + } + final T proxyConnection = user.doAs(new PrivilegedExceptionAction() { + @Override + 
public T run() throws Exception { + return ClientRMProxy.createRMProxy(configuration, protocol); + } + }); + return proxyConnection; + + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + } + + private static void setAuthModeInConf(Configuration conf) { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java index 9af556e..e61798d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java @@ -23,13 +23,16 @@ import java.nio.ByteBuffer; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import 
org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -42,8 +45,8 @@ import org.slf4j.LoggerFactory; */ @Private public final class YarnServerSecurityUtils { - private static final Logger LOG = LoggerFactory - .getLogger(YarnServerSecurityUtils.class); + private static final Logger LOG = + LoggerFactory.getLogger(YarnServerSecurityUtils.class); private YarnServerSecurityUtils() { } @@ -55,8 +58,7 @@ public final class YarnServerSecurityUtils { * @return the AMRMTokenIdentifier instance for the current user * @throws YarnException */ - public static AMRMTokenIdentifier authorizeRequest() - throws YarnException { + public static AMRMTokenIdentifier authorizeRequest() throws YarnException { UserGroupInformation remoteUgi; try { @@ -82,9 +84,8 @@ public final class YarnServerSecurityUtils { } } catch (IOException e) { tokenFound = false; - message = - "Got exception while looking for AMRMToken for user " - + remoteUgi.getUserName(); + message = "Got exception while looking for AMRMToken for user " + + remoteUgi.getUserName(); } if (!tokenFound) { @@ -113,8 +114,29 @@ public final class YarnServerSecurityUtils { } /** + * Update the new AMRMToken into the ugi used for RM proxy. + * + * @param token the new AMRMToken sent by RM + * @param user ugi used for RM proxy + * @param conf configuration + */ + public static void updateAMRMToken( + org.apache.hadoop.yarn.api.records.Token token, UserGroupInformation user, + Configuration conf) { + Token amrmToken = new Token( + token.getIdentifier().array(), token.getPassword().array(), + new Text(token.getKind()), new Text(token.getService())); + // Preserve the token service sent by the RM when adding the token + // to ensure we replace the previous token setup by the RM. + // Afterwards we can update the service address for the RPC layer. 
+ user.addToken(amrmToken); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(conf)); + } + + /** * Parses the container launch context and returns a Credential instance that - * contains all the tokens from the launch context. + * contains all the tokens from the launch context. + * * @param launchContext * @return the credential instance * @throws IOException @@ -130,8 +152,7 @@ public final class YarnServerSecurityUtils { buf.reset(tokens); credentials.readTokenStorageStream(buf); if (LOG.isDebugEnabled()) { - for (Token tk : credentials - .getAllTokens()) { + for (Token tk : credentials.getAllTokens()) { LOG.debug(tk.getService() + " = " + tk.toString()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index bda41d4..c4a4002 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -116,7 +116,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; -import org.mortbay.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; /** * Mock Resource Manager facade 
implementation that exposes all the methods @@ -165,8 +168,9 @@ public class MockResourceManagerFacade Log.info("Registering application attempt: " + amrmToken); synchronized (applicationContainerIdMap) { - Assert.assertFalse("The application id is already registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + Assert.assertFalse( + "The application id is already registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); // Keep track of the containers that are returned to this application applicationContainerIdMap.put(amrmToken, new ArrayList()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java new file mode 100644 index 0000000..9159cf7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.uam; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; +import org.apache.hadoop.yarn.util.AsyncCallback; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for UnmanagedApplicationManager. 
+ */ +public class TestUnmanagedApplicationManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestUnmanagedApplicationManager.class); + + private TestableUnmanagedApplicationManager uam; + private Configuration conf = new YarnConfiguration(); + private CountingCallback callback; + + private ApplicationAttemptId attemptId; + + @Before + public void setup() { + conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId"); + callback = new CountingCallback(); + + attemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); + + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + } + + protected void waitForCallBackCountAndCheckZeroPending( + CountingCallback callBack, int expectCallBackCount) { + synchronized (callBack) { + while (callBack.callBackCount != expectCallBackCount) { + try { + callBack.wait(); + } catch (InterruptedException e) { + } + } + Assert.assertEquals( + "Non zero pending requests when number of allocate callbacks reaches " + + expectCallBackCount, + 0, callBack.requestQueueSize); + } + } + + @Test(timeout = 5000) + public void testBasicUsage() + throws YarnException, IOException, InterruptedException { + + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + + @Test(timeout = 5000) + public void testReRegister() + throws YarnException, IOException, InterruptedException { + + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + uam.setShouldReRegisterNext(); + + allocateAsync(AllocateRequest.newInstance(0, 0, 
null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + uam.setShouldReRegisterNext(); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + + /** + * If register is slow, async allocate requests in the meanwhile should not + * throw or be dropped. + */ + @Test(timeout = 5000) + public void testSlowRegisterCall() + throws YarnException, IOException, InterruptedException { + + // Register with wait() in RM in a separate thread + Thread registerAMThread = new Thread(new Runnable() { + @Override + public void run() { + try { + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 1001, null), + attemptId); + } catch (Exception e) { + LOG.info("Register thread exception", e); + } + } + }); + + // Sync obj from mock RM + Object syncObj = MockResourceManagerFacade.getSyncObj(); + + // Wait for register call in the thread get into RM and then wake us + synchronized (syncObj) { + LOG.info("Starting register thread"); + registerAMThread.start(); + try { + LOG.info("Test main starts waiting"); + syncObj.wait(); + LOG.info("Test main wait finished"); + } catch (Exception e) { + LOG.info("Test main wait interrupted", e); + } + } + + // First allocate before register succeeds + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Notify the register thread + synchronized (syncObj) { + syncObj.notifyAll(); + } + + LOG.info("Test main wait for register thread to finish"); + registerAMThread.join(); + LOG.info("Register thread finished"); + + // Second allocate, normal case + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Both allocate before should respond + waitForCallBackCountAndCheckZeroPending(callback, 2); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, 
null), + attemptId); + + // Allocates after finishAM should be ignored + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + Assert.assertEquals(0, callback.requestQueueSize); + + // A short wait just in case the allocates get executed + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + + Assert.assertEquals(2, callback.callBackCount); + } + + @Test(expected = Exception.class) + public void testAllocateWithoutRegister() + throws YarnException, IOException, InterruptedException { + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + } + + @Test(expected = Exception.class) + public void testFinishWithoutRegister() + throws YarnException, IOException, InterruptedException { + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + + @Test + public void testForceKill() + throws YarnException, IOException, InterruptedException { + createAndRegisterApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + uam.forceKillApplication(); + + try { + uam.forceKillApplication(); + Assert.fail("Should fail because application is already killed"); + } catch (YarnException t) { + } + } + + protected UserGroupInformation getUGIWithToken( + ApplicationAttemptId appAttemptId) { + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1); + ugi.addTokenIdentifier(token); + return ugi; + } + + protected RegisterApplicationMasterResponse + createAndRegisterApplicationMaster( + final RegisterApplicationMasterRequest request, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return getUGIWithToken(appAttemptId).doAs( + new PrivilegedExceptionAction() { + 
@Override + public RegisterApplicationMasterResponse run() + throws YarnException, IOException { + RegisterApplicationMasterResponse response = + uam.createAndRegisterApplicationMaster(request); + return response; + } + }); + } + + protected void allocateAsync(final AllocateRequest request, + final AsyncCallback callBack, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction() { + @Override + public Object run() throws YarnException { + uam.allocateAsync(request, callBack); + return null; + } + }); + } + + protected FinishApplicationMasterResponse finishApplicationMaster( + final FinishApplicationMasterRequest request, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { + return getUGIWithToken(appAttemptId) + .doAs(new PrivilegedExceptionAction() { + @Override + public FinishApplicationMasterResponse run() + throws YarnException, IOException { + FinishApplicationMasterResponse response = + uam.finishApplicationMaster(request); + return response; + } + }); + } + + protected class CountingCallback implements AsyncCallback { + private int callBackCount; + private int requestQueueSize; + + @Override + public void callback(AllocateResponse response) { + synchronized (this) { + callBackCount++; + requestQueueSize = uam.getRequestQueueSize(); + this.notifyAll(); + } + } + } + + /** + * Testable UnmanagedApplicationManager that talks to a mock RM. 
+ */ + public static class TestableUnmanagedApplicationManager + extends UnmanagedApplicationManager { + + private MockResourceManagerFacade rmProxy; + + public TestableUnmanagedApplicationManager(Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix) { + super(conf, appId, queueName, submitter, appNameSuffix); + } + + @SuppressWarnings("unchecked") + @Override + protected T createRMProxy(final Class protocol, Configuration config, + UserGroupInformation user, Token token) { + if (rmProxy == null) { + rmProxy = new MockResourceManagerFacade(config, 0); + } + return (T) rmProxy; + } + + public void setShouldReRegisterNext() { + if (rmProxy != null) { + rmProxy.setShouldReRegisterNext(); + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java index 22fc8f6..3ba4d20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +134,8 @@ public final class DefaultRequestInterceptor extends } AllocateResponse allocateResponse = rmClient.allocate(request); if (allocateResponse.getAMRMToken() != null) { - updateAMRMToken(allocateResponse.getAMRMToken()); + YarnServerSecurityUtils.updateAMRMToken(allocateResponse.getAMRMToken(), + this.user, getConf()); } return allocateResponse; @@ -170,7 +171,9 @@ public final class DefaultRequestInterceptor extends ((DistributedSchedulingAMProtocol)rmClient) .allocateForDistributedScheduling(request); if (allocateResponse.getAllocateResponse().getAMRMToken() != null) { - updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken()); + YarnServerSecurityUtils.updateAMRMToken( + allocateResponse.getAllocateResponse().getAMRMToken(), this.user, + getConf()); } return allocateResponse; } else { @@ -195,18 +198,6 @@ public final class DefaultRequestInterceptor extends + "Check if the interceptor pipeline configuration is correct"); } - private void updateAMRMToken(Token token) throws IOException { - 
org.apache.hadoop.security.token.Token amrmToken = - new org.apache.hadoop.security.token.Token( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - // Preserve the token service sent by the RM when adding the token - // to ensure we replace the previous token setup by the RM. - // Afterwards we can update the service address for the RPC layer. - user.addToken(amrmToken); - amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf())); - } - @VisibleForTesting public void setRMClient(final ApplicationMasterProtocol rmClient) { if (rmClient instanceof DistributedSchedulingAMProtocol) { @@ -257,19 +248,12 @@ public final class DefaultRequestInterceptor extends for (org.apache.hadoop.security.token.Token token : UserGroupInformation .getCurrentUser().getTokens()) { if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - token.setService(getAMRMTokenService(conf)); + token.setService(ClientRMProxy.getAMRMTokenService(conf)); } } } @InterfaceStability.Unstable - public static Text getAMRMTokenService(Configuration conf) { - return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - } - - @InterfaceStability.Unstable public static Text getTokenService(Configuration conf, String address, String defaultAddr, int defaultPort) { if (HAUtil.isHAEnabled(conf)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 147ba34..aa4d620 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -213,15 +213,13 @@ public class ApplicationMasterService extends AbstractService implements synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (hasApplicationMasterRegistered(applicationAttemptId)) { - String message = - "Application Master is already registered : " - + appID; + String message = AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + appID; LOG.warn(message); RMAuditLogger.logFailure( - this.rmContext.getRMApps() - .get(appID).getUser(), - AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - appID, applicationAttemptId); + this.rmContext.getRMApps() + .get(appID).getUser(), + AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, + appID, applicationAttemptId); throw new InvalidApplicationMasterRequestException(message); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9476d86c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 93ab8d0..1603c2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -346,9 +347,8 @@ public class TestApplicationMasterLauncher { am.registerAppAttempt(false); Assert.fail(); } catch (Exception e) { - Assert.assertEquals("Application Master is already registered : " - + attempt.getAppAttemptId().getApplicationId(), - e.getMessage()); + Assert.assertEquals(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + + attempt.getAppAttemptId().getApplicationId(), e.getMessage()); } // Simulate an AM that was disconnected and app attempt was removed --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org