hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvasu...@apache.org
Subject [1/2] hadoop git commit: YARN-4082. Container shouldn't be killed when node's label updated. Contributed by Wangda Tan.
Date Tue, 01 Sep 2015 08:51:32 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 2345627ad -> 855e0f8b0
  refs/heads/trunk f4d96be6c -> bf669b6d9


YARN-4082. Container shouldn't be killed when node's label updated. Contributed by Wangda
Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bf669b6d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bf669b6d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bf669b6d

Branch: refs/heads/trunk
Commit: bf669b6d9f8ba165e30b8823218d625a49958925
Parents: f4d96be
Author: Varun Vasudev <vvasudev@apache.org>
Authored: Tue Sep 1 14:19:11 2015 +0530
Committer: Varun Vasudev <vvasudev@apache.org>
Committed: Tue Sep 1 14:19:11 2015 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/AbstractCSQueue.java     |  27 ++
 .../scheduler/capacity/CSQueue.java             |  26 ++
 .../scheduler/capacity/CapacityScheduler.java   |  40 +--
 .../scheduler/capacity/LeafQueue.java           |  16 ++
 .../scheduler/common/fica/FiCaSchedulerApp.java |   9 +
 .../TestCapacitySchedulerNodeLabelUpdate.java   | 249 ++++++++++++++++---
 7 files changed, 314 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 80cf793..999654d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -804,6 +804,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id 
     has not been reset synchronously. (Jun Gong via rohithsharmaks)
 
+    YARN-4082. Container shouldn't be killed when node's label updated.
+    (Wangda Tan via vvasudev)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 792c25c..0ae4d1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -543,6 +544,32 @@ public abstract class AbstractCSQueue implements CSQueue {
     }
   }
   
+  @Override
+  public void incUsedResource(String nodeLabel, Resource resourceToInc,
+      SchedulerApplicationAttempt application) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, so no additional lock is needed here.
+    queueUsage.incUsed(nodeLabel, resourceToInc);
+    if (null != parent) {
+      parent.incUsedResource(nodeLabel, resourceToInc, null);
+    }
+  }
+
+  @Override
+  public void decUsedResource(String nodeLabel, Resource resourceToDec,
+      SchedulerApplicationAttempt application) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, so no additional lock is needed here.
+    queueUsage.decUsed(nodeLabel, resourceToDec);
+    if (null != parent) {
+      parent.decUsedResource(nodeLabel, resourceToDec, null);
+    }
+  }
+
   /**
    * Return if the queue has pending resource on given nodePartition and
    * schedulingMode. 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index b06a646..9855dd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 
@@ -287,4 +288,29 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue
{
    * @return resourceUsage
    */
   public ResourceUsage getQueueResourceUsage();
+
+  /**
+   * When a node's partition is updated, increase the queue's used resource on
+   * the given partition if the queue has container(s) running on that node.
+   */
+  public void incUsedResource(String nodePartition, Resource resourceToInc,
+      SchedulerApplicationAttempt application);
+
+  /**
+   * When a node's partition is updated, decrease the queue's used resource on
+   * the given partition if the queue has container(s) running on that node.
+   */
+  public void decUsedResource(String nodePartition, Resource resourceToDec,
+      SchedulerApplicationAttempt application);
+
+  /**
+   * When an outstanding resource is fulfilled or canceled, calling this will
+   * decrease pending resource in a queue.
+   *
+   * @param nodeLabel
+   *          asked by application
+   * @param resourceToDec
+   *          amount of pending resource to remove
+   */
+  public void decPendingResource(String nodeLabel, Resource resourceToDec);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cff1fe5..b5ccbd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1040,12 +1040,6 @@ public class CapacityScheduler extends
   
   /**
    * Process node labels update on a node.
-   * 
-   * TODO: Currently capacity scheduler will kill containers on a node when
-   * labels on the node changed. It is a simply solution to ensure guaranteed
-   * capacity on labels of queues. When YARN-2498 completed, we can let
-   * preemption policy to decide if such containers need to be killed or just
-   * keep them running.
    */
   private synchronized void updateLabelsOnNode(NodeId nodeId,
       Set<String> newLabels) {
@@ -1060,17 +1054,31 @@ public class CapacityScheduler extends
       return;
     }
     
