hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject hadoop git commit: YARN-4161. Capacity Scheduler : Assign single or multiple containers per heart beat driven by configuration. (Wei Yan via wangda)
Date Mon, 07 Aug 2017 21:09:02 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 050c50e3c -> e18415a35


YARN-4161. Capacity Scheduler : Assign single or multiple containers per heart beat driven
by configuration. (Wei Yan via wangda)

Change-Id: Ic441ae4e0bf72e7232411eb54243ec143d5fd0d3
(cherry picked from commit adb84f34db7e1cdcd72aa8e3deb464c48da9e353)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e18415a3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e18415a3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e18415a3

Branch: refs/heads/branch-2
Commit: e18415a350eedfd4efd34644f0585ac881a2da3d
Parents: 050c50e
Author: Wangda Tan <wangda@apache.org>
Authored: Mon Aug 7 11:32:12 2017 -0700
Committer: Wangda Tan <wangda@apache.org>
Committed: Mon Aug 7 11:34:45 2017 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   |  51 +++-
 .../CapacitySchedulerConfiguration.java         |  23 ++
 .../capacity/TestCapacityScheduler.java         | 232 ++++++++++++++++++-
 3 files changed, 289 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18415a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 3179d07..4e99db1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -162,6 +162,9 @@ public class CapacityScheduler extends
 
   private int offswitchPerHeartbeatLimit;
 
+  private boolean assignMultipleEnabled;
+
+  private int maxAssignPerHeartbeat;
 
   @Override
   public void setConf(Configuration conf) {
@@ -307,6 +310,9 @@ public class CapacityScheduler extends
       asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
           DEFAULT_ASYNC_SCHEDULER_INTERVAL);
 
+      this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
+      this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
+
       // number of threads for async scheduling
       int maxAsyncSchedulingThreads = this.conf.getInt(
           CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
@@ -1108,17 +1114,29 @@ public class CapacityScheduler extends
       .getAssignmentInformation().getReserved());
   }
 
-  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount) {
-    if (null != assignment && Resources.greaterThan(getResourceCalculator(),
-        getClusterResource(), assignment.getResource(), Resources.none())
-        && offswitchCount < offswitchPerHeartbeatLimit) {
-      // And it should not be a reserved container
-      if (assignment.getAssignmentInformation().getNumReservations() == 0) {
-        return true;
-      }
+  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
+      int assignedContainers) {
+    // Current assignment shouldn't be empty
+    if (assignment == null
+            || Resources.equals(assignment.getResource(), Resources.none())) {
+      return false;
     }
 
-    return false;
+    // offswitch assignment should be under threshold
+    if (offswitchCount >= offswitchPerHeartbeatLimit) {
+      return false;
+    }
+
+    // And it should not be a reserved container
+    if (assignment.getAssignmentInformation().getNumReservations() > 0) {
+      return false;
+    }
+
+    // assignMultipleEnabled should be ON,
+    // and assignedContainers should be under threshold
+    return assignMultipleEnabled
+        && (maxAssignPerHeartbeat == -1
+            || assignedContainers < maxAssignPerHeartbeat);
   }
 
   /**
@@ -1130,6 +1148,7 @@ public class CapacityScheduler extends
     FiCaSchedulerNode node = getNode(nodeId);
     if (null != node) {
       int offswitchCount = 0;
+      int assignedContainers = 0;
 
       PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
       CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
@@ -1140,7 +1159,13 @@ public class CapacityScheduler extends
           offswitchCount++;
         }
 
-        while (canAllocateMore(assignment, offswitchCount)) {
+        if (Resources.greaterThan(calculator, getClusterResource(),
+            assignment.getResource(), Resources.none())) {
+          assignedContainers++;
+        }
+
+        while (canAllocateMore(assignment, offswitchCount,
+            assignedContainers)) {
           // Try to see if it is possible to allocate multiple container for
           // the same node heartbeat
           assignment = allocateContainersToNode(ps, true);
@@ -1149,6 +1174,12 @@ public class CapacityScheduler extends
               && assignment.getType() == NodeType.OFF_SWITCH) {
             offswitchCount++;
           }
+
+          if (null != assignment
+              && Resources.greaterThan(calculator, getClusterResource(),
+                  assignment.getResource(), Resources.none())) {
+            assignedContainers++;
+          }
         }
 
         if (offswitchCount >= offswitchPerHeartbeatLimit) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18415a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 1e29d50..13b9ff6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -301,6 +301,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
 
+  @Private
+  public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
+      + "per-node-heartbeat.multiple-assignments-enabled";
+
+  @Private
+  public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true;
+
+  /** Maximum number of containers to assign on each check-in. */
+  @Private
+  public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX
+      + "per-node-heartbeat.maximum-container-assignments";
+
+  @Private
+  public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
+
   AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
 
   public CapacitySchedulerConfiguration() {
@@ -1473,4 +1488,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     }
     return userWeights;
   }
