hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rake...@apache.org
Subject [32/50] [abbrv] hadoop git commit: YARN-5788. Apps not activated and AM limit resource in UI and REST not updated after -replaceLabelsOnNode (Bibin A Chundatt via Varun Saxena)
Date Wed, 02 Nov 2016 17:39:11 GMT
YARN-5788. Apps not activated and AM limit resource in UI and REST not updated after -replaceLabelsOnNode
(Bibin A Chundatt via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7d2d8d25
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7d2d8d25
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7d2d8d25

Branch: refs/heads/HDFS-10285
Commit: 7d2d8d25ba0cb10a3c6192d4123f27ede5ef2ba6
Parents: 310aa46
Author: Varun Saxena <varunsaxena@apache.org>
Authored: Tue Nov 1 15:32:04 2016 +0530
Committer: Varun Saxena <varunsaxena@apache.org>
Committed: Tue Nov 1 15:32:04 2016 +0530

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   | 105 ++++++++++---------
 .../TestCapacitySchedulerNodeLabelUpdate.java   |  74 ++++++++++++-
 2 files changed, 128 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d2d8d25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cfdcb10..d759d47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1126,57 +1126,52 @@ public class CapacityScheduler extends
       writeLock.unlock();
     }
   }
-  
+
   /**
    * Process node labels update on a node.
    */
   private void updateLabelsOnNode(NodeId nodeId,
       Set<String> newLabels) {
-    try {
-      writeLock.lock();
-      FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
-      if (null == node) {
-        return;
-      }
-
-      // Get new partition, we have only one partition per node
-      String newPartition;
-      if (newLabels.isEmpty()) {
-        newPartition = RMNodeLabelsManager.NO_LABEL;
-      } else{
-        newPartition = newLabels.iterator().next();
-      }
+    FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
+    if (null == node) {
+      return;
+    }
 
-      // old partition as well
-      String oldPartition = node.getPartition();
+    // Get new partition, we have only one partition per node
+    String newPartition;
+    if (newLabels.isEmpty()) {
+      newPartition = RMNodeLabelsManager.NO_LABEL;
+    } else{
+      newPartition = newLabels.iterator().next();
+    }
 
-      // Update resources of these containers
-      for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
-        FiCaSchedulerApp application = getApplicationAttempt(
-            rmContainer.getApplicationAttemptId());
-        if (null != application) {
-          application.nodePartitionUpdated(rmContainer, oldPartition,
-              newPartition);
-        } else{
-          LOG.warn("There's something wrong, some RMContainers running on"
-              + " a node, but we cannot find SchedulerApplicationAttempt "
-              + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
-              + rmContainer.getApplicationAttemptId());
-          continue;
-        }
-      }
+    // old partition as well
+    String oldPartition = node.getPartition();
 
-      // Unreserve container on this node
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (null != reservedContainer) {
-        killReservedContainer(reservedContainer);
+    // Update resources of these containers
+    for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
+      FiCaSchedulerApp application = getApplicationAttempt(
+          rmContainer.getApplicationAttemptId());
+      if (null != application) {
+        application.nodePartitionUpdated(rmContainer, oldPartition,
+            newPartition);
+      } else{
+        LOG.warn("There's something wrong, some RMContainers running on"
+            + " a node, but we cannot find SchedulerApplicationAttempt "
+            + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
+            + rmContainer.getApplicationAttemptId());
+        continue;
       }
+    }
 
-      // Update node labels after we've done this
-      node.updateLabels(newLabels);
-    } finally {
-      writeLock.unlock();
+    // Unreserve container on this node
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (null != reservedContainer) {
+      killReservedContainer(reservedContainer);
     }
+
+    // Update node labels after we've done this
+    node.updateLabels(newLabels);
   }
 
   private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
