hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject git commit: YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when AM allocates. Contributed by Craig Welch (cherry picked from commit 519e5a7dd2bd540105434ec3c8939b68f6c024f8)
Date Mon, 06 Oct 2014 22:51:46 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 eeb39dc21 -> 5c33e9122


YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when AM allocates. Contributed
by Craig Welch
(cherry picked from commit 519e5a7dd2bd540105434ec3c8939b68f6c024f8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c33e912
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c33e912
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c33e912

Branch: refs/heads/branch-2
Commit: 5c33e9122900ee2b5d90eff39ac924a7217e74e3
Parents: eeb39dc
Author: Jian He <jianhe@apache.org>
Authored: Mon Oct 6 15:47:48 2014 -0700
Committer: Jian He <jianhe@apache.org>
Committed: Mon Oct 6 15:51:38 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../capacity/CapacityHeadroomProvider.java      | 65 +++++++++++++
 .../scheduler/capacity/CapacityScheduler.java   |  4 +
 .../scheduler/capacity/LeafQueue.java           | 74 ++++++++++++---
 .../scheduler/common/fica/FiCaSchedulerApp.java | 28 ++++++
 .../capacity/TestApplicationLimits.java         | 18 ++--
 .../scheduler/capacity/TestLeafQueue.java       | 98 +++++++++++++++++++-
 7 files changed, 263 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8f97018..19a56d9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -552,6 +552,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2611. Fixing jenkins findbugs warning and TestRMWebServicesCapacitySched
     for branch YARN-1051. (Subru Krishnan and Carlo Curino via subru)
 
+    YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when
+    AM allocates. (Craig Welch via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
new file mode 100644
index 0000000..f79d195
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class CapacityHeadroomProvider {
+  
+  LeafQueue.User user;
+  LeafQueue queue;
+  FiCaSchedulerApp application;
+  Resource required;
+  LeafQueue.QueueHeadroomInfo queueHeadroomInfo;
+  
+  public CapacityHeadroomProvider(
+    LeafQueue.User user,
+    LeafQueue queue,
+    FiCaSchedulerApp application,
+    Resource required,
+    LeafQueue.QueueHeadroomInfo queueHeadroomInfo) {
+    
+    this.user = user;
+    this.queue = queue;
+    this.application = application;
+    this.required = required;
+    this.queueHeadroomInfo = queueHeadroomInfo;
+    
+  }
+  
+  public Resource getHeadroom() {
+    
+    Resource queueMaxCap;
+    Resource clusterResource;
+    synchronized (queueHeadroomInfo) {
+      queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
+      clusterResource = queueHeadroomInfo.getClusterResource();
+    }
+    Resource headroom = queue.getHeadroom(user, queueMaxCap, 
+      clusterResource, application, required);
+    
+    // Corner case to deal with applications being slightly over-limit
+    if (headroom.getMemory() < 0) {
+      headroom.setMemory(0);
+    }
+    return headroom;
+  
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6a3c7dc..02f27b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -469,6 +469,10 @@ public class CapacityScheduler extends
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
     initializeQueueMappings();
+    
+    // Re-calculate headroom for active applications
+    root.updateClusterResource(clusterResource);
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f0cff71..57f0907 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -132,6 +132,8 @@ public class LeafQueue implements CSQueue {
   
   private boolean reservationsContinueLooking;
   
+  private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
+  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) {
     this.scheduler = cs;
@@ -970,6 +972,22 @@ public class LeafQueue implements CSQueue {
     // "re-reservation" is *free*
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
   }
+  
+  protected Resource getHeadroom(User user, Resource queueMaxCap,
+      Resource clusterResource, FiCaSchedulerApp application, Resource required) {
+    return getHeadroom(user, queueMaxCap, clusterResource,
+	  computeUserLimit(application, clusterResource, required, user));
+  }
+  
+  private Resource getHeadroom(User user, Resource queueMaxCap,
+      Resource clusterResource, Resource userLimit) {
+    Resource headroom = 
+        Resources.subtract(
+            Resources.min(resourceCalculator, clusterResource, 
+                userLimit, queueMaxCap), 
+            user.getConsumedResources());
+    return headroom;
+  }
 
 
   @Private
@@ -1038,12 +1056,14 @@ public class LeafQueue implements CSQueue {
     
     String user = application.getUser();
     
+    User queueUser = getUser(user);
+    
     /** 
      * Headroom is min((userLimit, queue-max-cap) - consumed)
      */
 
     Resource userLimit =                          // User limit
-        computeUserLimit(application, clusterResource, required);
+        computeUserLimit(application, clusterResource, required, queueUser);
 
     //Max avail capacity needs to take into account usage by ancestor-siblings
     //which are greater than their base capacity, so we are interested in "max avail"
@@ -1057,23 +1077,27 @@ public class LeafQueue implements CSQueue {
             clusterResource, 
             absoluteMaxAvailCapacity,
             minimumAllocation);
+	
+    synchronized (queueHeadroomInfo) {
+      queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
+      queueHeadroomInfo.setClusterResource(clusterResource);
+    }
     
-    Resource userConsumed = getUser(user).getConsumedResources(); 
-    Resource headroom = 
-        Resources.subtract(
-            Resources.min(resourceCalculator, clusterResource, 
-                userLimit, queueMaxCap), 
-            userConsumed);
+    Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
           " queueMaxCap=" + queueMaxCap + 
-          " consumed=" + userConsumed + 
+          " consumed=" + queueUser.getConsumedResources() + 
           " headroom=" + headroom);
     }
     
-    application.setHeadroom(headroom);
+    CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
+      queueUser, this, application, required, queueHeadroomInfo);
+    
+    application.setHeadroomProvider(headroomProvider);
+
     metrics.setAvailableResourcesToUser(user, headroom);
     
     return userLimit;
@@ -1081,7 +1105,7 @@ public class LeafQueue implements CSQueue {
   
   @Lock(NoLock.class)
   private Resource computeUserLimit(FiCaSchedulerApp application, 
-      Resource clusterResource, Resource required) {
+      Resource clusterResource, Resource required, User user) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
     //   we're running below capacity. The 'max' ensures that jobs in queues
@@ -1138,7 +1162,7 @@ public class LeafQueue implements CSQueue {
           " userLimit=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
-          " consumed: " + getUser(userName).getConsumedResources() + 
+          " consumed: " + user.getConsumedResources() + 
           " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + usedResources +
@@ -1687,9 +1711,6 @@ public class LeafQueue implements CSQueue {
     String userName = application.getUser();
     User user = getUser(userName);
     user.assignContainer(resource);
-    // Note this is a bit unconventional since it gets the object and modifies it here
-    // rather then using set routine
-    Resources.subtractFrom(application.getHeadroom(), resource); // headroom
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
     
     if (LOG.isDebugEnabled()) {
@@ -1896,4 +1917,29 @@ public class LeafQueue implements CSQueue {
   public void setMaxApplications(int maxApplications) {
     this.maxApplications = maxApplications;
   }
+  
+  /*
+   * Holds shared values used by all applications in
+   * the queue to calculate headroom on demand
+   */
+  static class QueueHeadroomInfo {
+    private Resource queueMaxCap;
+    private Resource clusterResource;
+    
+    public void setQueueMaxCap(Resource queueMaxCap) {
+      this.queueMaxCap = queueMaxCap;
+    }
+    
+    public Resource getQueueMaxCap() {
+      return queueMaxCap;
+    }
+    
+    public void setClusterResource(Resource clusterResource) {
+      this.clusterResource = clusterResource;
+    }
+    
+    public Resource getClusterResource() {
+      return clusterResource;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index dc0d0f0..2f9569c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 
 /**
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -64,6 +65,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   private final Set<ContainerId> containersToPreempt =
     new HashSet<ContainerId>();
+    
+  private CapacityHeadroomProvider headroomProvider;
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -280,6 +283,31 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
     return null;
   }
+  
+  public synchronized void setHeadroomProvider(
+    CapacityHeadroomProvider headroomProvider) {
+    this.headroomProvider = headroomProvider;
+  }
+
+  public synchronized CapacityHeadroomProvider getHeadroomProvider() {
+    return headroomProvider;
+  }
+  
+  @Override
+  public synchronized Resource getHeadroom() {
+    if (headroomProvider != null) {
+      return headroomProvider.getHeadroom();
+    }
+    return super.getHeadroom();
+  }
+  
+  @Override
+  public synchronized void transferStateFromPreviousAttempt(
+      SchedulerApplicationAttempt appAttempt) {
+    super.transferStateFromPreviousAttempt(appAttempt);
+    this.headroomProvider = 
+      ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index ff8e873..b922c02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -518,7 +518,7 @@ public class TestApplicationLimits {
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, false);
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
-    verify(app_0_0).setHeadroom(eq(expectedHeadroom));
+    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
 
     // Submit second application from user_0, check headroom
     final ApplicationAttemptId appAttemptId_0_1 = 
@@ -536,8 +536,8 @@ public class TestApplicationLimits {
 
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
-    verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
-    verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
+    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+    assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
     
     // Submit first application from user_1, check  for new headroom
     final ApplicationAttemptId appAttemptId_1_0 = 
@@ -556,17 +556,17 @@ public class TestApplicationLimits {
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
-    verify(app_0_0).setHeadroom(eq(expectedHeadroom));
-    verify(app_0_1).setHeadroom(eq(expectedHeadroom));
-    verify(app_1_0).setHeadroom(eq(expectedHeadroom));
+    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+    assertEquals(expectedHeadroom, app_0_1.getHeadroom());
+    assertEquals(expectedHeadroom, app_1_0.getHeadroom());
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
     queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
-    verify(app_0_0).setHeadroom(eq(expectedHeadroom));
-    verify(app_0_1).setHeadroom(eq(expectedHeadroom));
-    verify(app_1_0).setHeadroom(eq(expectedHeadroom));
+    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+    assertEquals(expectedHeadroom, app_0_1.getHeadroom());
+    assertEquals(expectedHeadroom, app_1_0.getHeadroom());
   }
   
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 092ff83..9e06c52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -640,6 +640,94 @@ public class TestLeafQueue {
   }
   
   @Test
+  public void testUserHeadroomMultiApp() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    //unset maxCapacity
+    a.setMaxCapacity(1.0f);
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_0);  // same user
+
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_1, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_2, user_1);
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 
+      0, 16*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 
+      0, 16*GB);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    Priority priority = TestUtils.createMockPriority(1);
+
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+                priority, recordFactory)));
+
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(1*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    //Now, headroom is the same for all apps for a given user + queue combo
+    //and a change to any app's headroom is reflected for all the user's apps
+    //once those apps are active/have themselves calculated headroom for 
+    //allocation at least one time
+    assertEquals(2*GB, app_0.getHeadroom().getMemory());
+    assertEquals(0*GB, app_1.getHeadroom().getMemory());//not yet active
+    assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory)));
+
+    a.assignContainers(clusterResource, node_0, false);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_0.getHeadroom().getMemory());
+    assertEquals(1*GB, app_1.getHeadroom().getMemory());//now active
+    assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
+
+    //Complete container and verify that headroom is updated, for both apps 
+    //for the user
+    RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
+    a.completedContainer(clusterResource, app_0, node_0, rmContainer,
+    ContainerStatus.newInstance(rmContainer.getContainerId(),
+	ContainerState.COMPLETE, "",
+	ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+    RMContainerEventType.KILL, null, true);
+
+    assertEquals(2*GB, app_0.getHeadroom().getMemory());
+    assertEquals(2*GB, app_1.getHeadroom().getMemory());
+  }
+
+  @Test
   public void testHeadroomWithMaxCap() throws Exception {
     // Mock the queue
     LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
@@ -710,16 +798,18 @@ public class TestLeafQueue {
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
-    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
-    assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G
+    assertEquals(2*GB, app_0.getHeadroom().getMemory()); 
+      // User limit = 4G, 2 in use
+    assertEquals(0*GB, app_1.getHeadroom().getMemory()); 
+      // the application is not yet active
 
     // Again one to user_0 since he hasn't exceeded user limit yet
     a.assignContainers(clusterResource, node_0, false);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
-    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
-    assertEquals(0*GB, app_1.getHeadroom().getMemory()); // 3G - 2G
+    assertEquals(1*GB, app_0.getHeadroom().getMemory()); // 4G - 3G
+    assertEquals(1*GB, app_1.getHeadroom().getMemory()); // 4G - 3G
     
     // Submit requests for app_1 and set max-cap
     a.setMaxCapacity(.1f);


Mime
View raw message