hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r1612403 [2/3] - in /hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/...
Date Mon, 21 Jul 2014 21:45:00 GMT
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon Jul 21 21:44:50 2014
@@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -521,7 +520,7 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     // santiy checks.
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
@@ -553,14 +552,20 @@ public class CapacityScheduler extends
     applications.put(applicationId, application);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName);
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     CSQueue queue = (CSQueue) application.getQueue();
@@ -578,14 +583,15 @@ public class CapacityScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user " + application.getUser() + " in queue "
         + queue.getQueueName());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -859,21 +865,6 @@ public class CapacityScheduler extends
   
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Override
   public void handle(SchedulerEvent event) {
     switch(event.getType()) {
@@ -905,7 +896,8 @@ public class CapacityScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -921,7 +913,7 @@ public class CapacityScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -1089,6 +1081,7 @@ public class CapacityScheduler extends
     if (LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
+    recoverResourceRequestForContainer(cont);
     completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
       cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
       RMContainerEventType.KILL);

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Mon Jul 21 21:44:50 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
+    
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
 
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
+    
+    // Update resource requests related to "request" and store in RMContainer 
+    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
 
     // Inform the container
     rmContainer.handle(

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Mon Jul 21 21:44:50 2014
@@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte
   private final ApplicationId applicationId;
   private final String queue;
   private final String user;
+  private final boolean isAppRecovering;
 
   public AppAddedSchedulerEvent(
       ApplicationId applicationId, String queue, String user) {
+    this(applicationId, queue, user, false);
+  }
+
+  public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
+      String user, boolean isAppRecovering) {
     super(SchedulerEventType.APP_ADDED);
     this.applicationId = applicationId;
     this.queue = queue;
     this.user = user;
+    this.isAppRecovering = isAppRecovering;
   }
 
   public ApplicationId getApplicationId() {
@@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte
     return user;
   }
 
+  public boolean getIsAppRecovering() {
+    return isAppRecovering;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Mon Jul 21 21:44:50 2014
@@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve
 
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
-  private final boolean shouldNotifyAttemptAdded;
+  private final boolean isAttemptRecovering;
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    this(applicationAttemptId, transferStateFromPreviousAttempt, true);
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
   }
 
   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
-    this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded;
+    this.isAttemptRecovering = isAttemptRecovering;
   }
 
   public ApplicationAttemptId getApplicationAttemptId() {
@@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve
     return transferStateFromPreviousAttempt;
   }
 
-  public boolean getShouldNotifyAttemptAdded() {
-    return shouldNotifyAttemptAdded;
+  public boolean getIsAttemptRecovering() {
+    return isAttemptRecovering;
   }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Mon Jul 21 21:44:50 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -82,6 +83,9 @@ public class FSSchedulerApp extends Sche
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
     
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
+    
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -281,9 +285,13 @@ public class FSSchedulerApp extends Sche
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
 
+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
     // Inform the container
     rmContainer.handle(
         new RMContainerEvent(container.getId(), RMContainerEventType.START));

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Mon Jul 21 21:44:50 2014
@@ -422,7 +422,7 @@ public class FairScheduler extends
     }
   }
   
