hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1612270 - in /hadoop/common/branches/HDFS-6584/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache...
Date Mon, 21 Jul 2014 13:48:05 GMT
Author: szetszwo
Date: Mon Jul 21 13:48:02 2014
New Revision: 1612270

URL: http://svn.apache.org/r1612270
Log:
Merge r1609845 through r1612268 from trunk.

Modified:
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
    hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/CHANGES.txt Mon Jul 21 13:48:02 2014
@@ -49,6 +49,11 @@ Release 2.6.0 - UNRELEASED
     YARN-1341. Recover NMTokens upon nodemanager restart. (Jason Lowe via 
     junping_du)
 
+    YARN-2208. AMRMTokenManager need to have a way to roll over AMRMToken. (xgong)
+
+    YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo
+    via Sandy Ryza)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -69,6 +74,9 @@ Release 2.6.0 - UNRELEASED
     after RM recovery but before scheduler learns about apps and app-attempts.
     (Jian He via vinodkv)
 
+    YARN-2244. FairScheduler missing handling of containers for unknown 
+    application attempts. (Anubhav Dhoot via kasha)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenIdentifier.java Mon Jul 21 13:48:02 2014
@@ -44,6 +44,7 @@ public class AMRMTokenIdentifier extends
   public static final Text KIND_NAME = new Text("YARN_AM_RM_TOKEN");
 
   private ApplicationAttemptId applicationAttemptId;
+  private int keyId = Integer.MIN_VALUE;
 
   public AMRMTokenIdentifier() {
   }
@@ -53,6 +54,13 @@ public class AMRMTokenIdentifier extends
     this.applicationAttemptId = appAttemptId;
   }
 
+  public AMRMTokenIdentifier(ApplicationAttemptId appAttemptId,
+      int masterKeyId) {
+    this();
+    this.applicationAttemptId = appAttemptId;
+    this.keyId = masterKeyId;
+  }
+
   @Private
   public ApplicationAttemptId getApplicationAttemptId() {
     return this.applicationAttemptId;
@@ -64,6 +72,7 @@ public class AMRMTokenIdentifier extends
     out.writeLong(appId.getClusterTimestamp());
     out.writeInt(appId.getId());
     out.writeInt(this.applicationAttemptId.getAttemptId());
+    out.writeInt(this.keyId);
   }
 
   @Override
@@ -75,6 +84,7 @@ public class AMRMTokenIdentifier extends
         ApplicationId.newInstance(clusterTimeStamp, appId);
     this.applicationAttemptId =
         ApplicationAttemptId.newInstance(applicationId, attemptId);
+    this.keyId = in.readInt();
   }
 
   @Override
@@ -92,6 +102,10 @@ public class AMRMTokenIdentifier extends
         .toString());
   }
 
+  public int getKeyId() {
+    return this.keyId;
+  }
+
   // TODO: Needed?
   @InterfaceAudience.Private
   public static class Renewer extends Token.TrivialRenewer {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Jul 21 13:48:02 2014
@@ -774,11 +774,9 @@ public class RMAppAttemptImpl implements
       }
 
       // create AMRMToken
-      AMRMTokenIdentifier id =
-          new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
       appAttempt.amrmToken =
-          new Token<AMRMTokenIdentifier>(id,
-            appAttempt.rmContext.getAMRMTokenSecretManager());
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
 
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Mon Jul 21 13:48:02 2014
@@ -123,6 +123,23 @@ public abstract class AbstractYarnSchedu
     return maximumAllocation;
   }
 
