hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject hadoop git commit: YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token renewal of applications part of a bigger workflow. Contributed by Daryn Sharp.
Date Thu, 09 Apr 2015 20:09:45 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 922b7ed21 -> 9c5911294


YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token renewal of applications
part of a bigger workflow. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9c591129
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9c591129
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9c591129

Branch: refs/heads/trunk
Commit: 9c5911294e0ba71aefe4763731b0e780cde9d0ca
Parents: 922b7ed
Author: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Authored: Thu Apr 9 13:08:53 2015 -0700
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Thu Apr 9 13:08:53 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../security/DelegationTokenRenewer.java        | 137 ++++++++++++-------
 .../security/TestDelegationTokenRenewer.java    |  87 +++++++++++-
 3 files changed, 173 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c591129/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c6c56d3..c29fdea 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -912,6 +912,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3466. Fix RM nodes web page to sort by node HTTP-address, #containers 
     and node-label column (Jason Lowe via wangda)
 
+    YARN-3055. Fixed ResourceManager's DelegationTokenRenewer to not stop token
+    renewal of applications part of a bigger workflow. (Daryn Sharp via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c591129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index 2619971..d49ecfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -229,15 +230,16 @@ public class DelegationTokenRenewer extends AbstractService {
   @VisibleForTesting
   protected static class DelegationTokenToRenew {
     public final Token<?> token;
-    public final ApplicationId applicationId;
+    public final Collection<ApplicationId> referringAppIds;
     public final Configuration conf;
     public long expirationDate;
-    public TimerTask timerTask;
+    public RenewalTimerTask timerTask;
     public volatile boolean shouldCancelAtEnd;
     public long maxDate;
     public String user;
 
-    public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
+    public DelegationTokenToRenew(Collection<ApplicationId> applicationIds,
+        Token<?> token,
         Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
         String user) {
       this.token = token;
@@ -251,20 +253,33 @@ public class DelegationTokenRenewer extends AbstractService {
           throw new YarnRuntimeException(e);
         }
       }
-      this.applicationId = jId;
+      this.referringAppIds = Collections.synchronizedSet(
+          new HashSet<ApplicationId>(applicationIds));
       this.conf = conf;
       this.expirationDate = expirationDate;
       this.timerTask = null;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
     }
     
-    public void setTimerTask(TimerTask tTask) {
+    public void setTimerTask(RenewalTimerTask tTask) {
       timerTask = tTask;
     }
-    
+
+    @VisibleForTesting
+    public void cancelTimer() {
+      if (timerTask != null) {
+        timerTask.cancel();
+      }
+    }
+
+    @VisibleForTesting
+    public boolean isTimerCancelled() {
+      return (timerTask != null) && timerTask.cancelled.get();
+    }
+
     @Override
     public String toString() {
-      return token + ";exp=" + expirationDate;
+      return token + ";exp=" + expirationDate + "; apps=" + referringAppIds;
     }
     
     @Override
@@ -415,19 +430,16 @@ public class DelegationTokenRenewer extends AbstractService {
         }
 
         DelegationTokenToRenew dttr = allTokens.get(token);
-        if (dttr != null) {
-          // If any of the jobs sharing the same token doesn't want to cancel
-          // the token, we should not cancel the token.
-          if (!evt.shouldCancelAtEnd) {
-            dttr.shouldCancelAtEnd = evt.shouldCancelAtEnd;
-            LOG.info("Set shouldCancelAtEnd=" + shouldCancelAtEnd
-                + " for token " + dttr.token);
+        if (dttr == null) {
+          dttr = new DelegationTokenToRenew(Arrays.asList(applicationId), token,
+              getConfig(), now, shouldCancelAtEnd, evt.getUser());
+          try {
+            renewToken(dttr);
+          } catch (IOException ioe) {
+            throw new IOException("Failed to renew token: " + dttr.token, ioe);
           }
-          continue;
         }
-
-        tokenList.add(new DelegationTokenToRenew(applicationId, token,
-          getConfig(), now, shouldCancelAtEnd, evt.getUser()));
+        tokenList.add(dttr);
       }
     }
 
@@ -436,21 +448,21 @@ public class DelegationTokenRenewer extends AbstractService {
       // If user provides incorrect token then it should not be added for
       // renewal.
       for (DelegationTokenToRenew dtr : tokenList) {
-        try {
-          renewToken(dtr);
-        } catch (IOException ioe) {
-          throw new IOException("Failed to renew token: " + dtr.token, ioe);
+        DelegationTokenToRenew currentDtr =
+            allTokens.putIfAbsent(dtr.token, dtr);
+        if (currentDtr != null) {
+          // another job beat us
+          currentDtr.referringAppIds.add(applicationId);
+          appTokens.get(applicationId).add(currentDtr);
+        } else {
+          appTokens.get(applicationId).add(dtr);
+          setTimerForTokenRenewal(dtr);
         }
       }
-      for (DelegationTokenToRenew dtr : tokenList) {
-        appTokens.get(applicationId).add(dtr);
-        allTokens.put(dtr.token, dtr);
-        setTimerForTokenRenewal(dtr);
-      }
     }
 
     if (!hasHdfsToken) {
-      requestNewHdfsDelegationToken(applicationId, evt.getUser(),
+      requestNewHdfsDelegationToken(Arrays.asList(applicationId), evt.getUser(),
         shouldCancelAtEnd);
     }
   }
@@ -478,7 +490,7 @@ public class DelegationTokenRenewer extends AbstractService {
       try {
         requestNewHdfsDelegationTokenIfNeeded(dttr);
         // if the token is not replaced by a new token, renew the token
-        if (appTokens.get(dttr.applicationId).contains(dttr)) {
+        if (!dttr.isTimerCancelled()) {
           renewToken(dttr);
           setTimerForTokenRenewal(dttr);// set the next one
         } else {
@@ -508,12 +520,12 @@ public class DelegationTokenRenewer extends AbstractService {
     long expiresIn = token.expirationDate - System.currentTimeMillis();
     long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
     // need to create new task every time
-    TimerTask tTask = new RenewalTimerTask(token);
+    RenewalTimerTask tTask = new RenewalTimerTask(token);
     token.setTimerTask(tTask); // keep reference to the timer
 
     renewalTimer.schedule(token.timerTask, new Date(renewIn));
     LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
-        + token.applicationId);
+        + token.referringAppIds);
   }
 
   // renew a token
@@ -535,7 +547,7 @@ public class DelegationTokenRenewer extends AbstractService {
       throw new IOException(e);
     }
     LOG.info("Renewed delegation-token= [" + dttr + "], for "
-        + dttr.applicationId);
+        + dttr.referringAppIds);
   }
 
   // Request new hdfs token if the token is about to expire, and remove the old
@@ -548,30 +560,37 @@ public class DelegationTokenRenewer extends AbstractService {
         && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
         && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
 
+      final Collection<ApplicationId> applicationIds;
+      synchronized (dttr.referringAppIds) {
+        applicationIds = new HashSet<>(dttr.referringAppIds);
+        dttr.referringAppIds.clear();
+      }
       // remove all old expiring hdfs tokens for this application.
-      Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
-      if (tokenSet != null && !tokenSet.isEmpty()) {
+      for (ApplicationId appId : applicationIds) {
+        Set<DelegationTokenToRenew> tokenSet = appTokens.get(appId);
+        if (tokenSet == null || tokenSet.isEmpty()) {
+          continue;
+        }
         Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
         synchronized (tokenSet) {
           while (iter.hasNext()) {
             DelegationTokenToRenew t = iter.next();
             if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
               iter.remove();
-              if (t.timerTask != null) {
-                t.timerTask.cancel();
-              }
+              t.cancelTimer();
               LOG.info("Removed expiring token " + t);
             }
           }
         }
       }
       LOG.info("Token= (" + dttr + ") is expiring, request new token.");
-      requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
-        dttr.shouldCancelAtEnd);
+      requestNewHdfsDelegationToken(applicationIds, dttr.user,
+          dttr.shouldCancelAtEnd);
     }
   }
 
-  private void requestNewHdfsDelegationToken(ApplicationId applicationId,
+  private void requestNewHdfsDelegationToken(
+      Collection<ApplicationId> referringAppIds,
       String user, boolean shouldCancelAtEnd) throws IOException,
       InterruptedException {
     if (!hasProxyUserPrivileges) {
@@ -583,18 +602,20 @@ public class DelegationTokenRenewer extends AbstractService {
     Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
 
     // Add new tokens to the toRenew list.
-    LOG.info("Received new tokens for " + applicationId + ". Received "
+    LOG.info("Received new tokens for " + referringAppIds + ". Received "
         + newTokens.length + " tokens.");
     if (newTokens.length > 0) {
       for (Token<?> token : newTokens) {
         if (token.isManaged()) {
           DelegationTokenToRenew tokenToRenew =
-              new DelegationTokenToRenew(applicationId, token, getConfig(),
+              new DelegationTokenToRenew(referringAppIds, token, getConfig(),
                 Time.now(), shouldCancelAtEnd, user);
           // renew the token to get the next expiration date.
           renewToken(tokenToRenew);
           setTimerForTokenRenewal(tokenToRenew);
-          appTokens.get(applicationId).add(tokenToRenew);
+          for (ApplicationId applicationId : referringAppIds) {
+            appTokens.get(applicationId).add(tokenToRenew);
+          }
           LOG.info("Received new token " + token);
         }
       }
@@ -602,7 +623,9 @@ public class DelegationTokenRenewer extends AbstractService {
     DataOutputBuffer dob = new DataOutputBuffer();
     credentials.writeTokenStorageToStream(dob);
     ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+    for (ApplicationId applicationId : referringAppIds) {
+      rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+    }
   }
 
   @VisibleForTesting
@@ -644,16 +667,18 @@ public class DelegationTokenRenewer extends AbstractService {
    * removing failed DT
    */
   private void removeFailedDelegationToken(DelegationTokenToRenew t) {
-    ApplicationId applicationId = t.applicationId;
-    LOG.error("removing failed delegation token for appid=" + applicationId
-        + ";t=" + t.token.getService());
-    appTokens.get(applicationId).remove(t);
+    Collection<ApplicationId> applicationIds = t.referringAppIds;
+    synchronized (applicationIds) {
+      LOG.error("removing failed delegation token for appid=" + applicationIds
+          + ";t=" + t.token.getService());
+      for (ApplicationId applicationId : applicationIds) {
+        appTokens.get(applicationId).remove(t);
+      }
+    }
     allTokens.remove(t.token);
 
     // cancel the timer
-    if (t.timerTask != null) {
-      t.timerTask.cancel();
-    }
+    t.cancelTimer();
   }
 
   /**
@@ -706,9 +731,15 @@ public class DelegationTokenRenewer extends AbstractService {
                 + "; token=" + dttr.token.getService());
           }
 
+          // continue if the app list isn't empty
+          synchronized(dttr.referringAppIds) {
+            dttr.referringAppIds.remove(applicationId);
+            if (!dttr.referringAppIds.isEmpty()) {
+              continue;
+            }
+          }
           // cancel the timer
-          if (dttr.timerTask != null)
-            dttr.timerTask.cancel();
+          dttr.cancelTimer();
 
           // cancel the token
           cancelToken(dttr);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c591129/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index 99a506a..bc9c295 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -123,7 +124,7 @@ public class TestDelegationTokenRenewer {
       counter = 0;
       lastRenewed = null;
       tokenToRenewIn2Sec = null;
-
+      cancelled = false;
     }
 
     @Override
@@ -1046,4 +1047,88 @@ public class TestDelegationTokenRenewer {
     delegationTokenRenewer.obtainSystemTokensForUser(user, credentials);
     Assert.assertEquals(oldCounter, MyFS.getInstanceCounter());
   }
+  
+  // Test submitting an application with the token obtained by a previously
+  // submitted application that is set to be cancelled.  Token should be
+  // renewed while all apps are running, and then cancelled when all apps
+  // complete.
+  @Test (timeout = 30000)
+  public void testCancelWithMultipleAppSubmissions() throws Exception {
+    MockRM rm = new TestSecurityMockRM(conf, null);
+    rm.start();
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    // Create token1 by hand. The same credentials are submitted with every
+    // app below, so all of them must share one DelegationTokenToRenew entry.
+    Text userText1 = new Text("user");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+          userText1);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+          "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+    Credentials credentials = new Credentials();
+    credentials.addToken(token1.getService(), token1);
+
+    DelegationTokenRenewer renewer =
+        rm.getRMContext().getDelegationTokenRenewer();
+    Assert.assertTrue(renewer.getAllTokens().isEmpty());
+    Assert.assertFalse(Renewer.cancelled);
+
+    RMApp app1 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, true);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    rm.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+    DelegationTokenToRenew dttr = renewer.getAllTokens().get(token1);
+    Assert.assertNotNull(dttr);
+    Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+    RMApp app2 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, true);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+    rm.waitForState(app2.getApplicationId(), RMAppState.RUNNING);
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+    Assert.assertTrue(dttr.referringAppIds.contains(app2.getApplicationId()));
+    Assert.assertFalse(Renewer.cancelled);
+
+    MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2);
+    // app2 completes, app1 is still running, check the token is not cancelled
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+    Assert.assertFalse(dttr.referringAppIds.contains(app2.getApplicationId()));
+    Assert.assertFalse(dttr.isTimerCancelled());
+    Assert.assertFalse(Renewer.cancelled);
+
+    RMApp app3 =
+        rm.submitApp(200, "name", "user", null, false, null, 2, credentials,
+          null, true, false, false, null, 0, null, true);
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm1);
+    rm.waitForState(app3.getApplicationId(), RMAppState.RUNNING);
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId()));
+    Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
+    Assert.assertFalse(dttr.isTimerCancelled());
+    Assert.assertFalse(Renewer.cancelled);
+
+    MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1);
+    Assert.assertTrue(renewer.getAllTokens().containsKey(token1));
+    Assert.assertFalse(dttr.referringAppIds.contains(app1.getApplicationId()));
+    Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId()));
+    Assert.assertFalse(dttr.isTimerCancelled());
+    Assert.assertFalse(Renewer.cancelled);
+
+    MockRM.finishAMAndVerifyAppState(app3, rm, nm1, am3);
+    Assert.assertFalse(renewer.getAllTokens().containsKey(token1));
+    Assert.assertTrue(dttr.referringAppIds.isEmpty());
+    Assert.assertTrue(dttr.isTimerCancelled());
+    Assert.assertTrue(Renewer.cancelled);
+
+    rm.stop();
+  }
 }


Mime
View raw message