-  private void warnOrKillContainer(RMContainer container) {
+  protected void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSSchedulerApp app = getSchedulerApp(appAttemptId);
     FSLeafQueue queue = app.getQueue();
@@ -440,6 +440,7 @@ public class FairScheduler extends
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
+        recoverResourceRequestForContainer(container);
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
@@ -565,7 +566,7 @@ public class FairScheduler extends
    * configured limits, but the app will not be marked as runnable.
    */
   protected synchronized void addApplication(ApplicationId applicationId,
-      String queueName, String user) {
+      String queueName, String user, boolean isAppRecovering) {
     if (queueName == null || queueName.isEmpty()) {
       String message = "Reject application " + applicationId +
               " submitted by user " + user + " with an empty queue name.";
@@ -602,8 +603,14 @@ public class FairScheduler extends
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", in queue: " + queueName + ", currently num of applications: "
         + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   /**
@@ -612,7 +619,7 @@ public class FairScheduler extends
   protected synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean shouldNotifyAttemptAdded) {
+      boolean isAttemptRecovering) {
     SchedulerApplication<FSSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     String user = application.getUser();
@@ -641,14 +648,15 @@ public class FairScheduler extends
     LOG.info("Added Application Attempt " + applicationAttemptId
         + " to scheduler from user: " + user);
 
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(applicationAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(applicationAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(applicationAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -921,22 +929,6 @@ public class FairScheduler extends
   }
 
   /**
-   * Process a container which has launched on a node, as reported by the node.
-   */
-  private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
-    // Get the application for the finished container
-    FSSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      return;
-    }
-
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
-  /**
    * Process a heartbeat update from a node.
    */
   private synchronized void nodeUpdate(RMNode nm) {
@@ -1135,7 +1127,8 @@ public class FairScheduler extends
       }
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
       break;
     case APP_REMOVED:
       if (!(event instanceof AppRemovedSchedulerEvent)) {
@@ -1153,7 +1146,7 @@ public class FairScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
       break;
     case APP_ATTEMPT_REMOVED:
       if (!(event instanceof AppAttemptRemovedSchedulerEvent)) {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Mon Jul 21 21:44:50 2014
@@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch
   private static class FairShareComparator implements Comparator<Schedulable>,
       Serializable {
     private static final long serialVersionUID = 5564969375856699313L;
+    private static final Resource ONE = Resources.createResource(1);
 
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch
           s1.getResourceUsage(), minShare1);
       boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
           s2.getResourceUsage(), minShare2);
-      Resource one = Resources.createResource(1);
       minShareRatio1 = (double) s1.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
       minShareRatio2 = (double) s2.getResourceUsage().getMemory()
-          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
       useToWeightRatio1 = s1.getResourceUsage().getMemory() /
           s1.getWeights().getWeight(ResourceType.MEMORY);
       useToWeightRatio2 = s2.getResourceUsage().getMemory() /

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Mon Jul 21 21:44:50 2014
@@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -356,22 +355,28 @@ public class FifoScheduler extends
 
   @VisibleForTesting
   public synchronized void addApplication(ApplicationId applicationId,
-      String queue, String user) {
+      String queue, String user, boolean isAppRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
     applications.put(applicationId, application);
     metrics.submitApp(user);
     LOG.info("Accepted application " + applicationId + " from user: " + user
         + ", currently num of applications: " + applications.size());
-    rmContext.getDispatcher().getEventHandler()
+    if (isAppRecovering) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+      }
+    } else {
+      rmContext.getDispatcher().getEventHandler()
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+    }
   }
 
   @VisibleForTesting
   public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt,
-          boolean shouldNotifyAttemptAdded) {
+          boolean isAttemptRecovering) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(appAttemptId.getApplicationId());
     String user = application.getUser();
@@ -389,14 +394,15 @@ public class FifoScheduler extends
     metrics.submitAppAttempt(user);
     LOG.info("Added Application Attempt " + appAttemptId
         + " to scheduler from user " + application.getUser());
-    if (shouldNotifyAttemptAdded) {
-      rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptEvent(appAttemptId,
-              RMAppAttemptEventType.ATTEMPT_ADDED));
-    } else {
+    if (isAttemptRecovering) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping notifying ATTEMPT_ADDED");
+        LOG.debug(appAttemptId
+            + " is recovering. Skipping notifying ATTEMPT_ADDED");
       }
+    } else {
+      rmContext.getDispatcher().getEventHandler().handle(
+        new RMAppAttemptEvent(appAttemptId,
+            RMAppAttemptEventType.ATTEMPT_ADDED));
     }
   }
 
@@ -772,7 +778,8 @@ public class FifoScheduler extends
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationId(),
-        appAddedEvent.getQueue(), appAddedEvent.getUser());
+        appAddedEvent.getQueue(), appAddedEvent.getUser(),
+        appAddedEvent.getIsAppRecovering());
     }
     break;
     case APP_REMOVED:
