hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [16/50] [abbrv] hadoop git commit: YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan
Date Mon, 09 May 2016 18:30:47 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..08042b5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestProportionalCapacityPreemptionPolicyMockFramework
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+
+  @Test
+  public void testBuilder() throws Exception {
+    /**
+     * Test of test, make sure we build expected mock schedulable objects
+     */
+    String labelsConfig =
+        "=200,true;" + // default partition
+            "red=100,false;" + // partition=red
+            "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+            "n2=blue;" + // n2 has partition=blue
+            "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
+            "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
+            "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
+            "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
+            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "a1\t" // app2 in a1
+            + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+            // 50 * ignore-exclusivity (allocated)
+            + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+            // 50 in n2 (allocated)
+            "a2\t" // app3 in a2
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "b\t" // app4 in b
+            + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("root"), "", 100);
+    checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "red", 100);
+    checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "blue", 200);
+
+    // a
+    checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("a"), "", 100);
+    checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a"), "blue", 200);
+
+    // a1
+    checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
+    checkPendingResource(cs.getQueue("a1"), "", 100);
+    checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a1"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a1"), "blue", 0);
+
+    // a2
+    checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
+    checkPendingResource(cs.getQueue("a2"), "", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "blue", 200);
+
+    // b1
+    checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("b"), "", 0);
+    checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("b"), "red", 100);
+    checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("b"), "blue", 0);
+
+    // Check ignored partitioned containers in queue
+    Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
+        .getIgnoreExclusivityRMContainers().get("blue").size());
+
+    // Check applications
+    Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
+
+    // Check #containers
+    FiCaSchedulerApp app1 = getApp("a1", 1);
+    FiCaSchedulerApp app2 = getApp("a1", 2);
+    FiCaSchedulerApp app3 = getApp("a2", 3);
+    FiCaSchedulerApp app4 = getApp("b", 4);
+
+    Assert.assertEquals(50, app1.getLiveContainers().size());
+    checkContainerNodesInApp(app1, 50, "n3");
+
+    Assert.assertEquals(50, app2.getLiveContainers().size());
+    Assert.assertEquals(150, app2.getReservedContainers().size());
+    checkContainerNodesInApp(app2, 200, "n2");
+
+    Assert.assertEquals(50, app3.getLiveContainers().size());
+    checkContainerNodesInApp(app3, 50, "n3");
+
+    Assert.assertEquals(100, app4.getLiveContainers().size());
+    checkContainerNodesInApp(app4, 100, "n1");
+  }
+
+  @Test
+  public void testBuilderWithReservedResource() throws Exception {
+    String labelsConfig =
+        "=200,true;" + // default partition
+            "red=100,false;" + // partition=red
+            "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+            "n2=blue;" + // n2 has partition=blue
+            "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+            "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+            "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+            "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "a1\t" // app2 in a1
+            + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+            // 50 * ignore-exclusivity (allocated)
+            + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+            // 50 in n2 (allocated)
+            "a2\t" // app3 in a2
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "b\t" // app4 in b
+            + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkReservedResource(cs.getQueue("root"), "", 100);
+    checkReservedResource(cs.getQueue("root"), "red", 90);
+
+    // a
+    checkReservedResource(cs.getQueue("a"), "", 50);
+    checkReservedResource(cs.getQueue("a"), "red", 40);
+
+    // a1
+    checkReservedResource(cs.getQueue("a1"), "", 40);
+    checkReservedResource(cs.getQueue("a1"), "red", 20);
+
+    // b
+    checkReservedResource(cs.getQueue("b"), "", 0);
+    checkReservedResource(cs.getQueue("b"), "red", 0);
+  }
+
+  @Test
+  public void testBuilderWithSpecifiedNodeResources() throws Exception {
+    String labelsConfig =
+        "=200,true;" + // default partition
+            "red=100,false;" + // partition=red
+            "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red res=100;" + // n1 has partition=red
+            "n2=blue;" + // n2 has partition=blue
+            "n3= res=30"; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+            "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+            "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+            "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+            "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
+        "a1\t" // app1 in a1
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "a1\t" // app2 in a1
+            + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+            // 50 * ignore-exclusivity (allocated)
+            + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+            // 50 in n2 (allocated)
+            "a2\t" // app3 in a2
+            + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+            "b\t" // app4 in b
+            + "(1,1,n1,red,100,false);";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check host resources
+    Assert.assertEquals(3, this.cs.getAllNodes().size());
+    SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
+    Assert.assertEquals(100, node1.getTotalResource().getMemory());
+    Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
+    Assert.assertNull(node1.getReservedContainer());
+
+    SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
+    Assert.assertEquals(0, node2.getTotalResource().getMemory());
+    Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
+    Assert.assertNotNull(node2.getReservedContainer());
+
+    SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
+    Assert.assertEquals(30, node3.getTotalResource().getMemory());
+    Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
+    Assert.assertNull(node3.getReservedContainer());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index 88216f8..54166c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.junit.After;
@@ -144,7 +145,7 @@ public class TestSchedulerApplicationAttempt {
   private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
       Resource resource) {
     ContainerId containerId = ContainerId.newContainerId(appAttId, id);
-    RMContainer rmContainer = mock(RMContainer.class);
+    RMContainer rmContainer = mock(RMContainerImpl.class);
     Container container = mock(Container.class);
     when(container.getResource()).thenReturn(resource);
     when(container.getNodeId()).thenReturn(nodeId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
new file mode 100644
index 0000000..bd9f615
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CapacitySchedulerPreemptionTestBase {
+
+  final int GB = 1024;
+
+  Configuration conf;
+
+  RMNodeLabelsManager mgr;
+
+  Clock clock;
+
+  @Before
+  void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+    // Set preemption related configurations
+    conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+        0);
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        1.0f);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        1.0f);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        60000L);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(this.conf);
+    clock = mock(Clock.class);
+    when(clock.getTime()).thenReturn(0L);
+  }
+
+  SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+    ResourceManager.RMActiveServices activeServices = rm.getRMActiveService();
+    SchedulingMonitor mon = null;
+    for (Service service : activeServices.getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        mon = (SchedulingMonitor) service;
+        break;
+      }
+    }
+
+    if (mon != null) {
+      return mon.getSchedulingEditPolicy();
+    }
+    return null;
+  }
+
+  public void waitNumberOfLiveContainersFromApp(FiCaSchedulerApp app,
+      int expected) throws InterruptedException {
+    int waitNum = 0;
+
+    while (waitNum < 10) {
+      System.out.println(app.getLiveContainers().size());
+      if (app.getLiveContainers().size() == expected) {
+        return;
+      }
+      Thread.sleep(100);
+      waitNum++;
+    }
+
+    Assert.fail();
+  }
+
+  public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app,
+      int expected) throws InterruptedException {
+    int waitNum = 0;
+
+    while (waitNum < 10) {
+      System.out.println(app.getReservedContainers().size());
+      if (app.getReservedContainers().size() == expected) {
+        return;
+      }
+      Thread.sleep(100);
+      waitNum++;
+    }
+
+    Assert.fail();
+  }
+
+  public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
+      ApplicationAttemptId appId, int expected) throws InterruptedException {
+    int waitNum = 0;
+
+    while (waitNum < 500) {
+      int total = 0;
+      for (RMContainer c : node.getCopiedListOfRunningContainers()) {
+        if (c.getApplicationAttemptId().equals(appId)) {
+          total++;
+        }
+      }
+      if (total == expected) {
+        return;
+      }
+      Thread.sleep(10);
+      waitNum++;
+    }
+
+    Assert.fail();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index fba8d2d..16063b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -3302,7 +3302,7 @@ public class TestCapacityScheduler {
           resourceManager
               .getResourceScheduler()
               .getSchedulerNode(resourceEvent.getNodeId())
-              .setTotalResource(resourceEvent.getResourceOption().getResource());
+              .updateTotalResource(resourceEvent.getResourceOption().getResource());
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
new file mode 100644
index 0000000..b649fc9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
@@ -0,0 +1,638 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerLazyPreemption
+    extends CapacitySchedulerPreemptionTestBase {
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+        true);
+  }
+
+  @Test (timeout = 60000)
+  public void testSimplePreemption() throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
+     * more resource available.
+     *
+     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
+     *
+     * Now the cluster is fulfilled.
+     *
+     * 4) app2 asks for another 1G container, system will preempt one container
+     * from app1, and app2 will receive the preempted container
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(1, killableContainers.size());
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 6 containers, and app2 has 2 containers
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersNodeLocalityDelay()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for 1G container with locality specified, so it needs
+     * to wait for missed-opportunity before get scheduled.
+     * Check if system waits missed-opportunity before finish killable container
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container with unknown host and unknown rack
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "unknownhost",
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, one container will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersHardNodeLocality()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption steps 1-3.
+     *
+     * Step 4: app2 asks for 1G container with hard locality specified, and
+     *         asked host is not existed
+     * Confirm system doesn't preempt any container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container for h3 with hard locality,
+    // h3 doesn't exist in the cluster
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1, true), ResourceRequest
+        .newInstance(Priority.newInstance(1), "h3",
+            Resources.createResource(1 * GB), 1, false), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1, false)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, nothing will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 7 containers, and app2 has 1 containers (no container allocated)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Killable containers resources will be excluded from PCPP (no duplicated
+     *    container added to killable list)
+     * 2) When more resources need to be preempted, new containers will be selected
+     *    and killable containers will be considered
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if one container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+    // Check killable containers and to-be-preempted containers in edit policy
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Run edit schedule again, confirm status doesn't changed
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Save current to kill containers
+    Set<ContainerId> previousKillableContainers = new HashSet<>(
+        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+            .keySet());
+
+    // Update request resource of c from 1 to 2, so we need to preempt
+    // one more container
+    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
+
+    // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
+    // and 1 container in killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+    // Call editPolicy.editSchedule() once more, we should have 2 containers killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Check if previous killable containers included by new killable containers
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+    Assert.assertTrue(
+        Sets.difference(previousKillableContainers, killableContainers.keySet())
+            .isEmpty());
+  }
+
+  /*
+   * Ignore this test now because it could be a premature optimization
+   */
+  @Ignore
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     * Submit applications to two queues, one uses more than the other, so
+     * preemption will happen.
+     *
+     * Check:
+     * 1) Containers will be marked to killable
+     * 2) Cancel resource request
+     * 3) Killable containers will be cancelled from policy and scheduler
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if 3 container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+    // Change reqeust from 3G to 2G, now we can preempt one less container. (3->2)
+    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+    // Call editSchedule once more to make sure still nothing happens
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersUserLimit()
+      throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to different queues, queue
+     * structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c
+     *
+     * 1) Two nodes in the cluster, each of them has 4G.
+     *
+     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
+     * more resource available.
+     *
+     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
+     *
+     * Now the cluster is fulfilled.
+     *
+     * 4) app2 asks for another 1G container, system will preempt one container
+     * from app1, and app2 will receive the preempted container
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
+    MockRM rm1 = new MockRM(csConf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and no available resource for cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and asks for a 1G container for AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getUnallocatedResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getUnallocatedResource().getMemory());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check if no container from app1 marked
+    // to be "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    // No preemption happens
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+    Assert.assertEquals(0, killableContainers.size());
+
+    // Call CS.handle once to see if container preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 has 7 containers, and app2 has 1 containers (nothing preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  private Map<ContainerId, RMContainer> waitKillableContainersSize(
+      PreemptionManager pm, String queueName, String partition,
+      int expectedSize) throws InterruptedException {
+    Map<ContainerId, RMContainer> killableContainers =
+        pm.getKillableContainersMap(queueName, partition);
+
+    int wait = 0;
+    // Wait for at most 5 sec (it should be super fast actually)
+    while (expectedSize != killableContainers.size() && wait < 500) {
+      killableContainers = pm.getKillableContainersMap(queueName, partition);
+      Thread.sleep(10);
+      wait++;
+    }
+
+    Assert.assertEquals(expectedSize, killableContainers.size());
+    return killableContainers;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb62e059/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
deleted file mode 100644
index 1a0adc6..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ /dev/null
@@ -1,677 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class TestCapacitySchedulerPreemption {
-  private static final Log LOG = LogFactory.getLog(
-      TestCapacitySchedulerPreemption.class);
-
-  private final int GB = 1024;
-
-  private Configuration conf;
-
-  RMNodeLabelsManager mgr;
-
-  @Before
-  public void setUp() throws Exception {
-    conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
-    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
-
-    // Set preemption related configurations
-    conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
-        0);
-    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
-        60000L);
-    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
-        true);
-    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
-        1.0f);
-    conf.setFloat(
-        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
-        1.0f);
-    mgr = new NullRMNodeLabelsManager();
-    mgr.init(this.conf);
-  }
-
-  private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
-    RMActiveServices activeServices = rm.getRMActiveService();
-    SchedulingMonitor mon = null;
-    for (Service service : activeServices.getServices()) {
-      if (service instanceof SchedulingMonitor) {
-        mon = (SchedulingMonitor) service;
-        break;
-      }
-    }
-
-    if (mon != null) {
-      return mon.getSchedulingEditPolicy();
-    }
-    return null;
-  }
-
-  @Test (timeout = 60000)
-  public void testSimplePreemption() throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fulfilled.
-     *
-     * 4) app2 asks for another 1G container, system will preempt one container
-     * from app1, and app2 will receive the preempted container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(1, killableContainers.size());
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 6 containers, and app2 has 2 containers
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersNodeLocalityDelay()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for 1G container with locality specified, so it needs
-     * to wait for missed-opportunity before get scheduled.
-     * Check if system waits missed-opportunity before finish killable container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container with unknown host and unknown rack
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "unknownhost",
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, one container will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersHardNodeLocality()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for 1G container with hard locality specified, and
-     *         asked host is not existed
-     * Confirm system doesn't preempt any container.
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container for h3 with hard locality,
-    // h3 doesn't exist in the cluster
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1, true), ResourceRequest
-        .newInstance(Priority.newInstance(1), "h3",
-            Resources.createResource(1 * GB), 1, false), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1, false)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, nothing will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 7 containers, and app2 has 1 containers (no container allocated)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Killable containers resources will be excluded from PCPP (no duplicated
-     *    container added to killable list)
-     * 2) When more resources need to be preempted, new containers will be selected
-     *    and killable containers will be considered
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
-    // Check killable containers and to-be-preempted containers in edit policy
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Run edit schedule again, confirm status doesn't changed
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Save current to kill containers
-    Set<ContainerId> previousKillableContainers = new HashSet<>(
-        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
-            .keySet());
-
-    // Update request resource of c from 1 to 2, so we need to preempt
-    // one more container
-    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
-    // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
-    // and 1 container in killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
-    // Call editPolicy.editSchedule() once more, we should have 2 containers killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Check if previous killable containers included by new killable containers
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-    Assert.assertTrue(
-        Sets.difference(previousKillableContainers, killableContainers.keySet())
-            .isEmpty());
-  }
-
-  /*
-   * Ignore this test now because it could be a premature optimization
-   */
-  @Ignore
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Containers will be marked to killable
-     * 2) Cancel resource request
-     * 3) Killable containers will be cancelled from policy and scheduler
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if 3 container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
-    // Change reqeust from 3G to 2G, now we can preempt one less container. (3->2)
-    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
-    // Call editSchedule once more to make sure still nothing happens
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersUserLimit()
-      throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *             Root
-     *            /  |  \
-     *           a   b   c
-     *          10   20  70
-     * </pre>
-     *
-     * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fulfilled.
-     *
-     * 4) app2 asks for another 1G container, system will preempt one container
-     * from app1, and app2 will receive the preempted container
-     */
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
-    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
-    MockRM rm1 = new MockRM(csConf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if no container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    // No preemption happens
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
-    Assert.assertEquals(0, killableContainers.size());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (nothing preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  private Map<ContainerId, RMContainer> waitKillableContainersSize(
-      PreemptionManager pm, String queueName, String partition,
-      int expectedSize) throws InterruptedException {
-    Map<ContainerId, RMContainer> killableContainers =
-        pm.getKillableContainersMap(queueName, partition);
-
-    int wait = 0;
-    // Wait for at most 5 sec (it should be super fast actually)
-    while (expectedSize != killableContainers.size() && wait < 500) {
-      killableContainers = pm.getKillableContainersMap(queueName, partition);
-      Thread.sleep(10);
-      wait++;
-    }
-
-    Assert.assertEquals(expectedSize, killableContainers.size());
-    return killableContainers;
-  }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message