hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject [1/2] YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He.
Date Mon, 27 Oct 2014 22:51:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 5dbd27f8b -> 0ad33e148


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ad33e14/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index f65fcdc..b824df7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -61,6 +63,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -74,11 +77,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -88,16 +96,18 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Supplier;
+
 /**
  * unit test - 
  * tests addition/deletion/cancellation of renewals of delegation tokens
  *
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class TestDelegationTokenRenewer {
   private static final Log LOG = 
       LogFactory.getLog(TestDelegationTokenRenewer.class);
-  private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+  private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN");
   
   private static BlockingQueue<Event> eventQueue;
   private static volatile AtomicInteger counter;
@@ -125,6 +135,9 @@ public class TestDelegationTokenRenewer {
 
     @Override
     public long renew(Token<?> t, Configuration conf) throws IOException {
+      if ( !(t instanceof MyToken)) {
+        return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+      }
       MyToken token = (MyToken)t;
       if(token.isCanceled()) {
         throw new InvalidToken("token has been canceled");
@@ -179,8 +192,10 @@ public class TestDelegationTokenRenewer {
     dispatcher = new AsyncDispatcher(eventQueue);
     Renewer.reset();
     delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
-    RMContext mockContext = mock(RMContext.class);
+    RMContext mockContext =  mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
         delegationTokenRenewer);
     when(mockContext.getDispatcher()).thenReturn(dispatcher);
@@ -291,8 +306,8 @@ public class TestDelegationTokenRenewer {
     
     MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
-        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
         3600000, null);
     sm.startThreads();
     
@@ -353,7 +368,7 @@ public class TestDelegationTokenRenewer {
     // register the tokens for renewal
     ApplicationId applicationId_0 = 
         BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
+    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
 
     // first 3 initial renewals + 1 real
@@ -393,7 +408,7 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -429,7 +444,7 @@ public class TestDelegationTokenRenewer {
     
     // register the tokens for renewal
     ApplicationId appId =  BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(appId, ts, true);
+    delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
     int waitCnt = 20;
     while (waitCnt-- >0) {
       if (!eventQueue.isEmpty()) {
@@ -473,7 +488,7 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -516,6 +531,8 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(lconf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -540,7 +557,7 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true);
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
     waitForEventsToGetProcessed(localDtr);
     if (!eventQueue.isEmpty()){
       Event evt = eventQueue.take();
@@ -593,6 +610,8 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -617,7 +636,7 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true);
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
     localDtr.applicationFinished(applicationId_0);
     waitForEventsToGetProcessed(delegationTokenRenewer);
     //Send another keep alive.
@@ -640,7 +659,7 @@ public class TestDelegationTokenRenewer {
 
   private DelegationTokenRenewer createNewDelegationTokenRenewer(
       Configuration conf, final AtomicInteger counter) {
-    return new DelegationTokenRenewer() {
+    DelegationTokenRenewer renew =  new DelegationTokenRenewer() {
 
       @Override
       protected ThreadPoolExecutor
@@ -664,6 +683,8 @@ public class TestDelegationTokenRenewer {
         return pool;
       }
     };
+    renew.setRMContext(TestUtils.getMockRMContext());
+    return renew;
   }
 
   private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
@@ -679,7 +700,12 @@ public class TestDelegationTokenRenewer {
   public void testDTRonAppSubmission()
       throws IOException, InterruptedException, BrokenBarrierException {
     final Credentials credsx = new Credentials();
-    final Token<?> tokenx = mock(Token.class);
+    final Token<DelegationTokenIdentifier> tokenx = mock(Token.class);
+    when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    DelegationTokenIdentifier dtId1 = 
+        new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+          new Text("user1"));
+    when(tokenx.decodeIdentifier()).thenReturn(dtId1);
     credsx.addToken(new Text("token"), tokenx);
     doReturn(true).when(tokenx).isManaged();
     doThrow(new IOException("boom"))
@@ -688,6 +714,8 @@ public class TestDelegationTokenRenewer {
     final DelegationTokenRenewer dtr =
          createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     InetSocketAddress sockAddr =
@@ -699,7 +727,7 @@ public class TestDelegationTokenRenewer {
     dtr.start();
 
     try {
-      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
+      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
       fail("Catch IOException on app submission");
     } catch (IOException e){
       Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
@@ -716,7 +744,12 @@ public class TestDelegationTokenRenewer {
                                                                                
     // this token uses barriers to block during renew                          
     final Credentials creds1 = new Credentials();                              
-    final Token<?> token1 = mock(Token.class);                                 
+    final Token<DelegationTokenIdentifier> token1 = mock(Token.class);    
+    when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    DelegationTokenIdentifier dtId1 = 
+        new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+          new Text("user1"));
+    when(token1.decodeIdentifier()).thenReturn(dtId1);
     creds1.addToken(new Text("token"), token1);                                
     doReturn(true).when(token1).isManaged();                                   
     doAnswer(new Answer<Long>() {                                              
@@ -729,7 +762,9 @@ public class TestDelegationTokenRenewer {
                                                                                
     // this dummy token fakes renewing                                         
     final Credentials creds2 = new Credentials();                              
-    final Token<?> token2 = mock(Token.class);                                 
+    final Token<DelegationTokenIdentifier> token2 = mock(Token.class);           
+    when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    when(token2.decodeIdentifier()).thenReturn(dtId1);
     creds2.addToken(new Text("token"), token2);                                
     doReturn(true).when(token2).isManaged();                                   
     doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));     
@@ -737,7 +772,9 @@ public class TestDelegationTokenRenewer {
     // fire up the renewer                                                     
     final DelegationTokenRenewer dtr =
         createNewDelegationTokenRenewer(conf, counter);           
-    RMContext mockContext = mock(RMContext.class);                             
+    RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);         
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);    
     InetSocketAddress sockAddr =                                               
@@ -751,14 +788,14 @@ public class TestDelegationTokenRenewer {
     Thread submitThread = new Thread() {                                       
       @Override                                                                
       public void run() {
-        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
+        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
       }                                                                        
     };                                                                         
     submitThread.start();                                                      
                                                                                
     // wait till 1st submit blocks, then submit another
     startBarrier.await();                           
-    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
+    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
     // signal 1st to complete                                                  
     endBarrier.await();                                                        
     submitThread.join(); 
@@ -793,4 +830,139 @@ public class TestDelegationTokenRenewer {
           "Bad header found in token storage"));
     }
   }
+
+
+  @Test (timeout = 20000)
+  public void testReplaceExpiringDelegationToken() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+      "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    // create token1, the token submitted with the app's credentials below:
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+          userText1);
+    // set max date to 0 to simulate an expiring token.
+    dtId1.setMaxDate(0);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+          "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+    // create token2 (NOTE(review): mixes userText1 and userText2 - confirm)
+    Text userText2 = new Text("user2");
+    DelegationTokenIdentifier dtId2 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer2"),
+          userText2);
+    final Token<DelegationTokenIdentifier> expectedToken =
+        new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+          "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+
+    final MockRM rm = new TestSecurityMockRM(conf, null) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return new DelegationTokenRenewer() {
+          @Override
+          protected Token<?>[] obtainSystemTokensForUser(String user,
+              final Credentials credentials) throws IOException {
+            credentials.addToken(expectedToken.getService(), expectedToken);
+            return new Token<?>[] { expectedToken };
+          }
+        };
+      }
+    };
+    rm.start();
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+
+    RMApp app =
+        rm.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+          credentials);
+
+    // wait until the renewer no longer tracks the initial, expiring token1.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return !rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(token1);
+      }
+    }, 1000, 20000);
+
+    // wait until the renewer tracks the newly obtained expectedToken.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(expectedToken);
+      }
+    }, 1000, 20000);
+
+    // check that the NM receives the token in its heartbeat response
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    ByteBuffer tokenBuffer =
+        response.getSystemCredentialsForApps().get(app.getApplicationId());
+    Assert.assertNotNull(tokenBuffer);
+    Credentials appCredentials = new Credentials();
+    DataInputByteBuffer buf = new DataInputByteBuffer();
+    tokenBuffer.rewind();
+    buf.reset(tokenBuffer);
+    appCredentials.readTokenStorageStream(buf);
+    Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
+  }
+
+  // YARN obtains the token itself for an app submitted without a delegation token.
+  @Test
+  public void testAppSubmissionWithoutDelegationToken() throws Exception {
+    // create token2, the token the overridden obtainSystemTokensForUser returns
+    Text userText2 = new Text("user2");
+    DelegationTokenIdentifier dtId2 =
+        new DelegationTokenIdentifier(new Text("user2"), new Text("renewer2"),
+          userText2);
+    final Token<DelegationTokenIdentifier> token2 =
+        new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+          "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+    final MockRM rm = new TestSecurityMockRM(conf, null) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return new DelegationTokenRenewer() {
+          @Override
+          protected Token<?>[] obtainSystemTokensForUser(String user,
+              final Credentials credentials) throws IOException {
+            credentials.addToken(token2.getService(), token2);
+            return new Token<?>[] { token2 };
+          }
+        };
+      }
+    };
+    rm.start();
+
+    // submit an app without any delegation token
+    RMApp app = rm.submitApp(200);
+
+    // wait until the renewer tracks the newly obtained token2.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(token2);
+      }
+    }, 1000, 20000);
+
+    // check that the NM receives the token in its heartbeat response
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    ByteBuffer tokenBuffer =
+        response.getSystemCredentialsForApps().get(app.getApplicationId());
+    Assert.assertNotNull(tokenBuffer);
+    Credentials appCredentials = new Credentials();
+    DataInputByteBuffer buf = new DataInputByteBuffer();
+    tokenBuffer.rewind();
+    buf.reset(tokenBuffer);
+    appCredentials.readTokenStorageStream(buf);
+    Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
+  }
 }


Mime
View raw message