@@ -788,7 +795,7 @@ public class FifoScheduler extends
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
         appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-        appAttemptAddedEvent.getShouldNotifyAttemptAdded());
+        appAttemptAddedEvent.getIsAttemptRecovering());
     }
     break;
     case APP_ATTEMPT_REMOVED:
@@ -823,23 +830,6 @@ public class FifoScheduler extends
     }
   }
 
-  private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
-    // Get the application for the finished container
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Unknown application "
-          + containerId.getApplicationAttemptId().getApplicationId()
-          + " launched container " + containerId + " on node: " + node);
-      // Some unknown container sneaked into the system. Kill it.
-      this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
-      return;
-    }
-    
-    application.containerLaunchedOnNode(containerId, node.getNodeID());
-  }
-
   @Lock(FifoScheduler.class)
   private synchronized void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Mon Jul 21 21:44:50 2014
@@ -19,22 +19,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +55,66 @@ public class AMRMTokenSecretManager exte
   private static final Log LOG = LogFactory
     .getLog(AMRMTokenSecretManager.class);
 
-  private SecretKey masterKey;
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
   private final Timer timer;
   private final long rollingInterval;
+  private final long activationDelay;
 
-  private final Map<ApplicationAttemptId, byte[]> passwords =
-      new HashMap<ApplicationAttemptId, byte[]>();
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
 
   /**
    * Create an {@link AMRMTokenSecretManager}
    */
   public AMRMTokenSecretManager(Configuration conf) {
-    rollMasterKey();
     this.timer = new Timer();
     this.rollingInterval =
         conf
           .getLong(
             YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
             YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 2 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
   }
 
   public void start() {
-    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+      rollingInterval);
   }
 
   public void stop() {
     this.timer.cancel();
   }
 
-  public synchronized void applicationMasterFinished(
-      ApplicationAttemptId appAttemptId) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Application finished, removing password for " + appAttemptId);
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for " + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.remove(appAttemptId);
   }
 
   private class MasterKeyRoller extends TimerTask {
@@ -93,49 +125,89 @@ public class AMRMTokenSecretManager exte
   }
 
   @Private
