hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From epa...@apache.org
Subject hadoop git commit: YARN-4606. CapacityScheduler: applications could get starved because computation of #activeUsers considers pending apps. Contributed by Manikandan R
Date Wed, 25 Jul 2018 17:03:55 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 14fd114cd -> 299dffc72


YARN-4606. CapacityScheduler: applications could get starved because computation of #activeUsers
considers pending apps. Contributed by Manikandan R

(cherry picked from commit 9485c9aee6e9bb935c3e6ae4da81d70b621781de)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/299dffc7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/299dffc7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/299dffc7

Branch: refs/heads/branch-3.0
Commit: 299dffc72d2289ed9cd7deccee60fe2a0f48eccb
Parents: 14fd114
Author: Eric E Payne <ericp@oath.com>
Authored: Wed Jul 25 16:22:04 2018 +0000
Committer: Eric E Payne <ericp@oath.com>
Committed: Wed Jul 25 16:49:03 2018 +0000

----------------------------------------------------------------------
 .../scheduler/capacity/UsersManager.java        |  27 +++-
 .../capacity/TestCapacityScheduler.java         | 128 +++++++++++++++++++
 .../capacity/TestContainerAllocation.java       |  43 +++++++
 3 files changed, 197 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/299dffc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index b2b6e79..02836d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -85,6 +85,7 @@ public class UsersManager implements AbstractUsersManager {
 
   private final QueueMetrics metrics;
   private AtomicInteger activeUsers = new AtomicInteger(0);
+  private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0);
   private Map<String, Set<ApplicationId>> usersApplications =
       new HashMap<String, Set<ApplicationId>>();
 
@@ -671,9 +672,23 @@ public class UsersManager implements AbstractUsersManager {
     // update in local storage
     userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
 
+    computeNumActiveUsersWithOnlyPendingApps();
+
     return userLimitPerSchedulingMode;
   }
 
+  // This method is called within the lock.
+  private void computeNumActiveUsersWithOnlyPendingApps() {
+    int numPendingUsers = 0;
+    for (User user : users.values()) {
+      if ((user.getPendingApplications() > 0)
+          && (user.getActiveApplications() <= 0)) {
+        numPendingUsers++;
+      }
+    }
+    activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers);
+  }
+
   private Resource computeUserLimit(String userName, Resource clusterResource,
       String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -842,6 +857,11 @@ public class UsersManager implements AbstractUsersManager {
     try {
       this.writeLock.lock();
 
+      User userDesc = getUser(user);
+      if (userDesc != null && userDesc.getActiveApplications() <= 0) {
+        return;
+      }
+
       Set<ApplicationId> userApps = usersApplications.get(user);
       if (userApps == null) {
         userApps = new HashSet<ApplicationId>();
@@ -896,7 +916,7 @@ public class UsersManager implements AbstractUsersManager {
 
   @Override
   public int getNumActiveUsers() {
-    return activeUsers.get();
+    return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
   }
 
   float sumActiveUsersTimesWeights() {
@@ -1093,4 +1113,9 @@ public class UsersManager implements AbstractUsersManager {
       this.writeLock.unlock();
     }
   }
+
+  @VisibleForTesting
+  public int getNumActiveUsersWithOnlyPendingApps() {
+    return activeUsersWithOnlyPendingApps.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/299dffc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 409f2b4..dfc9718 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -4894,4 +4894,132 @@ public class TestCapacityScheduler {
       }
     }
   }
+
+  @Test
+  public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+
+    // Define top-level queues
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+
+    newConf.setCapacity(A, 50);
+    newConf.setCapacity(B, 50);
+
+    // Define 2nd-level queues
+    newConf.setQueues(A, new String[] { "a1" });
+    newConf.setCapacity(A1, 100);
+    newConf.setUserLimitFactor(A1, 2.0f);
+    newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
+
+    newConf.setQueues(B, new String[] { "b1" });
+    newConf.setCapacity(B1, 100);
+    newConf.setUserLimitFactor(B1, 2.0f);
+
+    LOG.info("Setup top-level queues a and b");
+
+    MockRM rm = new MockRM(newConf);
+    rm.start();
+
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
+
+    // submit an app
+    RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+    ApplicationAttemptId appAttemptId =
+        rm.getApplicationReport(app.getApplicationId())
+            .getCurrentApplicationAttemptId();
+
+    RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+    RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1");
+
+    RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1");
+
+    // Each application asks 50 * 1GB containers
+    am1.allocate("*", 1 * GB, 50, null);
+    am2.allocate("*", 1 * GB, 50, null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // check preconditions
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(4, appsInA1.size());
+    String queue =
+        scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertEquals("a1", queue);
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(appAttemptId));
+    assertEquals(4, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(4, appsInRoot.size());
+
+    List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
+    assertTrue(appsInB1.isEmpty());
+
+    List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.isEmpty());
+
+    UsersManager um =
+        (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
+
+    assertEquals(4, um.getNumActiveUsers());
+    assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+
+    // now move the app
+    scheduler.moveAllApps("a1", "b1");
+
+    //Triggering this event so that user limit computation can
+    //happen again
+    for (int i = 0; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(500);
+    }
+
+    // check postconditions
+    appsInB1 = scheduler.getAppsInQueue("b1");
+
+    assertEquals(4, appsInB1.size());
+    queue =
+        scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
+            .getQueueName();
+    Assert.assertEquals("b1", queue);
+
+    appsInB = scheduler.getAppsInQueue("b");
+    assertTrue(appsInB.contains(appAttemptId));
+    assertEquals(4, appsInB.size());
+
+    appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(appAttemptId));
+    assertEquals(4, appsInRoot.size());
+
+    List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(0, oldAppsInA1.size());
+
+    UsersManager um_b1 =
+        (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
+
+    assertEquals(2, um_b1.getNumActiveUsers());
+    assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps());
+
+    appsInB1 = scheduler.getAppsInQueue("b1");
+    assertEquals(4, appsInB1.size());
+    rm.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/299dffc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index b1ca72a..48a0f3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -939,4 +939,47 @@ public class TestContainerAllocation {
 
     rm1.close();
   }
+
+  @Test
+  public void testActiveUsersWithOnlyPendingApps() throws Exception {
+
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.setMaximumAMResourcePercentPerPartition(
+        CapacitySchedulerConfiguration.ROOT + ".default", "", 0.2f);
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "u1", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "u2", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "u3", null, "default");
+
+    RMApp app4 = rm1.submitApp(1 * GB, "app", "u4", null, "default");
+
+    // Each application asks 50 * 1GB containers
+    am1.allocate("*", 1 * GB, 50, null);
+    am2.allocate("*", 1 * GB, 50, null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    for (int i = 0; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(1000);
+    }
+    LeafQueue lq = (LeafQueue) cs.getQueue("default");
+    UsersManager um = (UsersManager) lq.getAbstractUsersManager();
+
+    Assert.assertEquals(4, um.getNumActiveUsers());
+    Assert.assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+    Assert.assertEquals(2, lq.getMetrics().getAppsPending());
+    rm1.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message