-    // Kill running containers since label is changed
+    // Get new partition, we have only one partition per node
+    String newPartition;
+    if (newLabels.isEmpty()) {
+      newPartition = RMNodeLabelsManager.NO_LABEL;
+    } else {
+      newPartition = newLabels.iterator().next();
+    }
+
+    // old partition as well
+    String oldPartition = node.getPartition();
+
+    // Update resources of these containers
     for (RMContainer rmContainer : node.getRunningContainers()) {
-      ContainerId containerId = rmContainer.getContainerId();
-      completedContainer(rmContainer, 
-          ContainerStatus.newInstance(containerId,
-              ContainerState.COMPLETE, 
-              String.format(
-                  "Container=%s killed since labels on the node=%s changed",
-                  containerId.toString(), nodeId.toString()),
-              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
-          RMContainerEventType.KILL);
+      FiCaSchedulerApp application =
+          getApplicationAttempt(rmContainer.getApplicationAttemptId());
+      if (null != application) {
+        application.nodePartitionUpdated(rmContainer, oldPartition,
+            newPartition);
+      } else {
+        LOG.warn("There's something wrong, some RMContainers running on"
+            + " a node, but we cannot find SchedulerApplicationAttempt for it. Node="
+            + node.getNodeID() + " applicationAttemptId="
+            + rmContainer.getApplicationAttemptId());
+        continue;
+      }
     }
     
     // Unreserve container on this node

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index ff1baff..658eae1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1262,6 +1262,22 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
+  @Override
+  public void incUsedResource(String nodeLabel, Resource resourceToInc,
+      SchedulerApplicationAttempt application) {
+    getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel,
+        resourceToInc);
+    super.incUsedResource(nodeLabel, resourceToInc, application);
+  }
+
+  @Override
+  public void decUsedResource(String nodeLabel, Resource resourceToDec,
+      SchedulerApplicationAttempt application) {
+    getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel,
+        resourceToDec);
+    super.decUsedResource(nodeLabel, resourceToDec, application);
+  }
+
   @VisibleForTesting
   public static class User {
     ResourceUsage userResourceUsage = new ResourceUsage();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 74d77f5..300cba9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -443,4 +443,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           schedulingMode, currentResourceLimits, reservedContainer);
     }
   }
+
+  public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
+      String newPartition) {
+    Resource containerResource = rmContainer.getAllocatedResource();
+    this.attemptResourceUsage.decUsed(oldPartition, containerResource);
+    this.attemptResourceUsage.incUsed(newPartition, containerResource);
+    getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
+    getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 0a701d8..94af4e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -19,22 +19,29 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.junit.Assert;
 import org.junit.Before;
@@ -97,8 +104,18 @@ public class TestCapacitySchedulerNodeLabelUpdate {
         .getMemory());
   }
 
+  private void checkUserUsedResource(MockRM rm, String queueName,
+      String userName, String partition, int memory) {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
+    LeafQueue.User user = queue.getUser(userName);
+    Assert.assertEquals(memory,
+        user.getResourceUsage().getUsed(partition).getMemory());
+  }
+
   @Test(timeout = 60000)
-  public void testResourceUsage() throws Exception {
+  public void testRequestContainerAfterNodePartitionUpdated()
+      throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
         "z"));
@@ -160,7 +177,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
   }
 
   @Test (timeout = 60000)
