hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject hadoop git commit: YARN-3849. Too much of preemption activity causing continuous killing of containers across queues. (Sunil G via wangda)
Date Sat, 21 Nov 2015 00:17:33 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 9074714a1 -> 2ad1386b2


YARN-3849. Too much of preemption activity causing continuous killing of containers across
queues. (Sunil G via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2ad1386b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2ad1386b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2ad1386b

Branch: refs/heads/branch-2.7
Commit: 2ad1386b2b3d4602c787bd2cce455bec5f8ca666
Parents: 9074714
Author: Wangda Tan <wangda@apache.org>
Authored: Fri Nov 20 11:45:23 2015 -0800
Committer: Wangda Tan <wangda@apache.org>
Committed: Fri Nov 20 16:17:23 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../ProportionalCapacityPreemptionPolicy.java   |   3 +-
 ...estProportionalCapacityPreemptionPolicy.java | 241 ++++++++++++++-----
 3 files changed, 189 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ad1386b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1fe828f..8eea1e7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -22,6 +22,9 @@ Release 2.7.3 - UNRELEASED
     YARN-4374. RM capacity scheduler UI rounds user limit factor (Chang Li via
     jlowe)
 
+    YARN-3849. Too much of preemption activity causing continuos killing of containers 
+    across queues. (Sunil G via wangda)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ad1386b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index abcb1a2..1fe38b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -742,12 +742,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     TempQueue ret;
     synchronized (root) {
       String queueName = root.getQueueName();
-      float absUsed = root.getAbsoluteUsedCapacity();
       float absCap = root.getAbsoluteCapacity();
       float absMaxCap = root.getAbsoluteMaximumCapacity();
       boolean preemptionDisabled = root.getPreemptionDisabled();
 
-      Resource current = Resources.multiply(clusterResources, absUsed);
+      Resource current = root.getQueueResourceUsage().getUsed();
       Resource guaranteed = Resources.multiply(clusterResources, absCap);
       Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ad1386b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8e9545d..cc55eda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
@@ -371,7 +373,7 @@ public class TestProportionalCapacityPreemptionPolicy {
             appA.getApplicationId(), appA.getAttemptId());
     assertTrue("appA should be running on queueB",
         mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
 
     // Need to call setup() again to reset mDisp
     setup();
@@ -394,7 +396,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Resources should have come from queueE (appC) and neither of queueA's
     // children.
     verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
   }
 
   @Test
@@ -469,8 +471,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     // With all queues preemptable, resources should be taken from queueC(appA)
     // and queueD(appB). Resources taken more from queueD(appB) than
     // queueC(appA) because it's over its capacity by a larger percentage.
-    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(17)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(183)).handle(argThat(new IsPreemptionRequestFor(appB)));
 
     // Turn off preemption for queueA and it's children. queueF(appC)'s request
     // should starve.
@@ -634,7 +636,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     policy.editSchedule();
     // verify capacity taken from A1, not B1 despite B1 being far over
     // its absolute guaranteed capacity
-    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
   }
 
   @Test
@@ -675,7 +677,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // we verify both that C has priority on B and D (has it has >0 guarantees)
     // and that B and D are force to share their over capacity fairly (as they
     // are both zero-guarantees) hence D sees some of its containers preempted
-    verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC)));
   }
   
   
@@ -702,8 +704,8 @@ public class TestProportionalCapacityPreemptionPolicy {
 
     // XXX note: compensating for rounding error in Resources.multiplyTo
     // which is likely triggered since we use small numbers for readability
-    verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
-    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appE)));
   }
 
   @Test
@@ -828,15 +830,15 @@ public class TestProportionalCapacityPreemptionPolicy {
     // By skipping AM Container and Labeled container, all other 18 containers
     // of appD will be
     // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
+    verify(mDisp, times(18)).handle(argThat(new IsPreemptionRequestFor(appD)));
 
     // By skipping AM Container and Labeled container, all other 18 containers
     // of appC will be
     // preempted
-    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    verify(mDisp, times(18)).handle(argThat(new IsPreemptionRequestFor(appC)));
 
     // rest 4 containers from appB will be preempted
-    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
     setAMContainer = false;
     setLabeledContainer = false;
   }
@@ -911,6 +913,34 @@ public class TestProportionalCapacityPreemptionPolicy {
     setAMContainer = false;
   }
 
