Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ABF33E595 for ; Wed, 6 Feb 2013 19:04:18 +0000 (UTC) Received: (qmail 96871 invoked by uid 500); 6 Feb 2013 19:04:18 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 96844 invoked by uid 500); 6 Feb 2013 19:04:18 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 96834 invoked by uid 99); 6 Feb 2013 19:04:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Feb 2013 19:04:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Feb 2013 19:04:13 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6C9EF2388847; Wed, 6 Feb 2013 19:03:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1443131 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/ hadoop-yarn/hadoop-yarn-cli... Date: Wed, 06 Feb 2013 19:03:53 -0000 To: yarn-commits@hadoop.apache.org From: sseth@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130206190353.6C9EF2388847@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: sseth Date: Wed Feb 6 19:03:52 2013 New Revision: 1443131 URL: http://svn.apache.org/viewvc?rev=1443131&view=rev Log: YARN-355. Fixes a bug where RM app submission could jam under load. Contributed by Daryn Sharp. Removed: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/ Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1443131&r1=1443130&r2=1443131&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Feb 6 19:03:52 2013 @@ -234,6 +234,9 @@ Release 2.0.3-alpha - 2013-02-06 YARN-370. Fix SchedulerUtils to correctly round up the resource for containers. (Zhijie Shen via acmurthy) + YARN-355. Fixes a bug where RM app submission could jam under load. + (Daryn Sharp via sseth) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java?rev=1443131&r1=1443130&r2=1443131&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java Wed Feb 6 19:03:52 2013 @@ -25,13 +25,11 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -47,8 +45,6 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -199,30 +195,6 @@ public class YarnClientImpl extends Abst } - // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel - // are part of ClientRMProtocol. - @Private - public long renewRMDelegationToken(DelegationToken rmToken) - throws YarnRemoteException { - RenewDelegationTokenRequest request = Records - .newRecord(RenewDelegationTokenRequest.class); - request.setDelegationToken(rmToken); - RenewDelegationTokenResponse response = rmClient - .renewDelegationToken(request); - return response.getNextExpirationTime(); - } - - // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel - // are part of ClietnRMProtocol - @Private - public void cancelRMDelegationToken(DelegationToken rmToken) - throws YarnRemoteException { - CancelDelegationTokenRequest request = Records - .newRecord(CancelDelegationTokenRequest.class); - request.setDelegationToken(rmToken); - rmClient.cancelDelegationToken(request); - } - private GetQueueInfoRequest getQueueInfoRequest(String queueName, boolean includeApplications, boolean includeChildQueues, boolean recursive) { Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java?rev=1443131&r1=1443130&r2=1443131&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java Wed Feb 6 19:03:52 2013 @@ -19,10 +19,28 @@ package org.apache.hadoop.yarn.security.client; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.records.DelegationToken; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; /** * Delegation Token Identifier that identifies the delegation tokens from the @@ -51,4 +69,100 @@ public class RMDelegationTokenIdentifier public Text getKind() { return KIND_NAME; } + + public static class Renewer extends TokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return KIND_NAME.equals(kind); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + private static + AbstractDelegationTokenSecretManager localSecretManager; + private static InetSocketAddress localServiceAddress; + + @Private + public static void setSecretManager( + AbstractDelegationTokenSecretManager secretManager, + InetSocketAddress serviceAddress) { + localSecretManager = secretManager; + localServiceAddress = serviceAddress; + } + + @SuppressWarnings("unchecked") + @Override + public long renew(Token token, Configuration conf) throws IOException, + InterruptedException { + final ClientRMProtocol rmClient = getRmClient(token, conf); + if (rmClient != null) { + try { + RenewDelegationTokenRequest request = + Records.newRecord(RenewDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + return rmClient.renewDelegationToken(request).getNextExpirationTime(); + } finally { + RPC.stopProxy(rmClient); + } + } else { + return localSecretManager.renewToken( + (Token)token, getRenewer(token)); + } + } + + @SuppressWarnings("unchecked") + @Override + public void cancel(Token token, Configuration conf) throws IOException, + InterruptedException { + final ClientRMProtocol rmClient = getRmClient(token, conf); + if (rmClient != null) { + try { + CancelDelegationTokenRequest request = + Records.newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(convertToProtoToken(token)); + rmClient.cancelDelegationToken(request); + } finally { + RPC.stopProxy(rmClient); + } + } else { + localSecretManager.cancelToken( + (Token)token, getRenewer(token)); + } + } + + private static ClientRMProtocol getRmClient(Token token, + Configuration conf) { + InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); + if (localSecretManager != null) { + // return null if it's our token + if (localServiceAddress.getAddress().isAnyLocalAddress()) { + if (NetUtils.isLocalAddress(addr.getAddress()) && + addr.getPort() == localServiceAddress.getPort()) { + return null; + } + } else if (addr.equals(localServiceAddress)) { + return null; + } + } + final YarnRPC rpc = YarnRPC.create(conf); + return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf); + } + + // get renewer so we can always renew our own tokens + @SuppressWarnings("unchecked") + private static String getRenewer(Token token) throws IOException { + return ((Token)token).decodeIdentifier() + .getRenewer().toString(); + } + + private static DelegationToken convertToProtoToken(Token token) { + return BuilderUtils.newDelegationToken( + token.getIdentifier(), token.getKind().toString(), + token.getPassword(), token.getService().toString()); + } + } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1443131&r1=1443130&r2=1443131&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Wed Feb 6 19:03:52 2013 @@ -13,3 +13,4 @@ # org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer +org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1443131&r1=1443130&r2=1443131&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Wed Feb 6 19:03:52 2013 @@ -157,6 +157,10 @@ public class ClientRMService extends Abs this.server.start(); clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, server.getListenerAddress()); + // enable RM to short-circuit token operations directly to itself + RMDelegationTokenIdentifier.Renewer.setSecretManager( + rmDTSecretManager, clientBindAddress); + super.start(); } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1443131&r1=1443130&r2=1443131&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Wed Feb 6 19:03:52 2013 @@ -17,13 +17,12 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; @@ -34,9 +33,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; @@ -46,12 +51,14 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.hadoop.yarn.util.Records; +import org.junit.Before; import org.junit.Test; @@ -59,6 +66,10 @@ public class TestClientRMTokens { private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class); + @Before + public void resetSecretManager() { + RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null); + } @Test public void testDelegationToken() throws IOException, InterruptedException { @@ -200,7 +211,122 @@ public class TestClientRMTokens { RPC.stopProxy(clientRMWithDT); } } + } + + @Test + public void testShortCircuitRenewCancel() + throws IOException, InterruptedException { + InetSocketAddress addr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel(addr, addr, true); + } + + @Test + public void testShortCircuitRenewCancelWildcardAddress() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = new InetSocketAddress(123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()), + true); + } + + @Test + public void testShortCircuitRenewCancelSameHostDifferentPort() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1), + false); + } + + @Test + public void testShortCircuitRenewCancelDifferentHostSamePort() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress("1.1.1.1", rmAddr.getPort()), + false); + } + + @Test + public void testShortCircuitRenewCancelDifferentHostDifferentPort() + throws IOException, InterruptedException { + InetSocketAddress rmAddr = + new InetSocketAddress(InetAddress.getLocalHost(), 123); + checkShortCircuitRenewCancel( + rmAddr, + new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1), + false); + } + + @SuppressWarnings("unchecked") + private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr, + InetSocketAddress serviceAddr, + boolean shouldShortCircuit + ) throws IOException, InterruptedException { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.IPC_RPC_IMPL, + YarnBadRPC.class, YarnRPC.class); + RMDelegationTokenSecretManager secretManager = + mock(RMDelegationTokenSecretManager.class); + RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr); + + RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), null); + Token token = + new Token(ident, secretManager); + + SecurityUtil.setTokenService(token, serviceAddr); + if (shouldShortCircuit) { + token.renew(conf); + verify(secretManager).renewToken(eq(token), eq("renewer")); + reset(secretManager); + token.cancel(conf); + verify(secretManager).cancelToken(eq(token), eq("renewer")); + } else { + try { + token.renew(conf); + fail(); + } catch (RuntimeException e) { + assertEquals("getProxy", e.getMessage()); + } + verify(secretManager, never()).renewToken(any(Token.class), anyString()); + try { + token.cancel(conf); + fail(); + } catch (RuntimeException e) { + assertEquals("getProxy", e.getMessage()); + } + verify(secretManager, never()).cancelToken(any(Token.class), anyString()); + } + } + + @SuppressWarnings("rawtypes") + public static class YarnBadRPC extends YarnRPC { + @Override + public Object getProxy(Class protocol, InetSocketAddress addr, + Configuration conf) { + throw new RuntimeException("getProxy"); + } + + @Override + public void stopProxy(Object proxy, Configuration conf) { + throw new RuntimeException("stopProxy"); + } + + @Override + public Server getServer(Class protocol, Object instance, + InetSocketAddress addr, Configuration conf, + SecretManager secretManager, + int numHandlers, String portRangeConfig) { + throw new RuntimeException("getServer"); + } } // Get the delegation token directly as it is a little difficult to setup