-  public void testNodeUpdate() throws Exception {
+  public void testResourceUsageWhenNodeUpdatesPartition()
+      throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
     
@@ -183,8 +201,9 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     MockNM nm1 = rm.registerNode("h1:1234", 8000);
     MockNM nm2 = rm.registerNode("h2:1234", 8000);
     MockNM nm3 = rm.registerNode("h3:1234", 8000);
-    
-    ContainerId containerId;
+
+    ContainerId containerId1;
+    ContainerId containerId2;
 
     // launch an app to queue a1 (label = x), and check all container will
     // be allocated in h1
@@ -193,9 +212,9 @@ public class TestCapacitySchedulerNodeLabelUpdate {
 
     // request a container.
     am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    Assert.assertTrue(rm.waitForState(nm1, containerId,
+    containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId2,
         RMContainerState.ALLOCATED, 10 * 1000));
     
     // check used resource:
@@ -203,55 +222,205 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     checkUsedResource(rm, "a", 1024, "x");
     checkUsedResource(rm, "a", 1024);
     
-    // change h1's label to z, container should be killed
-    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
-        toSet("z")));
-    Assert.assertTrue(rm.waitForState(nm1, containerId,
-        RMContainerState.KILLED, 10 * 1000));
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
     
-    // check used resource:
-    // queue-a used x=0G, ""=1G ("" not changed)
+    // change h1's label to z
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+        toSet("z"))));
     checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 1024, "z");
     checkUsedResource(rm, "a", 1024);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 1024, "z");
+    checkUsedResource(rm, "root", 1024);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 1024);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(1024,
+        app.getAppAttemptResourceUsage().getUsed("z").getMemory());
     
-    // request a container with label = y
-    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
-    Assert.assertTrue(rm.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    
-    // check used resource:
-    // queue-a used y=1G, ""=1G
+    // change h1's label to y
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+        toSet("y"))));
+    checkUsedResource(rm, "a", 0, "x");
     checkUsedResource(rm, "a", 1024, "y");
+    checkUsedResource(rm, "a", 0, "z");
     checkUsedResource(rm, "a", 1024);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 1024, "y");
+    checkUsedResource(rm, "root", 0, "z");
+    checkUsedResource(rm, "root", 1024);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "y", 1024);
+    checkUserUsedResource(rm, "a", "user", "z", 0);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(1024,
+        app.getAppAttemptResourceUsage().getUsed("y").getMemory());
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("z").getMemory());
     