+  @Test
+  public void testPreemptionWithVCoreResource() {
+    int[][] qData = new int[][]{
+        // / A B
+        {100, 100, 100}, // maxcap
+        {5, 1, 1}, // apps
+        {2, 0, 0}, // subqueues
+    };
+
+    // Resources can be set like memory:vcores
+    String[][] resData = new String[][]{
+        // / A B
+        {"100:100", "50:50", "50:50"}, // abs
+        {"10:100", "10:100", "0"}, // used
+        {"70:20", "70:20", "10:100"}, // pending
+        {"0", "0", "0"}, // reserved
+        {"-1", "1:10", "1:10"}, // req granularity
+    };
+
+    // Passing last param as TRUE to use DominantResourceCalculator
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData,
+        true);
+    policy.editSchedule();
+
+    // 5 containers will be preempted here
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -937,37 +967,109 @@ public class TestProportionalCapacityPreemptionPolicy {
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy =
       new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
+    clusterResources =
+        Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
-    clusterResources =
-      Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
-    when(mCS.getClusterResource()).thenReturn(clusterResources);
+    setResourceAndNodeDetails();
+    return policy;
+  }
+
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+      String[][] resData) {
+    return buildPolicy(qData, resData, false);
+  }
+
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
+      String[][] resData, boolean useDominantResourceCalculator) {
+    if (useDominantResourceCalculator) {
+      when(mCS.getResourceCalculator()).thenReturn(
+          new DominantResourceCalculator());
+    }
+    ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
+        conf, rmContext, mCS, mClock);
+    clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
+        qData[2]);
+    ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
+    when(mCS.getRootQueue()).thenReturn(mRoot);
+
+    setResourceAndNodeDetails();
     return policy;
   }
 
+  private void setResourceAndNodeDetails() {
+    when(mCS.getClusterResource()).thenReturn(clusterResources);
+    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+        clusterResources);
+  }
+
   ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
-    int[] abs      = queueData[0];
-    int[] maxCap   = queueData[1];
-    int[] used     = queueData[2];
-    int[] pending  = queueData[3];
-    int[] reserved = queueData[4];
-    int[] apps     = queueData[5];
-    int[] gran     = queueData[6];
-    int[] queues   = queueData[7];
-
-    return mockNested(abs, maxCap, used, pending,  reserved, apps, gran, queues);
+    Resource[] abs = generateResourceList(queueData[0]);
+    Resource[] used = generateResourceList(queueData[2]);
+    Resource[] pending = generateResourceList(queueData[3]);
+    Resource[] reserved = generateResourceList(queueData[4]);
+    Resource[] gran = generateResourceList(queueData[6]);
+    int[] maxCap = queueData[1];
+    int[] apps = queueData[5];
+    int[] queues = queueData[7];
+
+    return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
+  }
+
+  private ParentQueue buildMockRootQueue(Random rand2, String[][] resData,
+      int[][] queueData) {
+    Resource[] abs = parseResourceDetails(resData[0]);
+    Resource[] used = parseResourceDetails(resData[1]);
+    Resource[] pending = parseResourceDetails(resData[2]);
+    Resource[] reserved = parseResourceDetails(resData[3]);
+    Resource[] gran = parseResourceDetails(resData[4]);
+    int[] maxCap = queueData[0];
+    int[] apps = queueData[1];
+    int[] queues = queueData[2];
+
+    return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues);
   }
 