+
+  public boolean getAssignMultipleEnabled() {
+    return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED);
+  }
+
+  public int getMaxAssignPerHeartbeat() {
+    return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18415a3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 24a6575..66ab6ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -231,6 +231,17 @@ public class TestCapacityScheduler {
     }
   }
 
+  private NodeManager registerNode(ResourceManager rm, String hostName,
+      int containerManagerPort, int httpPort, String rackName,
+          Resource capability) throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName,
+        containerManagerPort, httpPort, rackName, capability, rm);
+    NodeAddedSchedulerEvent nodeAddEvent1 =
+        new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    rm.getResourceScheduler().handle(nodeAddEvent1);
+    return nm;
+  }
 
   @Test (timeout = 30000)
   public void testConfValidation() throws Exception {
@@ -265,12 +276,12 @@ public class TestCapacityScheduler {
     }
   }
 
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
+  private NodeManager
       registerNode(String hostName, int containerManagerPort, int httpPort,
           String rackName, Resource capability)
           throws IOException, YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
+    NodeManager nm =
+        new NodeManager(
             hostName, containerManagerPort, httpPort, rackName, capability,
             resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
@@ -398,8 +409,216 @@ public class TestCapacityScheduler {
     LOG.info("--- END: testCapacityScheduler ---");
   }
 
-  private void nodeUpdate(
-      org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
+  @Test
+  public void testNotAssignMultiple() throws Exception {
+    LOG.info("--- START: testNotAssignMultiple ---");
+    ResourceManager rm = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(
+        CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false);
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rm.init(conf);
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+    RMContext mC = mock(RMContext.class);
+    when(mC.getConfigurationProvider()).thenReturn(
+        new LocalConfigurationProvider());
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(10 * GB, 10));
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit an application
+    Application application0 = new Application("user_0", "a1", rm);
+    application0.submit();
+    application0.addNodeManager(host0, 1234, nm0);
+
+    Resource capability00 = Resources.createResource(1 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability01);
+
+    Task task00 =
+        new Task(application0, priority0, new String[] {host0});
+    Task task01 =
+        new Task(application0, priority1, new String[] {host0});
+    application0.addTask(task00);
+    application0.addTask(task01);
+
+    // Submit another application
+    Application application1 = new Application("user_1", "b2", rm);
+    application1.submit();
+    application1.addNodeManager(host0, 1234, nm0);
+
+    Resource capability10 = Resources.createResource(3 * GB, 1);
+    application1.addResourceRequestSpec(priority0, capability10);
+
+    Resource capability11 = Resources.createResource(4 * GB, 1);
+    application1.addResourceRequestSpec(priority1, capability11);
+
+    Task task10 = new Task(application1, priority0, new String[] {host0});
+    Task task11 = new Task(application1, priority1, new String[] {host0});
+    application1.addTask(task10);
+    application1.addTask(task11);
+
+    // Send resource requests to the scheduler
+    application0.schedule();
+
+    application1.schedule();
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    LOG.info("Kick!");
+
+    // Only one container per heartbeat: task10 is assigned, used=3G
+    nodeUpdate(rm, nm0);
+
+    // Get allocations from the scheduler
+    application0.schedule();
+    application1.schedule();
+    // 1 Task per heart beat should be scheduled
+    checkNodeResourceUsage(3 * GB, nm0); // task10 (3G)
+    checkApplicationResourceUsage(0 * GB, application0);
+    checkApplicationResourceUsage(3 * GB, application1);
+
+    // Another heartbeat
+    nodeUpdate(rm, nm0);
+    application0.schedule();
+    checkApplicationResourceUsage(1 * GB, application0);
+    application1.schedule();
+    checkApplicationResourceUsage(3 * GB, application1);
+    checkNodeResourceUsage(4 * GB, nm0);
+    LOG.info("--- START: testNotAssignMultiple ---");
+  }
+
+  @Test
+  public void testAssignMultiple() throws Exception {
+    LOG.info("--- START: testAssignMultiple ---");
+    ResourceManager rm = new ResourceManager() {
+      @Override
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+        mgr.init(getConfig());
+        return mgr;
+      }
+    };
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    csConf.setBoolean(
+        CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
+    // Each heartbeat will assign 2 containers at most
+    csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);
+    setupQueueConfiguration(csConf);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    rm.init(conf);
+    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+    ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+    RMContext mC = mock(RMContext.class);
+    when(mC.getConfigurationProvider()).thenReturn(
+            new LocalConfigurationProvider());
+
+    // Register node1
+    String host0 = "host_0";
+    NodeManager nm0 =
+        registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+            Resources.createResource(10 * GB, 10));
+
+    // ResourceRequest priorities
+    Priority priority0 = Priority.newInstance(0);
+    Priority priority1 = Priority.newInstance(1);
+
+    // Submit an application
+    Application application0 = new Application("user_0", "a1", rm);
+    application0.submit();
+    application0.addNodeManager(host0, 1234, nm0);
+
+    Resource capability00 = Resources.createResource(1 * GB, 1);
+    application0.addResourceRequestSpec(priority0, capability00);
+
+    Resource capability01 = Resources.createResource(2 * GB, 1);
+    application0.addResourceRequestSpec(priority1, capability01);
+
+    Task task00 = new Task(application0, priority0, new String[] {host0});
+    Task task01 = new Task(application0, priority1, new String[] {host0});
+    application0.addTask(task00);
+    application0.addTask(task01);
+
+    // Submit another application
+    Application application1 = new Application("user_1", "b2", rm);
+    application1.submit();
+    application1.addNodeManager(host0, 1234, nm0);
+
+    Resource capability10 = Resources.createResource(3 * GB, 1);
+    application1.addResourceRequestSpec(priority0, capability10);
+
+    Resource capability11 = Resources.createResource(4 * GB, 1);
+    application1.addResourceRequestSpec(priority1, capability11);
+
+    Task task10 =
+            new Task(application1, priority0, new String[] {host0});
+    Task task11 =
+            new Task(application1, priority1, new String[] {host0});
+    application1.addTask(task10);
+    application1.addTask(task11);
+
+    // Send resource requests to the scheduler
+    application0.schedule();
+
+    application1.schedule();
+
+    // Send a heartbeat to kick the tires on the Scheduler
+    LOG.info("Kick!");
+
+    // task00 (1G) and task10 (3G) are both assigned, used=4G
+    nodeUpdate(rm, nm0);
+
+    // Get allocations from the scheduler
+    application0.schedule();
+    application1.schedule();
+    // At most 2 containers per heartbeat should be scheduled
+    checkNodeResourceUsage(4 * GB, nm0); // task00 (1G) + task10 (3G)
+    checkApplicationResourceUsage(1 * GB, application0);
+    checkApplicationResourceUsage(3 * GB, application1);
+
+    // Another heartbeat
+    nodeUpdate(rm, nm0);
+    application0.schedule();
+    checkApplicationResourceUsage(3 * GB, application0);
+    application1.schedule();
+    checkApplicationResourceUsage(7 * GB, application1);
+    checkNodeResourceUsage(10 * GB, nm0);
+    LOG.info("--- START: testAssignMultiple ---");
+  }
+
+  private void nodeUpdate(ResourceManager rm, NodeManager nm) {
+    RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    rm.getResourceScheduler().handle(nodeUpdate);
+  }
+
+  private void nodeUpdate(NodeManager nm) {
     RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
     // Send a heartbeat to kick the tires on the Scheduler
     NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
@@ -697,8 +916,7 @@ public class TestCapacityScheduler {
     Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
   }
 
-  private void checkNodeResourceUsage(int expected,
-      org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) {
+  private void checkNodeResourceUsage(int expected, NodeManager node) {
     Assert.assertEquals(expected, node.getUsed().getMemorySize());
     node.checkResourceUsage();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message