@@ -1371,13 +1366,8 @@ public class CapacityScheduler extends
     {
       NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
           (NodeLabelsUpdateSchedulerEvent) event;
-      
-      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
-          .getUpdatedNodeToLabels().entrySet()) {
-        NodeId id = entry.getKey();
-        Set<String> labels = entry.getValue();
-        updateLabelsOnNode(id, labels);
-      }
+
+      updateNodeLabelsAndQueueResource(labelUpdateEvent);
     }
     break;
     case NODE_UPDATE:
@@ -1482,6 +1472,27 @@ public class CapacityScheduler extends
     }
   }
 
+  /**
+   * Process node labels update.
+   */
+  private void updateNodeLabelsAndQueueResource(
+      NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
+    try {
+      writeLock.lock();
+      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
+          .getUpdatedNodeToLabels().entrySet()) {
+        NodeId id = entry.getKey();
+        Set<String> labels = entry.getValue();
+        updateLabelsOnNode(id, labels);
+      }
+      Resource clusterResource = getClusterResource();
+      root.updateClusterResource(clusterResource,
+          new ResourceLimits(clusterResource));
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   private void addNode(RMNode nodeManager) {
     try {
       writeLock.lock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7d2d8d25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 0ae77f2..439e9df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -413,7 +413,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
-  @Test(timeout = 3000000)
+  @Test(timeout = 300000)
   public void testMoveApplicationWithLabel() throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(
@@ -589,7 +589,49 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
-  @Test (timeout = 60000)
+  @Test
+  public void testAMResourceLimitNodeUpdatePartition() throws Exception {
+    conf.setInt("yarn.scheduler.minimum-allocation-mb", 64);
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    rm.registerNode("h1:1234", 6400);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+        ImmutableSet.of("x", "y", "z"));
+
+    // .1 percentage of 6400 will be for am
+    checkAMResourceLimit(rm, "a", 640, "");
+    checkAMResourceLimit(rm, "a", 0, "x");
+    checkAMResourceLimit(rm, "a", 0, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+
+    mgr.replaceLabelsOnNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    rm.drainEvents();
+
+    checkAMResourceLimit(rm, "a", 640, "x");
+    checkAMResourceLimit(rm, "a", 0, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+    checkAMResourceLimit(rm, "a", 0, "");
+
+    // Switch
+    mgr.replaceLabelsOnNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+    rm.drainEvents();
+
+    checkAMResourceLimit(rm, "a", 0, "x");
+    checkAMResourceLimit(rm, "a", 640, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+    checkAMResourceLimit(rm, "a", 0, "");
+  }
+
+  @Test(timeout = 60000)
   public void testAMResourceUsageWhenNodeUpdatesPartition()
       throws Exception {
     // set node -> label
@@ -638,8 +680,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
 
     // change h1's label to z
-    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
-        toSet("z"))));
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(
+        ImmutableMap.of(nm1.getNodeId(), toSet("z"))));
 
     // Now the resources also should change from x to z. Verify AM and normal
     // used resource are successfully changed.
@@ -677,4 +719,28 @@ public class TestCapacitySchedulerNodeLabelUpdate {
 
     rm.close();
   }
+
+  private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
+      String label) throws InterruptedException {
+    Assert.assertEquals(memory,
+        waitForResourceUpdate(rm, queuename, memory, label, 3000L));
+  }
+
+  private long waitForResourceUpdate(MockRM rm, String queuename, long memory,
+      String label, long timeout) throws InterruptedException {
+    long start = System.currentTimeMillis();
+    long memorySize = 0;
+    while (System.currentTimeMillis() - start < timeout) {
+      CapacityScheduler scheduler =
+          (CapacityScheduler) rm.getResourceScheduler();
+      CSQueue queue = scheduler.getQueue(queuename);
+      memorySize =
+          queue.getQueueResourceUsage().getAMLimit(label).getMemorySize();
+      if (memory == memorySize) {
+        return memorySize;
+      }
+      Thread.sleep(100);
+    }
+    return memorySize;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message