-  public synchronized void setMasterKey(SecretKey masterKey) {
-    this.masterKey = masterKey;
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  @Private
-  public synchronized SecretKey getMasterKey() {
-    return this.masterKey;
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Private
-  synchronized void rollMasterKey() {
-    LOG.info("Rolling master-key for amrm-tokens");
-    this.masterKey = generateSecret();
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
-  /**
-   * Create a password for a given {@link AMRMTokenIdentifier}. Used to
-   * send to the AppicationAttempt which can give it back during authentication.
-   */
-  @Override
-  public synchronized byte[] createPassword(
-      AMRMTokenIdentifier identifier) {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Creating password for " + applicationAttemptId);
-    }
-    byte[] password = createPassword(identifier.getBytes(), masterKey);
-    this.passwords.put(applicationAttemptId, password);
-    return password;
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+            .getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
+        identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   /**
    * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
    */
-  public synchronized void
-      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
-    AMRMTokenIdentifier identifier = token.decodeIdentifier();
-    if (LOG.isDebugEnabled()) {
+  public void addPersistedPassword(Token<AMRMTokenIdentifier> token)
+      throws IOException {
+    this.writeLock.lock();
+    try {
+      AMRMTokenIdentifier identifier = token.decodeIdentifier();
       LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+      appAttemptSet.add(identifier.getApplicationAttemptId());
+    } finally {
+      this.writeLock.unlock();
     }
-    this.passwords.put(identifier.getApplicationAttemptId(),
-      token.getPassword());
   }
 
   /**
@@ -143,19 +215,35 @@ public class AMRMTokenSecretManager exte
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
    */
   @Override
-  public synchronized byte[] retrievePassword(
-      AMRMTokenIdentifier identifier) throws InvalidToken {
-    ApplicationAttemptId applicationAttemptId =
-        identifier.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Trying to retrieve password for " + applicationAttemptId);
-    }
-    byte[] password = this.passwords.get(applicationAttemptId);
-    if (password == null) {
-      throw new InvalidToken("Password not found for ApplicationAttempt "
-          + applicationAttemptId);
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken("Password not found for ApplicationAttempt "
+            + applicationAttemptId);
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+        .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+            .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+          this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Given AMRMToken for application : "
+          + applicationAttemptId.toString()
+          + " seems to have been generated illegally.");
+    } finally {
+      this.readLock.unlock();
     }
-    return password;
   }
 
   /**
@@ -167,4 +255,40 @@ public class AMRMTokenSecretManager exte
     return new AMRMTokenIdentifier();
   }
 
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getCurrnetMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.currentMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+        .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Mon Jul 21 21:44:50 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana
       addPersistedDelegationToken(entry.getKey(), entry.getValue());
     }
   }
+
+  public long getRenewDate(RMDelegationTokenIdentifier ident)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(ident);
+    if (info == null) {
+      throw new InvalidToken("token (" + ident.toString()
+          + ") can't be found in cache");
+    }
+    return info.getRenewDate();
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java Mon Jul 21 21:44:50 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.AccessControlException;
 import java.nio.ByteBuffer;
+import java.security.Principal;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
@@ -36,6 +37,7 @@ import java.util.concurrent.ConcurrentMa
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
@@ -57,6 +59,8 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -67,6 +71,13 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -85,6 +96,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
@@ -109,6 +121,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CredentialsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
@@ -118,6 +131,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -139,6 +153,9 @@ public class RMWebServices {
   private final Configuration conf;
   private @Context HttpServletResponse response;
 
+  public final static String DELEGATION_TOKEN_HEADER =
+      "Hadoop-YARN-RM-Delegation-Token";
+
   @Inject
   public RMWebServices(final ResourceManager rm, Configuration conf) {
     this.rm = rm;
@@ -147,11 +164,7 @@ public class RMWebServices {
 
   protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) {
     // Check for the authorization.
-    String remoteUser = hsr.getRemoteUser();
-    UserGroupInformation callerUGI = null;
-    if (remoteUser != null) {
-      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-    }
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI != null
         && !(this.rm.getApplicationACLsManager().checkAccess(callerUGI,
               ApplicationAccessType.VIEW_APP, app.getUser(),
@@ -626,7 +639,7 @@ public class RMWebServices {
   public AppState getAppState(@Context HttpServletRequest hsr,
       @PathParam("appid") String appId) throws AuthorizationException {
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     String userName = "";
     if (callerUGI != null) {
       userName = callerUGI.getUserName();
@@ -661,7 +674,7 @@ public class RMWebServices {
       IOException {
 
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       String msg = "Unable to obtain user name, user not authenticated";
       throw new AuthorizationException(msg);
@@ -771,9 +784,14 @@ public class RMWebServices {
   }
 
   private UserGroupInformation getCallerUserGroupInformation(
-      HttpServletRequest hsr) {
+      HttpServletRequest hsr, boolean usePrincipal) {
 
     String remoteUser = hsr.getRemoteUser();
+    if (usePrincipal) {
+      Principal princ = hsr.getUserPrincipal();
+      remoteUser = princ == null ? null : princ.getName();
+    }
+
     UserGroupInformation callerUGI = null;
     if (remoteUser != null) {
       callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
@@ -799,7 +817,7 @@ public class RMWebServices {
   public Response createNewApplication(@Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       throw new AuthorizationException("Unable to obtain user name, "
           + "user not authenticated");
@@ -835,7 +853,7 @@ public class RMWebServices {
       IOException, InterruptedException {
 
     init();
-    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr);
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
     if (callerUGI == null) {
       throw new AuthorizationException("Unable to obtain user name, "
           + "user not authenticated");
@@ -887,8 +905,8 @@ public class RMWebServices {
       throw new YarnRuntimeException(msg, e);
     }
     NewApplication appId =
-        new NewApplication(resp.getApplicationId().toString(), new ResourceInfo(
-          resp.getMaximumResourceCapability()));
+        new NewApplication(resp.getApplicationId().toString(),
+          new ResourceInfo(resp.getMaximumResourceCapability()));
     return appId;
   }
 
@@ -962,7 +980,8 @@ public class RMWebServices {
    * @throws IOException
    */
   protected ContainerLaunchContext createContainerLaunchContext(
-      ApplicationSubmissionContextInfo newApp) throws BadRequestException, IOException {
+      ApplicationSubmissionContextInfo newApp) throws BadRequestException,
+      IOException {
 
     // create container launch context
 
@@ -1033,4 +1052,238 @@ public class RMWebServices {
     }
     return ret;
   }
+
+  private UserGroupInformation createKerberosUserGroupInformation(
+      HttpServletRequest hsr) throws AuthorizationException, YarnException {
+
+    UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
+    if (callerUGI == null) {
+      String msg = "Unable to obtain user name, user not authenticated";
+      throw new AuthorizationException(msg);
+    }
+
+    String authType = hsr.getAuthType();
+    if (!KerberosAuthenticationHandler.TYPE.equals(authType)) {
+      String msg =
+          "Delegation token operations can only be carried out on a "
+              + "Kerberos authenticated channel";
+      throw new YarnException(msg);
+    }
+
+    callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
+    return callerUGI;
+  }
+
+  @POST
+  @Path("/delegation-token")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response postDelegationToken(DelegationToken tokenData,
+      @Context HttpServletRequest hsr) throws AuthorizationException,
+      IOException, InterruptedException, Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+    return createDelegationToken(tokenData, hsr, callerUGI);
+  }
+
+  @POST
+  @Path("/delegation-token/expiration")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response
+      postDelegationTokenExpiration(@Context HttpServletRequest hsr)
+          throws AuthorizationException, IOException, InterruptedException,
+          Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+
+    DelegationToken requestToken = new DelegationToken();
+    requestToken.setToken(extractToken(hsr).encodeToUrlString());
+    return renewDelegationToken(requestToken, hsr, callerUGI);
+  }
+
+  private Response createDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr, UserGroupInformation callerUGI)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    final String renewer = tokenData.getRenewer();
+    GetDelegationTokenResponse resp;
+    try {
+      resp =
+          callerUGI
+            .doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() {
+              @Override
+              public GetDelegationTokenResponse run() throws IOException,
+                  YarnException {
+                GetDelegationTokenRequest createReq =
+                    GetDelegationTokenRequest.newInstance(renewer);
+                return rm.getClientRMService().getDelegationToken(createReq);
+              }
+            });
+    } catch (Exception e) {
+      LOG.info("Create delegation token request failed", e);
+      throw e;
+    }
+
+    Token<RMDelegationTokenIdentifier> tk =
+        new Token<RMDelegationTokenIdentifier>(resp.getRMDelegationToken()
+          .getIdentifier().array(), resp.getRMDelegationToken().getPassword()
+          .array(), new Text(resp.getRMDelegationToken().getKind()), new Text(
+          resp.getRMDelegationToken().getService()));
+    RMDelegationTokenIdentifier identifier = tk.decodeIdentifier();
+    long currentExpiration =
+        rm.getRMContext().getRMDelegationTokenSecretManager()
+          .getRenewDate(identifier);
+    DelegationToken respToken =
+        new DelegationToken(tk.encodeToUrlString(), renewer, identifier
+          .getOwner().toString(), tk.getKind().toString(), currentExpiration,
+          identifier.getMaxDate());
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  private Response renewDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr, UserGroupInformation callerUGI)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    Token<RMDelegationTokenIdentifier> token =
+        extractToken(tokenData.getToken());
+
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    final RenewDelegationTokenRequest req =
+        RenewDelegationTokenRequest.newInstance(dToken);
+
+    RenewDelegationTokenResponse resp;
+    try {
+      resp =
+          callerUGI
+            .doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() {
+              @Override
+              public RenewDelegationTokenResponse run() throws IOException,
+                  YarnException {
+                return rm.getClientRMService().renewDelegationToken(req);
+              }
+            });
+    } catch (UndeclaredThrowableException ue) {
+      if (ue.getCause() instanceof YarnException) {
+        if (ue.getCause().getCause() instanceof InvalidToken) {
+          throw new BadRequestException(ue.getCause().getCause().getMessage());
+        } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
+          return Response.status(Status.FORBIDDEN)
+            .entity(ue.getCause().getCause().getMessage()).build();
+        }
+        LOG.info("Renew delegation token request failed", ue);
+        throw ue;
+      }
+      LOG.info("Renew delegation token request failed", ue);
+      throw ue;
+    } catch (Exception e) {
+      LOG.info("Renew delegation token request failed", e);
+      throw e;
+    }
+    long renewTime = resp.getNextExpirationTime();
+
+    DelegationToken respToken = new DelegationToken();
+    respToken.setNextExpirationTime(renewTime);
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  // For cancelling tokens, the encoded token is passed as a header
+  // There are two reasons for this -
+  // 1. Passing a request body as part of a DELETE request is not
+  // allowed by Jetty
+  // 2. Passing the encoded token as part of the url is not ideal
+  // since urls tend to get logged and anyone with access to
+  // the logs can extract tokens which are meant to be secret
+  @DELETE
+  @Path("/delegation-token")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response cancelDelegationToken(@Context HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+
+    init();
+    UserGroupInformation callerUGI;
+    try {
+      callerUGI = createKerberosUserGroupInformation(hsr);
+    } catch (YarnException ye) {
+      return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
+    }
+
+    Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
+
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind()
+          .toString(), token.getPassword(), token.getService().toString());
+    final CancelDelegationTokenRequest req =
+        CancelDelegationTokenRequest.newInstance(dToken);
+
+    try {
+      callerUGI
+        .doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() {
+          @Override
+          public CancelDelegationTokenResponse run() throws IOException,
+              YarnException {
+            return rm.getClientRMService().cancelDelegationToken(req);
+          }
+        });
+    } catch (UndeclaredThrowableException ue) {
+      if (ue.getCause() instanceof YarnException) {
+        if (ue.getCause().getCause() instanceof InvalidToken) {
+          throw new BadRequestException(ue.getCause().getCause().getMessage());
+        } else if (ue.getCause().getCause() instanceof org.apache.hadoop.security.AccessControlException) {
+          return Response.status(Status.FORBIDDEN)
+            .entity(ue.getCause().getCause().getMessage()).build();
+        }
+        LOG.info("Renew delegation token request failed", ue);
+        throw ue;
+      }
+      LOG.info("Renew delegation token request failed", ue);
+      throw ue;
+    } catch (Exception e) {
+      LOG.info("Renew delegation token request failed", e);
+      throw e;
+    }
+
+    return Response.status(Status.OK).build();
+  }
+
+  private Token<RMDelegationTokenIdentifier> extractToken(
+      HttpServletRequest request) {
+    String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER);
+    if (encodedToken == null) {
+      String msg =
+          "Header '" + DELEGATION_TOKEN_HEADER
+              + "' containing encoded token not found";
+      throw new BadRequestException(msg);
+    }
+    return extractToken(encodedToken);
+  }
+
+  private Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) {
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>();
+    try {
+      token.decodeFromUrlString(encodedToken);
+    } catch (Exception ie) {
+      String msg = "Could not decode encoded token";
+      throw new BadRequestException(msg);
+    }
+    return token;
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Mon Jul 21 21:44:50 2014
@@ -232,20 +232,7 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    List<ContainerId> contsToClean = resp.getContainersToCleanup();
-    int cleanedConts = contsToClean.size();
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
-      dispatcher.await();
-      contsToClean = resp.getContainersToCleanup();
-      cleanedConts += contsToClean.size();
-    }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
-    Assert.assertEquals(1, cleanedConts);
+    waitForContainerCleanup(dispatcher, nm1, resp);
 
     // Now to test the case when RM already gave cleanup, and NM suddenly
     // realizes that the container is running.
