hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject [2/6] hadoop git commit: YARN-3099. Capacity Scheduler LeafQueue/ParentQueue should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan
Date Mon, 02 Feb 2015 17:52:30 GMT
YARN-3099. Capacity Scheduler LeafQueue/ParentQueue should use ResourceUsage to track used-resources-by-label.
Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8ddc154
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8ddc154
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8ddc154

Branch: refs/heads/HDFS-EC
Commit: d8ddc1542b5304ee8a2602a3f3e78c12f91586ec
Parents: d4b9b40
Author: Jian He <jianhe@apache.org>
Authored: Fri Jan 30 15:15:20 2015 -0800
Committer: Zhe Zhang <zhz@apache.org>
Committed: Mon Feb 2 09:52:19 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/AbstractCSQueue.java     |  48 +++----
 .../scheduler/capacity/LeafQueue.java           | 128 +++++++------------
 .../scheduler/capacity/ParentQueue.java         |  25 ++--
 .../TestWorkPreservingRMRestart.java            |   2 +-
 .../scheduler/capacity/TestCSQueueUtils.java    |  24 ++--
 6 files changed, 93 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddc154/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index bdaf022..9121dcf 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -446,6 +446,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3029. FSDownload.unpack() uses local locale for FS case conversion, may not
     work everywhere. (Varun Saxena via ozawa)
 
+    YARN-3099. Capacity Scheduler LeafQueue/ParentQueue should use ResourceUsage
+    to track used-resources-by-label.(Wangda Tan via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddc154/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 5ac6058..c4651da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -36,9 +36,11 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 
 public abstract class AbstractCSQueue implements CSQueue {
@@ -63,10 +65,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   Set<String> accessibleLabels;
   RMNodeLabelsManager labelManager;
   String defaultLabelExpression;
-  Resource usedResources = Resources.createResource(0, 0);
   Map<String, Float> absoluteCapacityByNodeLabels;
   Map<String, Float> capacitiyByNodeLabels;
-  Map<String, Resource> usedResourcesByNodeLabels = new HashMap<String, Resource>();
   Map<String, Float> absoluteMaxCapacityByNodeLabels;
   Map<String, Float> maxCapacityByNodeLabels;
   
@@ -75,6 +75,9 @@ public abstract class AbstractCSQueue implements CSQueue {
   boolean reservationsContinueLooking;
   private boolean preemptionDisabled;
 
+  // Track resource usage-by-label like used-resource/pending-resource, etc.
+  ResourceUsage queueUsage;
+  
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
   private CapacitySchedulerContext csContext;
@@ -121,8 +124,8 @@ public abstract class AbstractCSQueue implements CSQueue {
     maxCapacityByNodeLabels =
         cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
             accessibleLabels, labelManager);
-
     this.csContext = cs;
+    queueUsage = new ResourceUsage();
   }
   
   @Override
@@ -156,8 +159,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
 
   @Override
-  public synchronized Resource getUsedResources() {
-    return usedResources;
+  public Resource getUsedResources() {
+    return queueUsage.getUsed();
   }
 
   public synchronized int getNumContainers() {
@@ -349,22 +352,13 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   synchronized void allocateResource(Resource clusterResource, 
       Resource resource, Set<String> nodeLabels) {
-    Resources.addTo(usedResources, resource);
     
     // Update usedResources by labels
     if (nodeLabels == null || nodeLabels.isEmpty()) {
-      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
-        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
-            Resources.createResource(0));
-      }
-      Resources.addTo(usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL),
-          resource);
+      queueUsage.incUsed(resource);
     } else {
       for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
-        if (!usedResourcesByNodeLabels.containsKey(label)) {
-          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
-        }
-        Resources.addTo(usedResourcesByNodeLabels.get(label), resource);
+        queueUsage.incUsed(label, resource);
       }
     }
 
@@ -375,23 +369,12 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   protected synchronized void releaseResource(Resource clusterResource,
       Resource resource, Set<String> nodeLabels) {
-    // Update queue metrics
-    Resources.subtractFrom(usedResources, resource);
-
     // Update usedResources by labels
     if (null == nodeLabels || nodeLabels.isEmpty()) {
-      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
-        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
-            Resources.createResource(0));
-      }
-      Resources.subtractFrom(
-          usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), resource);
+      queueUsage.decUsed(resource);
     } else {
       for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
-        if (!usedResourcesByNodeLabels.containsKey(label)) {
-          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
-        }
-        Resources.subtractFrom(usedResourcesByNodeLabels.get(label), resource);
+        queueUsage.decUsed(label, resource);
       }
     }
 
