hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject hadoop git commit: YARN-4479. Change CS LeafQueue pendingOrderingPolicy to honor recovered apps. Contributed by Rohith Sharma K S
Date Fri, 08 Jan 2016 23:52:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 eef26388c -> 17d29b515


YARN-4479. Change CS LeafQueue pendingOrderingPolicy to honor recovered apps. Contributed
by Rohith Sharma K S

(cherry picked from commit 109e528ef5d8df07443373751266b4417acc981a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/17d29b51
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/17d29b51
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/17d29b51

Branch: refs/heads/branch-2.8
Commit: 17d29b515f0d71a937d334c7657773fe083e706e
Parents: eef2638
Author: Jian He <jianhe@apache.org>
Authored: Fri Jan 8 15:51:10 2016 -0800
Committer: Jian He <jianhe@apache.org>
Committed: Fri Jan 8 15:52:03 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |   2 +
 .../scheduler/SchedulerApplicationAttempt.java  |   9 +
 .../scheduler/capacity/CapacityScheduler.java   |   2 +-
 .../scheduler/capacity/LeafQueue.java           |  63 ++++++-
 .../scheduler/common/fica/FiCaSchedulerApp.java |   5 +-
 .../capacity/TestApplicationPriority.java       | 164 +++++++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java       |   6 +-
 8 files changed, 241 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 55f27ae..38e5db9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1116,6 +1116,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4546. ResourceManager crash due to scheduling opportunity overflow.
     (Jason Lowe via junping_du)
 
+    YARN-4479. Change CS LeafQueue pendingOrderingPolicy to honor recovered apps.
+    (Rohith Sharma K S via jianhe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 2d0d5d6..c79a35e 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -205,6 +205,8 @@
       <Field name="userLimitFactor" />
       <Field name="maxAMResourcePerQueuePercent" />
       <Field name="lastClusterResource" />
+      <Field name="pendingOrderingPolicy" />
+      <Field name="pendingOPForRecoveredApps" />
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 61d8d49..9fe2e59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -109,6 +109,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity
{
   private LogAggregationContext logAggregationContext;
 
   private volatile Priority appPriority = null;
+  private boolean isAttemptRecovering;
 
   protected ResourceUsage attemptResourceUsage = new ResourceUsage();
   private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
@@ -967,6 +968,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity
{
     // queue's resource usage for specific partition
   }
 
+  public boolean isAttemptRecovering() {
+    return isAttemptRecovering;
+  }
+
+  protected void setAttemptRecovering(boolean isRecovering) {
+    this.isAttemptRecovering = isRecovering;
+  }
+
   public static enum AMState {
     UNMANAGED("User launched the Application Master, since it's unmanaged. "),
     INACTIVATED("Application is added to the scheduler and is not yet activated. "),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 159c7a5..0a4ff54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -783,7 +783,7 @@ public class CapacityScheduler extends
 
     FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
         application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
-        application.getPriority());
+            application.getPriority(), isAttemptRecovering);
     if (transferStateFromPreviousAttempt) {
       attempt.transferStateFromPreviousAttempt(
           application.getCurrentAppAttempt());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index be1ba89..c63e81e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -95,6 +95,9 @@ public class LeafQueue extends AbstractCSQueue {
 
   private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
   
+  // Always give preference to this while activating the application attempts.
+  private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
+
   private volatile float minimumAllocationFactor;
 
   private Map<String, User> users = new HashMap<String, User>();
@@ -156,6 +159,8 @@ public class LeafQueue extends AbstractCSQueue {
     setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
     setPendingAppsOrderingPolicy(conf
         .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
+    setPendingAppsOrderingPolicyRecovery(conf
+        .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
     
     userLimit = conf.getUserLimit(getQueuePath());
     userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -320,7 +325,8 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   public synchronized int getNumPendingApplications() {
-    return pendingOrderingPolicy.getNumSchedulableEntities();
+    return pendingOrderingPolicy.getNumSchedulableEntities()
+        + pendingOPForRecoveredApps.getNumSchedulableEntities();
   }
 
   public synchronized int getNumActiveApplications() {
@@ -607,9 +613,19 @@ public class LeafQueue extends AbstractCSQueue {
     Map<String, Resource> userAmPartitionLimit =
         new HashMap<String, Resource>();
 
-    for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
-        .getAssignmentIterator(); i.hasNext();) {
-      FiCaSchedulerApp application = i.next();
+    activateApplications(getPendingAppsOrderingPolicyRecovery()
+        .getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit);
+
+    activateApplications(
+        getPendingAppsOrderingPolicy().getAssignmentIterator(),
+        amPartitionLimit, userAmPartitionLimit);
+  }
+
+  private synchronized void activateApplications(
+      Iterator<FiCaSchedulerApp> fsApp, Map<String, Resource> amPartitionLimit,
+      Map<String, Resource> userAmPartitionLimit) {
+    while (fsApp.hasNext()) {
+      FiCaSchedulerApp application = fsApp.next();
       ApplicationId applicationId = application.getApplicationId();
 
       // Get the am-node-partition associated with each application
@@ -700,7 +716,7 @@ public class LeafQueue extends AbstractCSQueue {
       metrics.incAMUsed(application.getUser(),
           application.getAMResource(partitionName));
       metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
-      i.remove();
+      fsApp.remove();
       LOG.info("Application " + applicationId + " from user: "
           + application.getUser() + " activated in queue: " + getQueueName());
     }
@@ -710,7 +726,11 @@ public class LeafQueue extends AbstractCSQueue {
       User user) {
     // Accept 
     user.submitApplication();
-    getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+    if (application.isAttemptRecovering()) {
+      getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
+    } else {
+      getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+    }
     applicationAttemptMap.put(application.getApplicationAttemptId(), application);
 
     // Activate applications
@@ -750,7 +770,11 @@ public class LeafQueue extends AbstractCSQueue {
     boolean wasActive =
       orderingPolicy.removeSchedulableEntity(application);
     if (!wasActive) {
-      pendingOrderingPolicy.removeSchedulableEntity(application);
+      if (application.isAttemptRecovering()) {
+        pendingOPForRecoveredApps.removeSchedulableEntity(application);
+      } else {
+        pendingOrderingPolicy.removeSchedulableEntity(application);
+      }
     } else {
       queueUsage.decAMUsed(partitionName,
           application.getAMResource(partitionName));
@@ -1499,7 +1523,11 @@ public class LeafQueue extends AbstractCSQueue {
    * Obtain (read-only) collection of pending applications.
    */
   public Collection<FiCaSchedulerApp> getPendingApplications() {
-    return pendingOrderingPolicy.getSchedulableEntities();
+    Collection<FiCaSchedulerApp> pendingApps =
+        new ArrayList<FiCaSchedulerApp>();
+    pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
+    pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
+    return pendingApps;
   }
 
   /**
@@ -1543,6 +1571,10 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public synchronized void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
+    for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
+        .getSchedulableEntities()) {
+      apps.add(pendingApp.getApplicationAttemptId());
+    }
     for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
         .getSchedulableEntities()) {
       apps.add(pendingApp.getApplicationAttemptId());
@@ -1678,6 +1710,21 @@ public class LeafQueue extends AbstractCSQueue {
     this.pendingOrderingPolicy = pendingOrderingPolicy;
   }
 
+  public synchronized OrderingPolicy<FiCaSchedulerApp>
+      getPendingAppsOrderingPolicyRecovery() {
+    return pendingOPForRecoveredApps;
+  }
+
+  public synchronized void setPendingAppsOrderingPolicyRecovery(
+      OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
+    if (null != this.pendingOPForRecoveredApps) {
+      pendingOrderingPolicyRecovery
+          .addAllSchedulableEntities(this.pendingOPForRecoveredApps
+              .getSchedulableEntities());
+    }
+    this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
+  }
+
   /*
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index c9c792e..4b88415 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -99,12 +99,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
     this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
-        Priority.newInstance(0));
+        Priority.newInstance(0), false);
   }
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
-      RMContext rmContext, Priority appPriority) {
+      RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
     
     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@@ -129,6 +129,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     setAppAMNodePartitionName(partition);
     setAMResource(partition, amResource);
     setPriority(appPriority);
+    setAttemptRecovering(isAttemptRecovering);
 
     scheduler = rmContext.getScheduler();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 169e9f6..2ad805a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -567,4 +569,166 @@ public class TestApplicationPriority {
     Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
     rm.stop();
   }
+
+  /**
+   * <p>
+   * Test case verifies the order of applications activated after RM Restart.
+   * </p>
+   * <li>App-1 and app-2 submitted and scheduled and running with a priority
+   * 5 and 6 Respectively</li>
+   * <li>App-3 submitted and scheduled with a priority 7. This
+   * is not activated since AMResourceLimit is reached</li>
+   * <li>RM restarted</li>
+   * <li>App-1 gets activated regardless of AMResourceLimit</li>
+   * <li>App-2 and app-3 put in pendingOrderingPolicy</li>
+   * <li>After NM registration, app-3 is activated</li>
+   * <p>
+   * Expected Output : App-2 must get activated since app-2 was running earlier
+   * </p>
+   * @throws Exception
+   */
+  @Test
+  public void testOrderOfActivatingThePriorityApplicationOnRMRestart()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    dispatcher.await();
+
+    ResourceScheduler scheduler = rm1.getRMContext().getScheduler();
+    LeafQueue defaultQueue =
+        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+    int memory = defaultQueue.getAMResourceLimit().getMemory() / 2;
+
+    // App-1 with priority 5 submitted and running
+    Priority appPriority1 = Priority.newInstance(5);
+    RMApp app1 = rm1.submitApp(memory, appPriority1);
+    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    am1.registerAppAttempt();
+
+    // App-2 with priority 6 submitted and running
+    Priority appPriority2 = Priority.newInstance(6);
+    RMApp app2 = rm1.submitApp(memory, appPriority2);
+    MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
+    am2.registerAppAttempt();
+
+    dispatcher.await();
+    Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(0, defaultQueue.getNumPendingApplications());
+
+    // App-3 with priority 7 is submitted but not activated, since the
+    // AMResourceLimit threshold has been reached
+    Priority appPriority3 = Priority.newInstance(7);
+    RMApp app3 = rm1.submitApp(memory, appPriority3);
+
+    dispatcher.await();
+    Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(1, defaultQueue.getNumPendingApplications());
+
+    Iterator<FiCaSchedulerApp> iterator =
+        defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+    FiCaSchedulerApp fcApp2 = iterator.next();
+    Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp2.getApplicationAttemptId());
+
+    FiCaSchedulerApp fcApp1 = iterator.next();
+    Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp1.getApplicationAttemptId());
+
+    iterator = defaultQueue.getPendingApplications().iterator();
+    FiCaSchedulerApp fcApp3 = iterator.next();
+    Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp3.getApplicationAttemptId());
+
+    final DrainDispatcher dispatcher1 = new DrainDispatcher();
+    // create new RM to represent restart and recover state
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher1;
+      }
+    };
+
+    // start new RM
+    rm2.start();
+    // change NM to point to new RM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // Verify RM Apps after this restart
+    Assert.assertEquals(3, rm2.getRMContext().getRMApps().size());
+
+    dispatcher1.await();
+    scheduler = rm2.getRMContext().getScheduler();
+    defaultQueue =
+        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+
+    // wait for all applications to get added to scheduler
+    int count = 5;
+    while (count-- > 0) {
+      if ((defaultQueue.getNumActiveApplications() + defaultQueue
+          .getNumPendingApplications()) == 3) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    // Before NM registration, the AMResourceLimit threshold is 0, so only the
+    // first application gets activated regardless of the AMResourceLimit
+    // threshold. The other two applications remain pending.
+    Assert.assertEquals(1, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(2, defaultQueue.getNumPendingApplications());
+
+    // NM resync to new RM
+    nm1.registerNode();
+    dispatcher1.await();
+
+    // wait for one more application to be activated
+    count = 5;
+    while (count-- > 0) {
+      if (defaultQueue.getOrderingPolicy().getSchedulableEntities().size() == 2) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    // verify for order of activated applications iterator
+    iterator =
+        defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+    fcApp2 = iterator.next();
+    Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp2.getApplicationAttemptId());
+
+    fcApp1 = iterator.next();
+    Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp1.getApplicationAttemptId());
+
+    // verify for pending application iterator. It should be app-3 attempt
+    iterator = defaultQueue.getPendingApplications().iterator();
+    fcApp3 = iterator.next();
+    Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp3.getApplicationAttemptId());
+
+    rm2.stop();
+    rm1.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/17d29b51/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 479e25a..d4b8dcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -2413,14 +2413,16 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3)));
+            mock(ActiveUsersManager.class), spyRMContext,
+            Priority.newInstance(3), false));
     a.submitApplicationAttempt(app_0, user_0);
     
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5)));
+            mock(ActiveUsersManager.class), spyRMContext,
+            Priority.newInstance(5), false));
     a.submitApplicationAttempt(app_1, user_0);
  
     Priority priority = TestUtils.createMockPriority(1);


Mime
View raw message