@@ -258,26 +245,36 @@ public class TestApplicationCleanup {
     containerStatuses.put(app.getApplicationId(), containerStatusList);
 
     resp = nm1.nodeHeartbeat(containerStatuses, true);
-    dispatcher.await();
-    contsToClean = resp.getContainersToCleanup();
-    cleanedConts = contsToClean.size();
     // The cleanup list won't be instantaneous as it is given out by scheduler
     // and not RMNodeImpl.
-    waitCount = 0;
-    while (cleanedConts < 1 && waitCount++ < 200) {
-      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
-      Thread.sleep(100);
-      resp = nm1.nodeHeartbeat(true);
+    waitForContainerCleanup(dispatcher, nm1, resp);
+
+    rm.stop();
+  }
+
+  protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
+      NodeHeartbeatResponse resp) throws Exception {
+    int waitCount = 0, cleanedConts = 0;
+    List<ContainerId> contsToClean;
+    do {
       dispatcher.await();
       contsToClean = resp.getContainersToCleanup();
       cleanedConts += contsToClean.size();
+      if (cleanedConts >= 1) {
+        break;
+      }
+      Thread.sleep(100);
+      resp = nm.nodeHeartbeat(true);
+    } while(waitCount++ < 200);
+
+    if (contsToClean.isEmpty()) {
+      LOG.error("Failed to get any containers to cleanup");
+    } else {
+      LOG.info("Got cleanup for " + contsToClean.get(0));
     }
-    LOG.info("Got cleanup for " + contsToClean.get(0));
     Assert.assertEquals(1, cleanedConts);
-
-    rm.stop();
   }