@@ -457,7 +440,12 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   @Private
   public Resource getUsedResourceByLabel(String nodeLabel) {
-    return usedResourcesByNodeLabels.get(nodeLabel);
+    return queueUsage.getUsed(nodeLabel);
+  }
+  
+  @VisibleForTesting
+  public ResourceUsage getResourceUsage() {
+    return queueUsage;
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddc154/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9ae7e60..adb86a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -115,10 +116,6 @@ public class LeafQueue extends AbstractCSQueue {
   
   private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
   
-  // sum of resources used by application masters for applications
-  // running in this queue
-  private final Resource usedAMResources = Resource.newInstance(0, 0);
-  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
@@ -435,7 +432,7 @@ public class LeafQueue extends AbstractCSQueue {
     return queueName + ": " + 
         "capacity=" + capacity + ", " + 
         "absoluteCapacity=" + absoluteCapacity + ", " + 
-        "usedResources=" + usedResources +  ", " +
+        "usedResources=" + queueUsage.getUsed() +  ", " +
         "usedCapacity=" + getUsedCapacity() + ", " + 
         "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
         "numApps=" + getNumApplications() + ", " + 
@@ -464,7 +461,7 @@ public class LeafQueue extends AbstractCSQueue {
     ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
     for (Map.Entry<String, User> entry: users.entrySet()) {
       usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(
-        entry.getValue().consumed), entry.getValue().getActiveApplications(),
+        entry.getValue().getUsed()), entry.getValue().getActiveApplications(),
         entry.getValue().getPendingApplications()));
     }
     return usersToReturn;
@@ -633,7 +630,7 @@ public class LeafQueue extends AbstractCSQueue {
       
       // Check am resource limit
       Resource amIfStarted = 
-        Resources.add(application.getAMResource(), usedAMResources);
+        Resources.add(application.getAMResource(), queueUsage.getAMUsed());
       
       if (LOG.isDebugEnabled()) {
         LOG.debug("application AMResource " + application.getAMResource() +
@@ -678,9 +675,8 @@ public class LeafQueue extends AbstractCSQueue {
       }
       user.activateApplication();
       activeApplications.add(application);
-      Resources.addTo(usedAMResources, application.getAMResource());
-      Resources.addTo(user.getConsumedAMResources(), 
-        application.getAMResource());
+      queueUsage.incAMUsed(application.getAMResource());
+      user.getResourceUsage().incAMUsed(application.getAMResource());
       i.remove();
       LOG.info("Application " + application.getApplicationId() +
           " from user: " + application.getUser() + 
@@ -731,9 +727,8 @@ public class LeafQueue extends AbstractCSQueue {
     if (!wasActive) {
       pendingApplications.remove(application);
     } else {
-      Resources.subtractFrom(usedAMResources, application.getAMResource());
-      Resources.subtractFrom(user.getConsumedAMResources(),
-        application.getAMResource());
+      queueUsage.decAMUsed(application.getAMResource());
+      user.getResourceUsage().decAMUsed(application.getAMResource());
     }
     applicationAttemptMap.remove(application.getApplicationAttemptId());
 
@@ -972,8 +967,8 @@ public class LeafQueue extends AbstractCSQueue {
      */
     Resource headroom = 
       Resources.min(resourceCalculator, clusterResource,
-        Resources.subtract(userLimit, user.getTotalConsumedResources()),
-        Resources.subtract(queueMaxCap, usedResources)
+        Resources.subtract(userLimit, user.getUsed()),
+        Resources.subtract(queueMaxCap, queueUsage.getUsed())
         );
     return headroom;
   }
@@ -993,12 +988,8 @@ public class LeafQueue extends AbstractCSQueue {
     
     boolean canAssign = true;
     for (String label : labelCanAccess) {
-      if (!usedResourcesByNodeLabels.containsKey(label)) {
-        usedResourcesByNodeLabels.put(label, Resources.createResource(0));
-      }
-      
       Resource potentialTotalCapacity =
-          Resources.add(usedResourcesByNodeLabels.get(label), required);
+          Resources.add(queueUsage.getUsed(label), required);
       
       float potentialNewCapacity =
           Resources.divide(resourceCalculator, clusterResource,
@@ -1021,14 +1012,14 @@ public class LeafQueue extends AbstractCSQueue {
             LOG.debug("try to use reserved: "
                 + getQueueName()
                 + " usedResources: "
-                + usedResources
+                + queueUsage.getUsed()
                 + " clusterResources: "
                 + clusterResource
                 + " reservedResources: "
                 + application.getCurrentReservation()
                 + " currentCapacity "
                 + Resources.divide(resourceCalculator, clusterResource,
-                    usedResources, clusterResource) + " required " + required
+                    queueUsage.getUsed(), clusterResource) + " required " + required
                 + " potentialNewWithoutReservedCapacity: "
                 + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
                 + absoluteMaxCapacity + ")");
@@ -1048,11 +1039,11 @@ public class LeafQueue extends AbstractCSQueue {
       if (LOG.isDebugEnabled()) {
         LOG.debug(getQueueName()
             + "Check assign to queue, label=" + label
-            + " usedResources: " + usedResourcesByNodeLabels.get(label)
+            + " usedResources: " + queueUsage.getUsed(label)
             + " clusterResources: " + clusterResource
             + " currentCapacity "
             + Resources.divide(resourceCalculator, clusterResource,
-                usedResourcesByNodeLabels.get(label),
+                queueUsage.getUsed(label),
                 labelManager.getResourceByLabel(label, clusterResource))
             + " potentialNewCapacity: " + potentialNewCapacity + " ( "
             + " max-capacity: " + absoluteMaxCapacity + ")");
@@ -1109,7 +1100,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
           " queueMaxCap=" + queueMaxCap + 
-          " consumed=" + queueUser.getTotalConsumedResources() + 
+          " consumed=" + queueUser.getUsed() + 
           " headroom=" + headroom);
     }
     
@@ -1164,8 +1155,8 @@ public class LeafQueue extends AbstractCSQueue {
 
     Resource currentCapacity =
         Resources.lessThan(resourceCalculator, clusterResource, 
-            usedResources, queueCapacity) ?
-            queueCapacity : Resources.add(usedResources, required);
+            queueUsage.getUsed(), queueCapacity) ?
+            queueCapacity : Resources.add(queueUsage.getUsed(), required);
     
     // Never allow a single user to take more than the 
     // queue's configured capacity * user-limit-factor.
@@ -1200,10 +1191,10 @@ public class LeafQueue extends AbstractCSQueue {
           " userLimit=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
-          " consumed: " + user.getTotalConsumedResources() + 
+          " consumed: " + user.getUsed() + 
           " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
-          " qconsumed: " + usedResources +
+          " qconsumed: " + queueUsage.getUsed() +
           " currentCapacity: " + currentCapacity +
           " activeUsers: " + activeUsers +
           " clusterCapacity: " + clusterResource
@@ -1228,7 +1219,7 @@ public class LeafQueue extends AbstractCSQueue {
     // overhead of the AM, but it's a > check, not a >= check, so...
     if (Resources
         .greaterThan(resourceCalculator, clusterResource,
-            user.getConsumedResourceByLabel(label),
+            user.getUsed(label),
             limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
@@ -1236,13 +1227,13 @@ public class LeafQueue extends AbstractCSQueue {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getTotalConsumedResources(),
+            Resources.subtract(user.getUsed(),
                 application.getCurrentReservation()), limit)) {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("User " + userName + " in queue " + getQueueName()
                 + " will exceed limit based on reservations - " + " consumed: "
-                + user.getTotalConsumedResources() + " reserved: "
+                + user.getUsed() + " reserved: "
                 + application.getCurrentReservation() + " limit: " + limit);
           }
           return true;
@@ -1251,7 +1242,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName()
             + " will exceed limit - " + " consumed: "
-            + user.getTotalConsumedResources() + " limit: " + limit);
+            + user.getUsed() + " limit: " + limit);
       }
       return false;
     }
@@ -1673,7 +1664,7 @@ public class LeafQueue extends AbstractCSQueue {
             " queue=" + this.toString() + 
             " usedCapacity=" + getUsedCapacity() + 
             " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + 
-            " used=" + usedResources +
+            " used=" + queueUsage.getUsed() +
             " cluster=" + clusterResource);
 
         return request.getCapability();
@@ -1774,9 +1765,9 @@ public class LeafQueue extends AbstractCSQueue {
     if (LOG.isDebugEnabled()) {
       LOG.info(getQueueName() + 
           " user=" + userName + 
-          " used=" + usedResources + " numContainers=" + numContainers +
+          " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
           " headroom = " + application.getHeadroom() +
-          " user-resources=" + user.getTotalConsumedResources()
+          " user-resources=" + user.getUsed()
           );
     }
   }
@@ -1792,8 +1783,8 @@ public class LeafQueue extends AbstractCSQueue {
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
       
     LOG.info(getQueueName() + 
-        " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " user-resources=" + user.getTotalConsumedResources());
+        " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + 
+        " user=" + userName + " user-resources=" + user.getUsed());
   }
   
   private void updateAbsoluteCapacityResource(Resource clusterResource) {
@@ -1835,22 +1826,20 @@ public class LeafQueue extends AbstractCSQueue {
 
   @VisibleForTesting
   public static class User {
-    Resource consumed = Resources.createResource(0, 0);
-    Resource consumedAMResources = Resources.createResource(0, 0);
-    Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
+    ResourceUsage userResourceUsage = new ResourceUsage();
     int pendingApplications = 0;
     int activeApplications = 0;
 
-    public Resource getTotalConsumedResources() {
-      return consumed;
+    public ResourceUsage getResourceUsage() {
+      return userResourceUsage;
     }
     
-    public Resource getConsumedResourceByLabel(String label) {
-      Resource r = consumedByLabel.get(label);
-      if (null != r) {
-        return r;
-      }
-      return Resources.none();
+    public Resource getUsed() {
+      return userResourceUsage.getUsed();
+    }
+    
+    public Resource getUsed(String label) {
+      return userResourceUsage.getUsed(label);
     }
 
     public int getPendingApplications() {
@@ -1862,7 +1851,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
     
     public Resource getConsumedAMResources() {
-      return consumedAMResources; 
+      return userResourceUsage.getAMUsed();
     }
 
     public int getTotalApplications() {
@@ -1887,47 +1876,26 @@ public class LeafQueue extends AbstractCSQueue {
       }
     }
 
-    public synchronized void assignContainer(Resource resource,
+    public void assignContainer(Resource resource,
         Set<String> nodeLabels) {
-      Resources.addTo(consumed, resource);
-      
       if (nodeLabels == null || nodeLabels.isEmpty()) {
-        if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
-          consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
-              Resources.createResource(0));
-        }
-        Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL),
-            resource);
+        userResourceUsage.incUsed(resource);
       } else {
         for (String label : nodeLabels) {
-          if (!consumedByLabel.containsKey(label)) {
-            consumedByLabel.put(label, Resources.createResource(0));
-          }
-          Resources.addTo(consumedByLabel.get(label), resource);
+          userResourceUsage.incUsed(label, resource);
         }
       }
     }
 
-    public synchronized void releaseContainer(Resource resource, Set<String> nodeLabels)
{
-      Resources.subtractFrom(consumed, resource);
-      
-      // Update usedResources by labels
+    public void releaseContainer(Resource resource, Set<String> nodeLabels) {
       if (nodeLabels == null || nodeLabels.isEmpty()) {
-        if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
-          consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
-              Resources.createResource(0));
-        }
-        Resources.subtractFrom(
-            consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource);
+        userResourceUsage.decUsed(resource);
       } else {
         for (String label : nodeLabels) {
-          if (!consumedByLabel.containsKey(label)) {
-            consumedByLabel.put(label, Resources.createResource(0));
-          }
-          Resources.subtractFrom(consumedByLabel.get(label), resource);
+          userResourceUsage.decUsed(label, resource);
         }
       }
-    }  
+    }
   }
 
   @Override
@@ -1986,7 +1954,7 @@ public class LeafQueue extends AbstractCSQueue {
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
           + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
-          + usedResources + " cluster=" + clusterResource);
+          + queueUsage.getUsed() + " cluster=" + clusterResource);
       // Inform the parent queue
       getParent().attachContainer(clusterResource, application, rmContainer);
     }
@@ -2004,7 +1972,7 @@ public class LeafQueue extends AbstractCSQueue {
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
           + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
-          + usedResources + " cluster=" + clusterResource);
+          + queueUsage.getUsed() + " cluster=" + clusterResource);
       // Inform the parent queue
       getParent().detachContainer(clusterResource, application, rmContainer);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddc154/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index f820cca..de92c9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -256,7 +256,7 @@ public class ParentQueue extends AbstractCSQueue {
         "numChildQueue= " + childQueues.size() + ", " + 
         "capacity=" + capacity + ", " +  
         "absoluteCapacity=" + absoluteCapacity + ", " +
-        "usedResources=" + usedResources + 
+        "usedResources=" + queueUsage.getUsed() + 
         "usedCapacity=" + getUsedCapacity() + ", " + 
         "numApps=" + getNumApplications() + ", " + 
         "numContainers=" + getNumContainers();
@@ -463,7 +463,7 @@ public class ParentQueue extends AbstractCSQueue {
             " queue=" + getQueueName() + 
             " usedCapacity=" + getUsedCapacity() +
             " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + usedResources + 
+            " used=" + queueUsage.getUsed() + 
             " cluster=" + clusterResource);
 
       } else {
@@ -506,19 +506,16 @@ public class ParentQueue extends AbstractCSQueue {
     
     boolean canAssign = true;
     for (String label : labelCanAccess) {
-      if (!usedResourcesByNodeLabels.containsKey(label)) {
-        usedResourcesByNodeLabels.put(label, Resources.createResource(0));
-      }
       float currentAbsoluteLabelUsedCapacity =
           Resources.divide(resourceCalculator, clusterResource,
-              usedResourcesByNodeLabels.get(label),
+              queueUsage.getUsed(label),
               labelManager.getResourceByLabel(label, clusterResource));
       // if any of the label doesn't beyond limit, we can allocate on this node
       if (currentAbsoluteLabelUsedCapacity >= 
             getAbsoluteMaximumCapacityByNodeLabel(label)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(getQueueName() + " used=" + usedResources
-              + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") "
+          LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
+              + " current-capacity (" + queueUsage.getUsed(label) + ") "
               + " >= max-capacity ("
               + labelManager.getResourceByLabel(label, clusterResource) + ")");
         }
@@ -540,16 +537,16 @@ public class ParentQueue extends AbstractCSQueue {
           .getReservedMB(), getMetrics().getReservedVirtualCores());
       float capacityWithoutReservedCapacity = Resources.divide(
           resourceCalculator, clusterResource,
-          Resources.subtract(usedResources, reservedResources),
+          Resources.subtract(queueUsage.getUsed(), reservedResources),
           clusterResource);
 
       if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("parent: try to use reserved: " + getQueueName()
-            + " usedResources: " + usedResources.getMemory()
+            + " usedResources: " + queueUsage.getUsed().getMemory()
             + " clusterResources: " + clusterResource.getMemory()
             + " reservedResources: " + reservedResources.getMemory()
-            + " currentCapacity " + ((float) usedResources.getMemory())
+            + " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
             / clusterResource.getMemory()
             + " potentialNewWithoutReservedCapacity: "
             + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
@@ -645,7 +642,7 @@ public class ParentQueue extends AbstractCSQueue {
             " queue=" + getQueueName() + 
             " usedCapacity=" + getUsedCapacity() +
             " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + usedResources + 
+            " used=" + queueUsage.getUsed() + 
             " cluster=" + clusterResource);
       }
 
@@ -735,7 +732,7 @@ public class ParentQueue extends AbstractCSQueue {
           .getResource(), node.getLabels());
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
-          + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+          + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
           + clusterResource);
       // Inform the parent
       if (parent != null) {
@@ -755,7 +752,7 @@ public class ParentQueue extends AbstractCSQueue {
           node.getLabels());
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
-          + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
+          + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
           + clusterResource);
       // Inform the parent
       if (parent != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddc154/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index e21fcf9..a9caf77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -321,7 +321,7 @@ public class TestWorkPreservingRMRestart {
       1e-8);
     // assert user consumed resources.
     assertEquals(usedResource, leafQueue.getUser(app.getUser())
-      .getTotalConsumedResources());
+      .getUsed());
   }
 
   private void checkFifoQueue(ResourceManager rm,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8ddc154/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
index a62889b..d643c9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
@@ -202,33 +202,33 @@ public class TestCSQueueUtils {
     LOG.info("t2 l2q2 " + result);
     
     //some usage, but below the base capacity
-    Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
-    Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
+    root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
+    l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
     result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
       resourceCalculator, clusterResource, l2q2);
     assertEquals( 0.4f, result, 0.000001f);
     LOG.info("t2 l2q2 " + result);
     
     //usage gt base on parent sibling
-    Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.3f));
-    Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.3f));
+    root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
+    l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
     result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
       resourceCalculator, clusterResource, l2q2);
     assertEquals( 0.3f, result, 0.000001f);
     LOG.info("t2 l2q2 " + result);
     
     //same as last, but with usage also on direct parent
-    Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
-    Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
+    root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
+    l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
     result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
       resourceCalculator, clusterResource, l2q2);
     assertEquals( 0.3f, result, 0.000001f);
     LOG.info("t2 l2q2 " + result);
     
     //add to direct sibling, below the threshold of effect at present
-    Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
-    Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
-    Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
+    root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+    l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+    l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
     result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
       resourceCalculator, clusterResource, l2q2);
     assertEquals( 0.3f, result, 0.000001f);
@@ -236,9 +236,9 @@ public class TestCSQueueUtils {
     
     //add to direct sibling, now above the threshold of effect
     //(it's cumulative with prior tests)
-    Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
-    Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
-    Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
+    root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+    l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+    l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
     result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
       resourceCalculator, clusterResource, l2q2);
     assertEquals( 0.1f, result, 0.000001f);


Mime
View raw message