+  protected void containerLaunchedOnNode(ContainerId containerId,
+                                         SchedulerNode node) {
+    // Get the application attempt that owns the launched container
+    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
+        (containerId);
+    if (application == null) {
+      LOG.info("Unknown application "
+          + containerId.getApplicationAttemptId().getApplicationId()
+          + " launched container " + containerId + " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+      return;
+    }
+
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
+  }
+
   public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
     SchedulerApplication<T> app =
         applications.get(applicationAttemptId.getApplicationId());

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Jul 21 13:48:02 2014
@@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -866,21 +865,6 @@ public class CapacityScheduler extends
   
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Override
   public void handle(SchedulerEvent event) {
     switch(event.getType()) {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jul 21 13:48:02 2014
@@ -929,22 +929,6 @@ public class FairScheduler extends
   }
 
   /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
-  /**
    * Process a heartbeat update from a node.
    */
   private synchronized void nodeUpdate(RMNode nm) {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jul 21 13:48:02 2014
@@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch
   private static class FairShareComparator implements Comparator<Schedulable>,
       Serializable {
     private static final long serialVersionUID = 5564969375856699313L;
+    private static final Resource ONE = Resources.createResource(1);
 
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch
           s1.getResourceUsage(), minShare1);
       boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
           s2.getResourceUsage(), minShare2);
-      Resource one = Resources.createResource(1);
       minShareRatio1 = (double) s1.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
       useToWeightRatio1 = s1.getResourceUsage().getMemory() /
           s1.getWeights().getWeight(ResourceType.MEMORY);
       useToWeightRatio2 = s2.getResourceUsage().getMemory() /

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon Jul 21 13:48:02 2014
@@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -831,23 +830,6 @@ public class FifoScheduler extends
     }
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Lock(FifoScheduler.class)
   private synchronized void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Mon Jul 21 13:48:02 2014
@@ -19,22 +19,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +55,66 @@ public class AMRMTokenSecretManager exte
   private static final Log LOG = LogFactory
     .getLog(AMRMTokenSecretManager.class);
 
-  private SecretKey masterKey;
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
   private final Timer timer;
   private final long rollingInterval;
+  private final long activationDelay;
 
-  private final Map<ApplicationAttemptId, byte[]> passwords =
-      new HashMap<ApplicationAttemptId, byte[]>();
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
 
   /**
    * Create an {@link AMRMTokenSecretManager}
    */
   public AMRMTokenSecretManager(Configuration conf) {
-    rollMasterKey();
     this.timer = new Timer();
     this.rollingInterval =
         conf
           .getLong(
             YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
             YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 2 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
   }
 
   public void start() {
-    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+      rollingInterval);
   }
 
   public void stop() {
     this.timer.cancel();
   }
 
-  public synchronized void applicationMasterFinished(
-      ApplicationAttemptId appAttemptId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Application finished, removing password for " + appAttemptId);
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for " + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.remove(appAttemptId);
   }
 
   private class MasterKeyRoller extends TimerTask {
@@ -93,49 +125,89 @@ public class AMRMTokenSecretManager exte
   }
 
   @Private
-  public synchronized void setMasterKey(SecretKey masterKey) {
-    this.masterKey = masterKey;
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  @Private
-  public synchronized SecretKey getMasterKey() {
-    return this.masterKey;
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Private
-  synchronized void rollMasterKey() {
-    LOG.info("Rolling master-key for amrm-tokens");
-    this.masterKey = generateSecret();
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  /**
-   * Create a password for a given {@link AMRMTokenIdentifier}. Used to
-   * send to the AppicationAttempt which can give it back during authentication.
-   */
-  @Override
-  public synchronized byte[] createPassword(
-      AMRMTokenIdentifier identifier) {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating password for " + applicationAttemptId);
-    }
-    byte[] password = createPassword(identifier.getBytes(), masterKey);
-    this.passwords.put(applicationAttemptId, password);
-    return password;
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+            .getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
+        identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
    */
-  public synchronized void
-      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
-    AMRMTokenIdentifier identifier = token.decodeIdentifier();
-    if (LOG.isDebugEnabled()) {
+  public void addPersistedPassword(Token<AMRMTokenIdentifier> token)
+      throws IOException {
+    this.writeLock.lock();
+    try {
+      AMRMTokenIdentifier identifier = token.decodeIdentifier();
       LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+      appAttemptSet.add(identifier.getApplicationAttemptId());
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.put(identifier.getApplicationAttemptId(),
-      token.getPassword());
   }
 
   /**
@@ -143,19 +215,35 @@ public class AMRMTokenSecretManager exte
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
    */
   @Override
-  public synchronized byte[] retrievePassword(
-      AMRMTokenIdentifier identifier) throws InvalidToken {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to retrieve password for " + applicationAttemptId);
-    }
-    byte[] password = this.passwords.get(applicationAttemptId);
-    if (password == null) {
-      throw new InvalidToken("Password not found for ApplicationAttempt "
-          + applicationAttemptId);
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken("Password not found for ApplicationAttempt "
+            + applicationAttemptId);
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+        .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+            .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Given AMRMToken for application : "
+          + applicationAttemptId.toString()
+          + " seems to have been generated illegally.");
+    } finally {
+      this.readLock.unlock();
     }
-    return password;
   }
 
   /**
@@ -167,4 +255,40 @@ public class AMRMTokenSecretManager exte
     return new AMRMTokenIdentifier();
   }
 
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getCurrnetMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.currentMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+        .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Mon Jul 21 13:48:02 2014
@@ -232,20 +232,7 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    List<ContainerId> contsToClean = resp.getContainersToCleanup();
-    int cleanedConts = contsToClean.size();
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
-      dispatcher.await();
-      contsToClean = resp.getContainersToCleanup();
-      cleanedConts += contsToClean.size();
-    }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
-    Assert.assertEquals(1, cleanedConts);
+    waitForContainerCleanup(dispatcher, nm1, resp);
 
     // Now to test the case when RM already gave cleanup, and NM suddenly
     // realizes that the container is running.
@@ -258,26 +245,36 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    contsToClean = resp.getContainersToCleanup();
-    cleanedConts = contsToClean.size();
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
+    waitForContainerCleanup(dispatcher, nm1, resp);
+
+    rm.stop();
+  }
+
+  protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+      NodeHeartbeatResponse resp) throws Exception {
+    int waitCount = 0, cleanedConts = 0;
+    List<ContainerId> contsToClean;
+    do {
       dispatcher.await();
       contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
+      if (cleanedConts >= 1) {
+        break;
+      }
+      Thread.sleep(100);
+      resp = nm.nodeHeartbeat(true);
+    } while(waitCount++ < 200);
+
+    if (contsToClean.isEmpty()) {
+      LOG.error("Failed to get any containers to cleanup");
+    } else {
+      LOG.info("Got cleanup for " + contsToClean.get(0));
     }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
     Assert.assertEquals(1, cleanedConts);
-
-    rm.stop();
   }
-  
+
   private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
       throws Exception {
     while (true) {
@@ -400,6 +397,58 @@ public class TestApplicationCleanup {
     rm2.stop();
   }
 
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+      Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // start new RM
+    final DrainDispatcher dispatcher2 = new DrainDispatcher();
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher2;
+      }
+    };
+    rm2.start();
+
+    // nm1 registers with rm2 and then sends a heartbeat
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Add unknown container for application unknown to scheduler
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+        .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+    waitForContainerCleanup(dispatcher2, nm1, response);
+
+    rm1.stop();
+    rm2.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Jul 21 13:48:02 2014
@@ -1250,10 +1250,11 @@ public class TestRMRestart {
             .getEncoded());
 
     // assert AMRMTokenSecretManager also knows about the AMRMToken password
-    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
-    Assert.assertArrayEquals(amrmToken.getPassword(),
-      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
-        amrmToken.decodeIdentifier()));
+    // TODO: fix this on YARN-2211
+//    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
+//    Assert.assertArrayEquals(amrmToken.getPassword(),
+//      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
+//        amrmToken.decodeIdentifier()));
     rm1.stop();
     rm2.stop();
   }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Mon Jul 21 13:48:02 2014
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Map;
 import javax.crypto.SecretKey;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class RMStateStoreTestBase extends ClientBaseWithFixes{
@@ -175,8 +176,11 @@ public class RMStateStoreTestBase extend
     TestDispatcher dispatcher = new TestDispatcher();
     store.setRMDispatcher(dispatcher);
 
-    AMRMTokenSecretManager appTokenMgr =
-        new AMRMTokenSecretManager(conf);
+    AMRMTokenSecretManager appTokenMgr = spy(
+        new AMRMTokenSecretManager(conf));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
     ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
         new ClientToAMTokenSecretManagerInRM();
 
@@ -455,10 +459,8 @@ public class RMStateStoreTestBase extend
   private Token<AMRMTokenIdentifier> generateAMRMToken(
       ApplicationAttemptId attemptId,
       AMRMTokenSecretManager appTokenMgr) {
-    AMRMTokenIdentifier appTokenId =
-        new AMRMTokenIdentifier(attemptId);
     Token<AMRMTokenIdentifier> appToken =
-        new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+        appTokenMgr.createAndGetAMRMToken(attemptId);
     appToken.setService(new Text("appToken service"));
     return appToken;
   }

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Mon Jul 21 13:48:02 2014
@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -224,6 +225,8 @@ public class TestRMAppAttemptTransitions
     amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     amFinishingMonitor = mock(AMLivelinessMonitor.class);
     writer = mock(RMApplicationHistoryWriter.class);
+    MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey();
+    when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData);
     rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,

Modified: hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java?rev=1612270&r1=1612269&r2=1612270&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java (original)
+++ hadoop/common/branches/HDFS-6584/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java Mon Jul 21 13:48:02 2014
@@ -23,13 +23,12 @@ import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collection;
 
-import javax.crypto.SecretKey;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -41,7 +40,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -65,6 +67,8 @@ public class TestAMRMTokens {
 
   private final Configuration conf;
   private static final int maxWaitAttempts = 50;
+  private static final int rolling_interval_sec = 13;
+  private static final long am_expire_ms = 4000;
 
   @Parameters
   public static Collection<Object[]> configs() {
@@ -201,15 +205,22 @@ public class TestAMRMTokens {
   @Test
   public void testMasterKeyRollOver() throws Exception {
 
+    conf.setLong(
+      YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+      rolling_interval_sec);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
     MyContainerManager containerManager = new MyContainerManager();
     final MockRMWithAMS rm =
         new MockRMWithAMS(conf, containerManager);
     rm.start();
-
+    Long startTime = System.currentTimeMillis();
     final Configuration conf = rm.getConfig();
     final YarnRPC rpc = YarnRPC.create(conf);
     ApplicationMasterProtocol rmClient = null;
-
+    AMRMTokenSecretManager appTokenSecretManager =
+        rm.getRMContext().getAMRMTokenSecretManager();
+    MasterKeyData oldKey = appTokenSecretManager.getMasterKey();
+    Assert.assertNotNull(oldKey);
     try {
       MockNM nm1 = rm.registerNode("localhost:1234", 5120);
 
@@ -218,7 +229,7 @@ public class TestAMRMTokens {
       nm1.nodeHeartbeat(true);
 
       int waitCount = 0;
-      while (containerManager.containerTokens == null && waitCount++ < 20) {
+      while (containerManager.containerTokens == null && waitCount++ < maxWaitAttempts) {
         LOG.info("Waiting for AM Launch to happen..");
         Thread.sleep(1000);
       }
@@ -250,21 +261,65 @@ public class TestAMRMTokens {
       Assert.assertTrue(
           rmClient.allocate(allocateRequest).getAMCommand() == null);
 
-      // Simulate a master-key-roll-over
-      AMRMTokenSecretManager appTokenSecretManager =
-          rm.getRMContext().getAMRMTokenSecretManager();
-      SecretKey oldKey = appTokenSecretManager.getMasterKey();
-      appTokenSecretManager.rollMasterKey();
-      SecretKey newKey = appTokenSecretManager.getMasterKey();
+      // Wait long enough to make sure the roll-over happens.
+      // In the meantime, the old AMRMToken should continue to work.
+      while(System.currentTimeMillis() - startTime < rolling_interval_sec*1000) {
+        rmClient.allocate(allocateRequest);
+        Thread.sleep(500);
+      }
+
+      MasterKeyData newKey = appTokenSecretManager.getMasterKey();
+      Assert.assertNotNull(newKey);
       Assert.assertFalse("Master key should have changed!",
         oldKey.equals(newKey));
 
+      // Another allocate call with old AMRMToken. Should continue to work.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      rmClient = createRMClient(rm, conf, rpc, currentUser);
+      Assert
+        .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      waitCount = 0;
+      while(waitCount++ <= maxWaitAttempts) {
+        if (appTokenSecretManager.getCurrnetMasterKeyData() != oldKey) {
+          break;
+        }
+        try {
+          rmClient.allocate(allocateRequest);
+        } catch (Exception ex) {
+          break;
+        }
+        Thread.sleep(200);
+      }
+      // nextMasterKey is now activated and has replaced currentMasterKey
+      Assert.assertTrue(appTokenSecretManager.getCurrnetMasterKeyData().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getMasterKey().equals(newKey));
+      Assert.assertTrue(appTokenSecretManager.getNextMasterKeyData() == null);
+
+      // Create a new Token
+      Token<AMRMTokenIdentifier> newToken =
+          appTokenSecretManager.createAndGetAMRMToken(applicationAttemptId);
+      SecurityUtil.setTokenService(newToken, rmBindAddress);
+      currentUser.addToken(newToken);
       // Another allocate call. Should continue to work.
       rpc.stopProxy(rmClient, conf); // To avoid using cached client
       rmClient = createRMClient(rm, conf, rpc, currentUser);
       allocateRequest = Records.newRecord(AllocateRequest.class);
-      Assert.assertTrue(
-          rmClient.allocate(allocateRequest).getAMCommand() == null);
+      Assert
+        .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+
+      // Should not work by using the old AMRMToken.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      try {
+        currentUser.addToken(amRMToken);
+        rmClient = createRMClient(rm, conf, rpc, currentUser);
+        allocateRequest = Records.newRecord(AllocateRequest.class);
+        Assert
+          .assertTrue(rmClient.allocate(allocateRequest).getAMCommand() == null);
+        Assert.fail("The old Token should not work");
+      } catch (Exception ex) {
+        // Expected: the call with the old token should throw
+      }
     } finally {
       rm.stop();
       if (rmClient != null) {



Mime
View raw message