-  
+
   private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
       throws Exception {
     while (true) {
@@ -400,6 +397,58 @@ public class TestApplicationCleanup {
     rm2.stop();
   }
 
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
+      Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = launchAM(app0, rm1, nm1);
+    nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
+    rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+    // start new RM
+    final DrainDispatcher dispatcher2 = new DrainDispatcher();
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher2;
+      }
+    };
+    rm2.start();
+
+    // nm1 register to rm2, and do a heartbeat
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+    rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
+
+    // Add unknown container for application unknown to scheduler
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
+        .getApplicationAttemptId(), 2, ContainerState.RUNNING);
+
+    waitForContainerCleanup(dispatcher2, nm1, response);
+
+    rm1.stop();
+    rm2.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestApplicationCleanup t = new TestApplicationCleanup();
     t.testAppCleanup();

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java Mon Jul 21 21:44:50 2014
@@ -228,7 +228,7 @@ public class TestFifoScheduler {
     scheduler.handle(new NodeAddedSchedulerEvent(node));
 
     ApplicationId appId = ApplicationId.newInstance(0, 1);
-    scheduler.addApplication(appId, "queue1", "user1");
+    scheduler.addApplication(appId, "queue1", "user1", false);
 
     NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
     try {
@@ -238,7 +238,7 @@ public class TestFifoScheduler {
     }
 
     ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
-    scheduler.addApplicationAttempt(attId, false, true);
+    scheduler.addApplicationAttempt(attId, false, false);
 
     rm.stop();
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Jul 21 21:44:50 2014
@@ -1250,10 +1250,11 @@ public class TestRMRestart {
             .getEncoded());
 
     // assert AMRMTokenSecretManager also knows about the AMRMToken password
-    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
-    Assert.assertArrayEquals(amrmToken.getPassword(),
-      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
-        amrmToken.decodeIdentifier()));
+    // TODO: fix this on YARN-2211
+//    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
+//    Assert.assertArrayEquals(amrmToken.getPassword(),
+//      rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
+//        amrmToken.decodeIdentifier()));
     rm1.stop();
     rm2.stop();
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Mon Jul 21 21:44:50 2014
@@ -610,6 +610,36 @@ public class TestWorkPreservingRMRestart
         attempt0.getMasterContainer().getId()).isAMContainer());
   }
 
