hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haiboc...@apache.org
Subject [1/2] hadoop git commit: YARN-1015. FS should watch node resource utilization and allocate opportunistic containers if appropriate.
Date Fri, 17 Nov 2017 15:48:26 GMT
Repository: hadoop
Updated Branches:
  refs/heads/YARN-1011 a4cfabf28 -> 561410c78


http://git-wip-us.apache.org/repos/asf/hadoop/blob/561410c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 42d4f81..e70053c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -55,13 +55,19 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -71,6 +77,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -92,6 +100,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 
 
@@ -1054,15 +1063,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals(
         FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
         scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+            getGuaranteedResourceUsage().getMemorySize());
 
     NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     scheduler.handle(updateEvent2);
 
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
-      getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(2, scheduler.getQueueManager().getQueue("queue1").
-      getResourceUsage().getVirtualCores());
+        getGuaranteedResourceUsage().getVirtualCores());
 
     // verify metrics
     QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
@@ -1097,7 +1106,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 1 is allocated app capacity
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // Now queue 2 requests likewise
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1107,7 +1116,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 2 is waiting with a reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
 
     // Now another node checks in with capacity
@@ -1121,7 +1130,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure this goes to queue 2
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // The old reservation should still be there...
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
@@ -1131,7 +1140,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
   }
 
-  @Test (timeout = 5000)
+  @Test
   public void testOffSwitchAppReservationThreshold() throws Exception {
     conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
     scheduler.init(conf);
@@ -1171,7 +1180,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Verify capacity allocation
     assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // Create new app with a resource request that can be satisfied by any
     // node but would be
@@ -1203,7 +1212,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.update();
     scheduler.handle(new NodeUpdateSchedulerEvent(node4));
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     scheduler.handle(new NodeUpdateSchedulerEvent(node1));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1264,7 +1273,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Verify capacity allocation
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // Create new app with a resource request that can be satisfied by any
     // node but would be
@@ -1309,7 +1318,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.update();
     scheduler.handle(new NodeUpdateSchedulerEvent(node4));
     assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     scheduler.handle(new NodeUpdateSchedulerEvent(node1));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1353,7 +1362,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Verify capacity allocation
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // Verify number of reservations have decremented
     assertEquals(0,
@@ -1397,7 +1406,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 1 is allocated app capacity
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // Now queue 2 requests likewise
     createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1406,7 +1415,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 2 is allocated app capacity
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
     scheduler.update();
@@ -1532,7 +1541,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 1 is allocated app capacity
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // Now queue 2 requests likewise
     createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1541,7 +1550,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 2 is allocated app capacity
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-      getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
     scheduler.update();
@@ -1581,12 +1590,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure allocated memory of queue1 doesn't exceed its maximum
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     //the reservation of queue1 should be reclaim
     assertEquals(0, scheduler.getSchedulerApp(attId1).
         getCurrentReservation().getMemorySize());
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
   }
 
   @Test
@@ -1626,7 +1635,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 1 is allocated app capacity
     assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // Now queue 2 requests below threshold
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1635,7 +1644,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 2 has no reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(0,
         scheduler.getSchedulerApp(attId).getReservedContainers().size());
 
@@ -1646,7 +1655,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure queue 2 is waiting with a reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
         .getVirtualCores());
 
@@ -1661,7 +1670,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Make sure this goes to queue 2
     assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getVirtualCores());
+        getGuaranteedResourceUsage().getVirtualCores());
 
     // The old reservation should still be there...
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
@@ -2694,7 +2703,361 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         2, liveContainers.iterator().next().getContainer().
             getPriority().getPriority());
   }
-  
+
+  /**
+   * Test that NO OPPORTUNISTIC containers can be allocated on a node that
+   * is fully allocated and with a very high utilization.
+   */
+  @Test
+  public void testAllocateNoOpportunisticContainersOnBusyNode()
+      throws IOException {
+    conf.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(2048, 2), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the node's full memory
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization shoots up after the container runs on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(2000, 0, 0.8f));
+
+      // create another scheduling request
+      ApplicationAttemptId appAttempt2
+          = createSchedulingRequest(100, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue("Expecting no containers allocated",
+          allocatedContainers2.size() == 0);
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // verify that a reservation is made for the second resource request
+      Resource reserved = scheduler.getNode(node.getNodeID()).
+          getReservedContainer().getReservedResource();
+      assertTrue("Expect a reservation made for the second resource request",
+          reserved.equals(Resource.newInstance(100, 1)));
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test that OPPORTUNISTIC containers can be allocated on a node with low
+   * utilization even though there is not enough unallocated resource on the
+   * node to accommodate the request.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that leaves some unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3600, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given
+      // that it's satisfied by an OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made because we have satisfied" +
+          " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test opportunistic containers can be allocated on a node that is fully
+   * allocated but whose utilization is very low.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersOnFullyAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the whole node
+      ApplicationAttemptId appAttempt1 = createSchedulingRequest(
+          4096, "queue1", "user1", 4);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+      // create another scheduling request now that there is no unallocated
+      // resources left on the node, the request should be served with an
+      // allocation of an opportunistic container
+      ApplicationAttemptId appAttempt2 = createSchedulingRequest(
+          1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given
+      // that it's satisfied by an OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made because we have satisfied" +
+              " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test opportunistic containers can be allocated on a node with a low
+   * utilization even though there are GUARANTEED containers allocated.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersWithGuaranteedOnes()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3200, "queue1", "user1", 3);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3200, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(512, 0, 0.1f));
+
+      // create two other scheduling requests which in aggregate ask for more
+      // that what's left unallocated on the node.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(512, "queue2", "user1", 1);
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(512, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // verify that no reservation is made given that the second request should
+      // be satisfied by a GUARANTEED container allocation, the third by an
+      // OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made.",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's
@@ -3684,7 +4047,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         .createAbnormalContainerStatus(container.getContainerId(),
             SchedulerUtils.COMPLETED_APPLICATION),
         RMContainerEventType.FINISHED);