-    // change h2's label to no label, container should be killed
-    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
-        CommonNodeLabelsManager.EMPTY_STRING_SET));
-    Assert.assertTrue(rm.waitForState(nm1, containerId,
-        RMContainerState.KILLED, 10 * 1000));
+    // change h1's label to no label
+    Set<String> emptyLabels = new HashSet<>();
+    Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
+        emptyLabels);
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "y");
+    checkUsedResource(rm, "a", 0, "z");
+    checkUsedResource(rm, "a", 2048);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 0, "y");
+    checkUsedResource(rm, "root", 0, "z");
+    checkUsedResource(rm, "root", 2048);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "y", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 0);
+    checkUserUsedResource(rm, "a", "user", "", 2048);
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("y").getMemory());
+    Assert.assertEquals(0,
+        app.getAppAttemptResourceUsage().getUsed("z").getMemory());
+    Assert.assertEquals(2048,
+        app.getAppAttemptResourceUsage().getUsed("").getMemory());
+
+    // Finish the two containers; used resource should then become 0
+    cs.completedContainer(cs.getRMContainer(containerId2),
+        ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
+            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+        RMContainerEventType.KILL);
+    cs.completedContainer(cs.getRMContainer(containerId1),
+        ContainerStatus.newInstance(containerId1, ContainerState.COMPLETE, "",
+            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+        RMContainerEventType.KILL);
     
-    // check used resource:
-    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
     checkUsedResource(rm, "a", 0, "x");
     checkUsedResource(rm, "a", 0, "y");
-    checkUsedResource(rm, "a", 1024);
+    checkUsedResource(rm, "a", 0, "z");
+    checkUsedResource(rm, "a", 0);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 0, "y");
+    checkUsedResource(rm, "root", 0, "z");
+    checkUsedResource(rm, "root", 0);
+    checkUserUsedResource(rm, "a", "user", "x", 0);
+    checkUserUsedResource(rm, "a", "user", "y", 0);
+    checkUserUsedResource(rm, "a", "user", "z", 0);
+    checkUserUsedResource(rm, "a", "user", "", 0);
+
+    rm.close();
+  }
+
+
+  @Test (timeout = 60000)
+  public void testComplexResourceUsageWhenNodeUpdatesPartition()
+      throws Exception {
+    /*
+     * This test is similar to testResourceUsageWhenNodeUpdatesPartition, this
+     * will include multiple applications, multiple users and multiple
+     * containers running on a single node, size of each container is 1G
+     *
+     * Node 1
+     * ------
+     * App1-container2
+     * App2-container2
+     * App2-container3
+     *
+     * Node 2
+     * ------
+     * App1-container1
+     * App1-container3
+     * App2-container1
+     */
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
     
+    // set mapping:
+    // h1 -> x
+    // h2 -> (no label)
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 80000);
+    MockNM nm2 = rm.registerNode("h2:1234", 80000);
+
+    // app1
+    RMApp app1 = rm.submitApp(GB, "app", "u1", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+    // c2 on n1, c3 on n2
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
     containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    Assert.assertTrue(rm.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
     
-    // change h3's label to z, AM container should be killed
-    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
-        toSet("z")));
+    // app2
+    RMApp app2 = rm.submitApp(GB, "app", "u2", null, "a");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+    // c2/c3 on n1
+    am2.allocate("*", GB, 2, new ArrayList<ContainerId>(), "x");
+    containerId =
+        ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
     Assert.assertTrue(rm.waitForState(nm1, containerId,
-        RMContainerState.KILLED, 10 * 1000));
+        RMContainerState.ALLOCATED, 10 * 1000));
     
     // check used resource:
-    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
+    // queue-a used x=3G, ""=3G
+    checkUsedResource(rm, "a", 3 * GB, "x");
+    checkUsedResource(rm, "a", 3 * GB);
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerApp application1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp application2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // change h1's label to z
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+        toSet("z"))));
     checkUsedResource(rm, "a", 0, "x");
-    checkUsedResource(rm, "a", 0, "y");
-    checkUsedResource(rm, "a", 0);
+    checkUsedResource(rm, "a", 3 * GB, "z");
+    checkUsedResource(rm, "a", 3 * GB);
+    checkUsedResource(rm, "root", 0, "x");
+    checkUsedResource(rm, "root", 3 * GB, "z");
+    checkUsedResource(rm, "root", 3 * GB);
+    checkUserUsedResource(rm, "a", "u1", "x", 0 * GB);
+    checkUserUsedResource(rm, "a", "u1", "z", 1 * GB);
+    checkUserUsedResource(rm, "a", "u1", "", 2 * GB);
+    checkUserUsedResource(rm, "a", "u2", "x", 0 * GB);
+    checkUserUsedResource(rm, "a", "u2", "z", 2 * GB);
+    checkUserUsedResource(rm, "a", "u2", "", 1 * GB);
+    Assert.assertEquals(0,
+        application1.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(1 * GB,
+        application1.getAppAttemptResourceUsage().getUsed("z").getMemory());
+    Assert.assertEquals(2 * GB,
+        application1.getAppAttemptResourceUsage().getUsed("").getMemory());
+    Assert.assertEquals(0,
+        application2.getAppAttemptResourceUsage().getUsed("x").getMemory());
+    Assert.assertEquals(2 * GB,
+        application2.getAppAttemptResourceUsage().getUsed("z").getMemory());
+    Assert.assertEquals(1 * GB,
+        application2.getAppAttemptResourceUsage().getUsed("").getMemory());
 
     rm.close();
   }


Mime
View raw message