-  ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
-      int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
-    float tot = leafAbsCapacities(abs, queues);
+  Resource[] parseResourceDetails(String[] resData) {
+    List<Resource> resourceList = new ArrayList<Resource>();
+    for (int i = 0; i < resData.length; i++) {
+      String[] resource = resData[i].split(":");
+      if (resource.length == 1) {
+        resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0));
+      } else {
+        resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]),
+            Integer.valueOf(resource[1])));
+      }
+    }
+    return resourceList.toArray(new Resource[resourceList.size()]);
+  }
+
+  Resource[] generateResourceList(int[] qData) {
+    List<Resource> resourceList = new ArrayList<Resource>();
+    for (int i = 0; i < qData.length; i++) {
+      resourceList.add(Resource.newInstance(qData[i], 0));
+    }
+    return resourceList.toArray(new Resource[resourceList.size()]);
+  }
+
+  ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
+      Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran,
+      int[] queues) {
+    ResourceCalculator rc = mCS.getResourceCalculator();
+    Resource tot = leafAbsCapacities(abs, queues);
     Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
     ParentQueue root = mockParentQueue(null, queues[0], pqs);
+    ResourceUsage resUsage = new ResourceUsage();
+    resUsage.setUsed(used[0]);
     when(root.getQueueName()).thenReturn("/");
-    when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
-    when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
-    when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
+    when(root.getAbsoluteUsedCapacity()).thenReturn(
+        Resources.divide(rc, tot, used[0], tot));
+    when(root.getAbsoluteCapacity()).thenReturn(
+        Resources.divide(rc, tot, abs[0], tot));
+    when(root.getAbsoluteMaximumCapacity()).thenReturn(
+        maxCap[0] / (float) tot.getMemory());
+    when(root.getQueueResourceUsage()).thenReturn(resUsage);
     when(root.getQueuePath()).thenReturn("root");
     boolean preemptionDisabled = mockPreemptionStatus("root");
     when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
@@ -978,14 +1080,20 @@ public class TestProportionalCapacityPreemptionPolicy {
       final String queueName = "queue" + ((char)('A' + i - 1));
       if (queues[i] > 0) {
         q = mockParentQueue(p, queues[i], pqs);
+        ResourceUsage resUsagePerQueue = new ResourceUsage();
+        resUsagePerQueue.setUsed(used[i]);
+        when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
       } else {
         q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
       }
       when(q.getParent()).thenReturn(p);
       when(q.getQueueName()).thenReturn(queueName);
-      when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
-      when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
-      when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+      when(q.getAbsoluteUsedCapacity()).thenReturn(
+          Resources.divide(rc, tot, used[i], tot));
+      when(q.getAbsoluteCapacity()).thenReturn(
+          Resources.divide(rc, tot, abs[i], tot));
+      when(q.getAbsoluteMaximumCapacity()).thenReturn(
+          maxCap[i] / (float) tot.getMemory());
       String parentPathName = p.getQueuePath();
       parentPathName = (parentPathName == null) ? "root" : parentPathName;
       String queuePathName = (parentPathName+"."+queueName).replace("/","root");
@@ -1027,13 +1135,19 @@ public class TestProportionalCapacityPreemptionPolicy {
     return pq;
   }
 
-  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, 
-      int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
+  LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs,
+      Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
+      Resource[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
+    ResourceCalculator rc = mCS.getResourceCalculator();
     List<ApplicationAttemptId> appAttemptIdList = 
         new ArrayList<ApplicationAttemptId>();
-    when(lq.getTotalResourcePending()).thenReturn(
-        Resource.newInstance(pending[i], 0));
+    when(lq.getTotalResourcePending()).thenReturn(pending[i]);
+    // need to set pending resource in resource usage as well
+    ResourceUsage ru = new ResourceUsage();
+    ru.setPending(pending[i]);
+    ru.setUsed(used[i]);
+    when(lq.getQueueResourceUsage()).thenReturn(ru);
     // consider moving where CapacityScheduler::comparator accessible
     NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
       new Comparator<FiCaSchedulerApp>() {
@@ -1045,9 +1159,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       });
     // applications are added in global L->R order in queues
     if (apps[i] != 0) {
-      int aUsed    = used[i] / apps[i];
-      int aPending = pending[i] / apps[i];
-      int aReserve = reserved[i] / apps[i];
+      Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]);
+      Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]);
+      Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]);
       for (int a = 0; a < apps[i]; ++a) {
         FiCaSchedulerApp mockFiCaApp =
             mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
@@ -1066,9 +1180,10 @@ public class TestProportionalCapacityPreemptionPolicy {
     return lq;
   }
 
-  FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
-      int gran) {
+  FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending,
+      Resource reserved, Resource gran) {
     FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+    ResourceCalculator rc = mCS.getResourceCalculator();
 
     ApplicationId appId = ApplicationId.newInstance(TS, id);
     ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
@@ -1076,30 +1191,34 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(app.getApplicationAttemptId()).thenReturn(appAttId);
 
     int cAlloc = 0;
-    Resource unit = Resource.newInstance(gran, 0);
+    Resource unit = gran;
     List<RMContainer> cReserved = new ArrayList<RMContainer>();
-    for (int i = 0; i < reserved; i += gran) {
-      cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
-          .getValue()));
+    Resource resIter = Resource.newInstance(0, 0);
+    for (; Resources.lessThan(rc, clusterResources, resIter, reserved); Resources
+        .addTo(resIter, gran)) {
+      cReserved.add(mockContainer(appAttId, cAlloc, unit,
+          priority.CONTAINER.getValue()));
       ++cAlloc;
     }
     when(app.getReservedContainers()).thenReturn(cReserved);
 
     List<RMContainer> cLive = new ArrayList<RMContainer>();
-    for (int i = 0; i < used; i += gran) {
-      if(setAMContainer && i == 0){
-        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
-            .getValue()));
-      }else if(setLabeledContainer && i ==1){
+    Resource usedIter = Resource.newInstance(0, 0);
+    int i = 0;
+    for (; Resources.lessThan(rc, clusterResources, usedIter, used); Resources
+        .addTo(usedIter, gran)) {
+      if (setAMContainer && i == 0) {
+        cLive.add(mockContainer(appAttId, cAlloc, unit,
+            priority.AMCONTAINER.getValue()));
+      } else if (setLabeledContainer && i == 1) {
         cLive.add(mockContainer(appAttId, cAlloc, unit,
             priority.LABELEDCONTAINER.getValue()));
-        ++used;
-      }
-      else{
-        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
-            .getValue()));
+      } else {
+        cLive.add(mockContainer(appAttId, cAlloc, unit,
+            priority.CONTAINER.getValue()));
       }
       ++cAlloc;
+      ++i;
     }
     when(app.getLiveContainers()).thenReturn(cLive);
     return app;
@@ -1134,6 +1253,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     return ret;
   }
 
+  static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) {
+    Resource ret = Resource.newInstance(0, 0);
+    for (int i = 0; i < abs.length; ++i) {
+      if (0 == subqueues[i]) {
+        Resources.addTo(ret, abs[i]);
+      }
+    }
+    return ret;
+  }
+
   void printString(CSQueue nq, String indent) {
     if (nq instanceof ParentQueue) {
       System.out.println(indent + nq.getQueueName()


Mime
View raw message