Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F0DD32009F8 for ; Fri, 3 Jun 2016 22:07:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EDE7C160A49; Fri, 3 Jun 2016 20:07:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E7E27160A3B for ; Fri, 3 Jun 2016 22:07:29 +0200 (CEST) Received: (qmail 21313 invoked by uid 500); 3 Jun 2016 20:07:29 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 21302 invoked by uid 99); 3 Jun 2016 20:07:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Jun 2016 20:07:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C7932DFB14; Fri, 3 Jun 2016 20:07:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vinodkv@apache.org To: common-commits@hadoop.apache.org Message-Id: <374ee6d4df5346e09766fe67670892a6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-5098. Fixed ResourceManager's DelegationTokenRenewer to replace expiring system-tokens if RM stops and only restarts after a long time. Contributed by Jian He. Date: Fri, 3 Jun 2016 20:07:28 +0000 (UTC) archived-at: Fri, 03 Jun 2016 20:07:31 -0000 Repository: hadoop Updated Branches: refs/heads/trunk 99cc439e2 -> f10ebc67f YARN-5098. 
Fixed ResourceManager's DelegationTokenRenewer to replace expiring system-tokens if RM stops and only restarts after a long time. Contributed by Jian He. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f10ebc67 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f10ebc67 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f10ebc67 Branch: refs/heads/trunk Commit: f10ebc67f57a4a2e3cc916c41154ab9b6a4635c9 Parents: 99cc439 Author: Vinod Kumar Vavilapalli Authored: Fri Jun 3 13:00:07 2016 -0700 Committer: Vinod Kumar Vavilapalli Committed: Fri Jun 3 13:00:07 2016 -0700 ---------------------------------------------------------------------- .../security/DelegationTokenRenewer.java | 27 ++++-- .../security/TestDelegationTokenRenewer.java | 98 ++++++++++++++++++++ 2 files changed, 118 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f10ebc67/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java index fd12f11..4177ee2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java @@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.service.AbstractService; @@ -459,6 +460,18 @@ public class DelegationTokenRenewer extends AbstractService { try { renewToken(dttr); } catch (IOException ioe) { + if (ioe instanceof SecretManager.InvalidToken + && dttr.maxDate < Time.now() + && evt instanceof DelegationTokenRenewerAppRecoverEvent + && token.getKind().equals(HDFS_DELEGATION_KIND)) { + LOG.info("Failed to renew hdfs token " + dttr + + " on recovery as it expired, requesting new hdfs token for " + + applicationId + ", user=" + evt.getUser(), ioe); + requestNewHdfsDelegationTokenAsProxyUser( + Arrays.asList(applicationId), evt.getUser(), + evt.shouldCancelAtEnd()); + continue; + } throw new IOException("Failed to renew token: " + dttr.token, ioe); } } @@ -485,7 +498,8 @@ public class DelegationTokenRenewer extends AbstractService { } if (!hasHdfsToken) { - requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(), + requestNewHdfsDelegationTokenAsProxyUser(Arrays.asList(applicationId), + evt.getUser(), shouldCancelAtEnd); } } @@ -586,8 +600,7 @@ public class DelegationTokenRenewer extends AbstractService { } catch (InterruptedException e) { throw new IOException(e); } - LOG.info("Renewed delegation-token= [" + dttr + "], for " - + dttr.referringAppIds); + LOG.info("Renewed delegation-token= [" + dttr + "]"); } // Request new hdfs token if the token is about to expire, and remove the old @@ -625,12 
+638,12 @@ public class DelegationTokenRenewer extends AbstractService { } } LOG.info("Token= (" + dttr + ") is expiring, request new token."); - requestNewHdfsDelegationToken(applicationIds, dttr.user, + requestNewHdfsDelegationTokenAsProxyUser(applicationIds, dttr.user, dttr.shouldCancelAtEnd); } } - private void requestNewHdfsDelegationToken( + private void requestNewHdfsDelegationTokenAsProxyUser( Collection<ApplicationId> referringAppIds, String user, boolean shouldCancelAtEnd) throws IOException, InterruptedException { @@ -912,8 +925,8 @@ public class DelegationTokenRenewer extends AbstractService { // Setup tokens for renewal during recovery DelegationTokenRenewer.this.handleAppSubmitEvent(event); } catch (Throwable t) { - LOG.warn( - "Unable to add the application to the delegation token renewer.", t); + LOG.warn("Unable to add the application to the delegation token" + + " renewer on recovery.", t); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f10ebc67/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 1bfac8d..74fe534 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -43,6 +43,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -968,6 +970,101 @@ public class TestDelegationTokenRenewer { Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken)); } + + // 1. token is expired before app completes. + // 2. RM shutdown. + // 3. When RM recovers the app, token renewal will fail as token expired. + // RM should request a new token and send it to NM for log-aggregation. 
+ @Test + public void testRMRestartWithExpiredToken() throws Exception { + Configuration yarnConf = new YarnConfiguration(); + yarnConf + .setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true); + yarnConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + yarnConf + .set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + UserGroupInformation.setConfiguration(yarnConf); + + // create Token1: + Text userText1 = new Text("user1"); + DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1, + new Text("renewer1"), userText1); + final Token<DelegationTokenIdentifier> originalToken = + new Token<>(dtId1.getBytes(), "password1".getBytes(), dtId1.getKind(), + new Text("service1")); + Credentials credentials = new Credentials(); + credentials.addToken(userText1, originalToken); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(yarnConf); + MockRM rm1 = new TestSecurityMockRM(yarnConf, memStore); + rm1.start(); + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap<ApplicationAccessType, String>(), false, "default", 1, + credentials); + + // create token2 + Text userText2 = new Text("user1"); + DelegationTokenIdentifier dtId2 = + new DelegationTokenIdentifier(userText1, new Text("renewer2"), + userText2); + final Token<DelegationTokenIdentifier> updatedToken = + new Token<DelegationTokenIdentifier>(dtId2.getBytes(), + "password2".getBytes(), dtId2.getKind(), new Text("service2")); + AtomicBoolean firstRenewInvoked = new AtomicBoolean(false); + AtomicBoolean secondRenewInvoked = new AtomicBoolean(false); + MockRM rm2 = new TestSecurityMockRM(yarnConf, memStore) { + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + + if (dttr.token.equals(updatedToken)) { + secondRenewInvoked.set(true); + super.renewToken(dttr); + } else if 
(dttr.token.equals(originalToken)){ + firstRenewInvoked.set(true); + throw new InvalidToken("Failed to renew"); + } else { + throw new IOException("Unexpected"); + } + } + + @Override + protected Token<?>[] obtainSystemTokensForUser(String user, + final Credentials credentials) throws IOException { + credentials.addToken(updatedToken.getService(), updatedToken); + return new Token<?>[] { updatedToken }; + } + }; + } + }; + + // simulating restart the rm + rm2.start(); + + // check nm can retrieve the token + final MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm1.registerNode(); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + ByteBuffer tokenBuffer = + response.getSystemCredentialsForApps().get(app.getApplicationId()); + Assert.assertNotNull(tokenBuffer); + Credentials appCredentials = new Credentials(); + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokenBuffer.rewind(); + buf.reset(tokenBuffer); + appCredentials.readTokenStorageStream(buf); + Assert.assertTrue(firstRenewInvoked.get() && secondRenewInvoked.get()); + Assert.assertTrue(appCredentials.getAllTokens().contains(updatedToken)); + } + // YARN will get the token for the app submitted without the delegation token. @Test public void testAppSubmissionWithoutDelegationToken() throws Exception { @@ -1158,4 +1255,5 @@ public class TestDelegationTokenRenewer { Assert.assertTrue(dttr.isTimerCancelled()); Assert.assertTrue(Renewer.cancelled); } + } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org