+  @Test (timeout = 20000)
+  public void testRecoverSchedulerAppAndAttemptSynchronously() throws Exception {
+    // start RM
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    // create app and launch the AM
+    RMApp app0 = rm1.submitApp(200);
+    MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // scheduler app/attempt is immediately available after RM is re-started.
+    Assert.assertNotNull(rm2.getResourceScheduler().getSchedulerAppInfo(
+      am0.getApplicationAttemptId()));
+
+    // getTransferredContainers should not throw NPE.
+    ((AbstractYarnScheduler) rm2.getResourceScheduler())
+      .getTransferredContainers(am0.getApplicationAttemptId());
+
+    List<NMContainerStatus> containers = createNMContainerStatusForApp(am0);
+    nm1.registerNode(containers, null);
+    waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
+  }
 
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Mon Jul 21 21:44:50 2014
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,7 +35,6 @@ import java.util.Map;
 import javax.crypto.SecretKey;
 
 import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class RMStateStoreTestBase extends ClientBaseWithFixes{
@@ -175,8 +176,11 @@ public class RMStateStoreTestBase extend
     TestDispatcher dispatcher = new TestDispatcher();
     store.setRMDispatcher(dispatcher);
 
-    AMRMTokenSecretManager appTokenMgr =
-        new AMRMTokenSecretManager(conf);
+    AMRMTokenSecretManager appTokenMgr = spy(
+        new AMRMTokenSecretManager(conf));
+    MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
+    when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
+
     ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
         new ClientToAMTokenSecretManagerInRM();
 
@@ -455,10 +459,8 @@ public class RMStateStoreTestBase extend
   private Token<AMRMTokenIdentifier> generateAMRMToken(
       ApplicationAttemptId attemptId,
       AMRMTokenSecretManager appTokenMgr) {
-    AMRMTokenIdentifier appTokenId =
-        new AMRMTokenIdentifier(attemptId);
     Token<AMRMTokenIdentifier> appToken =
-        new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+        appTokenMgr.createAndGetAMRMToken(attemptId);
     appToken.setService(new Text("appToken service"));
     return appToken;
   }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Mon Jul 21 21:44:50 2014
@@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -224,6 +225,8 @@ public class TestRMAppAttemptTransitions
     amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     amFinishingMonitor = mock(AMLivelinessMonitor.class);
     writer = mock(RMApplicationHistoryWriter.class);
+    MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey();
+    when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData);
     rmContext =
         new RMContextImpl(rmDispatcher,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Mon Jul 21 21:44:50 2014
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -204,4 +214,36 @@ public class TestRMContainerImpl {
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
     verify(writer, never()).containerFinished(any(RMContainer.class));
   }
+  
+  @Test
+  public void testExistenceOfResourceRequestInRMContainer() throws Exception {
+    Configuration conf = new Configuration();
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    ResourceScheduler scheduler = rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // Verify whether list of ResourceRequest is present in RMContainer
+    // while moving to ALLOCATED state
+    Assert.assertNotNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+
+    // Allocate container
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
+        .getAllocatedContainers();
+    rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
+
+    // After RMContainer moving to ACQUIRED state, list of ResourceRequest will
+    // be empty
+    Assert.assertNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+  }
 }

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1612403&r1=1612402&r2=1612403&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Mon Jul 21 21:44:50 2014
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -947,4 +951,67 @@ public class TestCapacityScheduler {
 
     rm1.stop();
   }
+  
+  @Test(timeout = 30000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId1 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
+
+    RMContainer rmContainer = cs.getRMContainer(containerId1);
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
+    for (ResourceRequest request : requests) {
+      // Skip the OffRack and RackLocal resource requests.
+      if (request.getResourceName().equals(node.getRackName())
+          || request.getResourceName().equals(ResourceRequest.ANY)) {
+        continue;
+      }
+
+      // Already the node local resource request is cleared from RM after
+      // allocation.
+      Assert.assertNull(app.getResourceRequest(request.getPriority(),
+          request.getResourceName()));
+    }
+
+    // Call killContainer to preempt the container
+    cs.killContainer(rmContainer);
+
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      // Resource request must have added back in RM after preempt event
+      // handling.
+      Assert.assertEquals(
+          1,
+          app.getResourceRequest(request.getPriority(),
+              request.getResourceName()).getNumContainers());
+    }
+
+    // New container will be allocated and will move to ALLOCATED state
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // allocate container
+    List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
 }



Mime
View raw message