-    assertEquals(Resources.none(), app1.getResourceUsage());
+    assertEquals(Resources.none(), app1.getGuaranteedResourceUsage());
   }
 
   @Test
@@ -3784,7 +4147,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application1's AM should be finished",
         0, app1.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app1.getResourceUsage());
+        Resources.none(), app1.getGuaranteedResourceUsage());
     assertEquals("Application3's AM should be running",
         1, app3.getLiveContainers().size());
     assertEquals("Application3's AM requests 1024 MB memory",
@@ -3804,7 +4167,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application4's AM should not be running",
         0, app4.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app4.getResourceUsage());
+        Resources.none(), app4.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
 
@@ -3820,7 +4183,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application5's AM should not be running",
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
 
@@ -3833,7 +4196,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application5's AM should not be running",
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
 
@@ -3849,11 +4212,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application2's AM should be finished",
         0, app2.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app2.getResourceUsage());
+        Resources.none(), app2.getGuaranteedResourceUsage());
     assertEquals("Application3's AM should be finished",
         0, app3.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app3.getResourceUsage());
+        Resources.none(), app3.getGuaranteedResourceUsage());
     assertEquals("Application5's AM should be running",
         1, app5.getLiveContainers().size());
     assertEquals("Application5's AM requests 2048 MB memory",
@@ -3874,7 +4237,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application5's AM should have 0 container",
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
     scheduler.update();
@@ -3898,7 +4261,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application6's AM should not be running",
         0, app6.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
-        Resources.none(), app6.getResourceUsage());
+        Resources.none(), app6.getGuaranteedResourceUsage());
     assertEquals("Application6's AM resource shouldn't be updated",
         0, app6.getAMResource().getMemorySize());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
@@ -4614,17 +4977,25 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true);
     FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true);
 
-    Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0);
-    Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0);
-    Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB);
-    Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB);
+    Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(),
+        0);
+    Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(),
+        0);
+    Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
+    Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
 
     scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2");
 
-    Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB);
-    Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB);
-    Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0);
-    Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0);
+    Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
+    Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
+    Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(),
+        0);
+    Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(),
+        0);
   }
     
   @Test (expected = YarnException.class)
@@ -4664,7 +5035,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(updateEvent);
     scheduler.handle(updateEvent);
     
-    assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(2048, 2),
+        oldQueue.getGuaranteedResourceUsage());
     scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
   }
   
@@ -5088,7 +5460,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
 
     assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     //container will be reserved at node1
     RMContainer reservedContainer1 =
@@ -5108,7 +5480,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         app1, RMAppAttemptState.KILLED, false));
 
     assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
     // container will be allocated at node2
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -5256,10 +5628,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     FSAppAttempt app1 = mock(FSAppAttempt.class);
     Mockito.when(app1.getDemand()).thenReturn(maxResource);
-    Mockito.when(app1.getResourceUsage()).thenReturn(Resources.none());
+    Mockito.when(app1.getGuaranteedResourceUsage()).
+        thenReturn(Resources.none());
     FSAppAttempt app2 = mock(FSAppAttempt.class);
     Mockito.when(app2.getDemand()).thenReturn(maxResource);
-    Mockito.when(app2.getResourceUsage()).thenReturn(Resources.none());
+    Mockito.when(app2.getGuaranteedResourceUsage()).
+        thenReturn(Resources.none());
 
     QueueManager queueManager = scheduler.getQueueManager();
     FSParentQueue queue1 = queueManager.getParentQueue("queue1", true);
@@ -5315,7 +5689,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     child1.setMaxShare(new ConfigurableResource(resource));
     FSAppAttempt app = mock(FSAppAttempt.class);
     Mockito.when(app.getDemand()).thenReturn(resource);
-    Mockito.when(app.getResourceUsage()).thenReturn(resource);
+    Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(resource);
     child1.addApp(app, true);
     child1.updateDemand();
 
@@ -5351,7 +5725,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         + " SteadyFairShare: <memory:0, vCores:0>,"
         + " MaxShare: <memory:4096, vCores:4>,"
         + " MinShare: <memory:0, vCores:0>,"
-        + " ResourceUsage: <memory:4096, vCores:4>,"
+        + " Guaranteed ResourceUsage: <memory:4096, vCores:4>,"
         + " Demand: <memory:4096, vCores:4>,"
         + " MaxAMShare: 0.5,"
         + " Runnable: 0}";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/561410c7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
index b016c1b..6777b5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
@@ -243,11 +243,16 @@ public class TestSchedulingPolicy {
       }
 
       @Override
-      public Resource getResourceUsage() {
+      public Resource getGuaranteedResourceUsage() {
         return usage;
       }
 
       @Override
+      public Resource getOpportunisticResourceUsage() {
+        return Resource.newInstance(0, 0);
+      }
+
+      @Override
       public Resource getMinShare() {
         return minShare;
       }
@@ -278,7 +283,8 @@ public class TestSchedulingPolicy {
       }
 
       @Override
-      public Resource assignContainer(FSSchedulerNode node) {
+      public Resource assignContainer(FSSchedulerNode node,
+          boolean opportunistic) {
         throw new UnsupportedOperationException();
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message