hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [06/50] [abbrv] hadoop git commit: YARN-2920. Changed CapacityScheduler to kill containers on nodes where node labels are changed. Contributed by Wangda Tan (cherry picked from commit fdf042dfffa4d2474e3cac86cfb8fe9ee4648beb)
Date Sat, 26 Sep 2015 16:05:13 GMT
YARN-2920. Changed CapacityScheduler to kill containers on nodes where node labels are changed. Contributed by  Wangda Tan
(cherry picked from commit fdf042dfffa4d2474e3cac86cfb8fe9ee4648beb)

(cherry picked from commit 411836b74c6c02c0b5aebbbce29c209d93db1de2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88f022da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88f022da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88f022da

Branch: refs/heads/branch-2.6
Commit: 88f022da245c5d34997494a822124d07f8a5f72f
Parents: 2073fc0
Author: Jian He <jianhe@apache.org>
Authored: Mon Dec 22 16:50:15 2014 -0800
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Sat Sep 5 20:54:18 2015 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   3 +-
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   3 +-
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../server/resourcemanager/ResourceManager.java |   1 +
 .../nodelabels/RMNodeLabelsManager.java         |  23 +++
 .../resourcemanager/rmnode/RMNodeImpl.java      |   6 +-
 .../scheduler/SchedulerNode.java                |  24 ++-
 .../scheduler/capacity/AbstractCSQueue.java     |   5 +
 .../scheduler/capacity/CSQueue.java             |   8 +
 .../scheduler/capacity/CapacityScheduler.java   |  88 +++++++--
 .../scheduler/capacity/LeafQueue.java           |  26 +--
 .../scheduler/capacity/ParentQueue.java         |  24 ++-
 .../common/fica/FiCaSchedulerNode.java          |  11 +-
 .../event/NodeLabelsUpdateSchedulerEvent.java   |  37 ++++
 .../scheduler/event/SchedulerEventType.java     |   1 +
 .../yarn/server/resourcemanager/MockNodes.java  |   3 +-
 .../TestCapacitySchedulerNodeLabelUpdate.java   | 193 +++++++++++++++++++
 17 files changed, 414 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index fdddcf4..ee6eb7b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
         .UpdatedContainerInfo;
@@ -162,7 +163,7 @@ public class NodeInfo {
 
     @Override
     public Set<String> getNodeLabels() {
-      return null;
+      return RMNodeLabelsManager.EMPTY_STRING_SET;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 3b185ae..b64be1b 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
         .UpdatedContainerInfo;
@@ -150,6 +151,6 @@ public class RMNodeWrapper implements RMNode {
 
   @Override
   public Set<String> getNodeLabels() {
-    return null;
+    return RMNodeLabelsManager.EMPTY_STRING_SET;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8244d61..fa1e120 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -171,6 +171,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3733. Fix DominantRC#compare() does not work as expected if
     cluster resource is empty. (Rohith Sharmaks via wangda)
 
+    YARN-2920. Changed CapacityScheduler to kill containers on nodes where
+    node labels are changed. (Wangda Tan via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index ea762c0..353851c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -430,6 +430,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
       rmContext.setAMFinishingMonitor(amFinishingMonitor);
       
       RMNodeLabelsManager nlm = createNodeLabelManager();
+      nlm.setRMContext(rmContext);
       addService(nlm);
       rmContext.setNodeLabelManager(nlm);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
index ba1727c..d9828e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
@@ -35,7 +35,10 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.collect.ImmutableSet;
@@ -57,6 +60,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
       new ConcurrentHashMap<String, Queue>();
   protected AccessControlList adminAcl;
   
+  private RMContext rmContext = null;
+  
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
@@ -331,6 +336,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     return map;
   }
 
+  @SuppressWarnings("unchecked")
   private void updateResourceMappings(Map<String, Host> before,
       Map<String, Host> after) {
     // Get NMs in before only
@@ -341,6 +347,10 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     for (Entry<String, Host> entry : after.entrySet()) {
       allNMs.addAll(entry.getValue().nms.keySet());
     }
+    
+    // Map used to notify RM
+    Map<NodeId, Set<String>> newNodeToLabelsMap =
+        new HashMap<NodeId, Set<String>>();
 
     // traverse all nms
     for (NodeId nodeId : allNMs) {
@@ -379,6 +389,9 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
       Node newNM;
       if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
         Set<String> newLabels = getLabelsByNode(nodeId, after);
+        
+        newNodeToLabelsMap.put(nodeId, ImmutableSet.copyOf(newLabels));
+        
         // no label in the past
         if (newLabels.isEmpty()) {
           // update labels
@@ -405,6 +418,12 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
         }
       }
     }
+    
+    // Notify RM
+    if (rmContext != null && rmContext.getDispatcher() != null) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new NodeLabelsUpdateSchedulerEvent(newNodeToLabelsMap));
+    }
   }
   
   public Resource getResourceByLabel(String label, Resource clusterResource) {
@@ -452,4 +471,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     }
     return false;
   }
+  
+  public void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index b92f399..b57b7cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -860,9 +861,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
   @Override
   public Set<String> getNodeLabels() {
-    if (context.getNodeLabelManager() == null) {
+    RMNodeLabelsManager nlm = context.getNodeLabelManager();
+    if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) {
       return CommonNodeLabelsManager.EMPTY_STRING_SET;
     }
-    return context.getNodeLabelManager().getLabelsOnNode(nodeId);
+    return nlm.getLabelsOnNode(nodeId);
   }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index f4d8731..d1922ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.collect.ImmutableSet;
+
 
 /**
  * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -61,8 +65,11 @@ public abstract class SchedulerNode {
 
   private final RMNode rmNode;
   private final String nodeName;
-
-  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+  
+  private volatile Set<String> labels = null;
+  
+  public SchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> labels) {
     this.rmNode = node;
     this.availableResource = Resources.clone(node.getTotalCapability());
     this.totalResourceCapability = Resources.clone(node.getTotalCapability());
@@ -71,6 +78,11 @@ public abstract class SchedulerNode {
     } else {
       nodeName = rmNode.getHostName();
     }
+    this.labels = ImmutableSet.copyOf(labels);
+  }
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
   }
 
   public RMNode getRMNode() {
@@ -275,4 +287,12 @@ public abstract class SchedulerNode {
     }
     allocateContainer(rmContainer);
   }
+  
+  public Set<String> getLabels() {
+    return labels;
+  }
+  
+  public void updateLabels(Set<String> labels) {
+    this.labels = labels;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index fc0fbb4..1f6696d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -447,4 +447,9 @@ public abstract class AbstractCSQueue implements CSQueue {
   public Map<QueueACL, AccessControlList> getACLs() {
     return acls;
   }
+  
+  @Private
+  public Resource getUsedResourceByLabel(String nodeLabel) {
+    return usedResourcesByNodeLabels.get(nodeLabel);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 6438d6c..07a7e0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -144,6 +144,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
   public Resource getUsedResources();
   
   /**
+   * Get the currently utilized resources which allocated at nodes with label
+   * specified
+   * 
+   * @return used resources by the queue and it's children
+   */
+  public Resource getUsedResourceByLabel(String nodeLabel);
+  
+  /**
    * Get the current run-state of the queue
    * @return current run-state
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 042c83c..c49c498 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -25,15 +25,15 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.HashSet;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,12 +48,15 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -79,12 +82,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -93,6 +99,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -106,11 +113,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import org.apache.hadoop.yarn.api.records.ReservationId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
-
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -966,6 +968,51 @@ public class CapacityScheduler extends
     updateNodeResource(nm, resourceOption);
     root.updateClusterResource(clusterResource);
   }
+  
+  /**
+   * Process node labels update on a node.
+   * 
+   * TODO: Currently capacity scheduler will kill containers on a node when
+   * labels on the node changed. It is a simply solution to ensure guaranteed
+   * capacity on labels of queues. When YARN-2498 completed, we can let
+   * preemption policy to decide if such containers need to be killed or just
+   * keep them running.
+   */
+  private synchronized void updateLabelsOnNode(NodeId nodeId,
+      Set<String> newLabels) {
+    FiCaSchedulerNode node = nodes.get(nodeId);
+    if (null == node) {
+      return;
+    }
+    
+    // labels is same, we don't need do update
+    if (node.getLabels().size() == newLabels.size()
+        && node.getLabels().containsAll(newLabels)) {
+      return;
+    }
+    
+    // Kill running containers since label is changed
+    for (RMContainer rmContainer : node.getRunningContainers()) {
+      ContainerId containerId = rmContainer.getContainerId();
+      completedContainer(rmContainer, 
+          ContainerStatus.newInstance(containerId,
+              ContainerState.COMPLETE, 
+              String.format(
+                  "Container=%s killed since labels on the node=%s changed",
+                  containerId.toString(), nodeId.toString()),
+              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+          RMContainerEventType.KILL);
+    }
+    
+    // Unreserve container on this node
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (null != reservedContainer) {
+      dropContainerReservation(reservedContainer);
+    }
+    
+    // Update node labels after we've done this
+    node.updateLabels(newLabels);
+  }
 
   private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
     if (rmContext.isWorkPreservingRecoveryEnabled()
@@ -1049,6 +1096,19 @@ public class CapacityScheduler extends
         nodeResourceUpdatedEvent.getResourceOption());
     }
     break;
+    case NODE_LABELS_UPDATE:
+    {
+      NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
+          (NodeLabelsUpdateSchedulerEvent) event;
+      
+      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
+          .getUpdatedNodeToLabels().entrySet()) {
+        NodeId id = entry.getKey();
+        Set<String> labels = entry.getValue();
+        updateLabelsOnNode(id, labels);
+      }
+    }
+    break;
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@@ -1117,14 +1177,8 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
-    }
-    
     this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
+        usePortForNodeName, nodeManager.getNodeLabels()));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
@@ -1135,6 +1189,12 @@ public class CapacityScheduler extends
     if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
+    
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 60b5a59..509b0f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -809,7 +809,7 @@ public class LeafQueue extends AbstractCSQueue {
     
     // if our queue cannot access this node, just return
     if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        labelManager.getLabelsOnNode(node.getNodeID()))) {
+        node.getLabels())) {
       return NULL_ASSIGNMENT;
     }
     
@@ -878,7 +878,7 @@ public class LeafQueue extends AbstractCSQueue {
           
           // Check queue max-capacity limit
           if (!canAssignToThisQueue(clusterResource, required,
-              labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
+              node.getLabels(), application, true)) {
             return NULL_ASSIGNMENT;
           }
 
@@ -911,7 +911,7 @@ public class LeafQueue extends AbstractCSQueue {
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned,
-                labelManager.getLabelsOnNode(node.getNodeID()));
+                node.getLabels());
             
             // Don't reset scheduling opportunities for non-local assignments
             // otherwise the app will be delayed for each non-local assignment.
@@ -1561,7 +1561,7 @@ public class LeafQueue extends AbstractCSQueue {
     
     // check if the resource request can access the label
     if (!SchedulerUtils.checkNodeLabelExpression(
-        labelManager.getLabelsOnNode(node.getNodeID()),
+        node.getLabels(),
         request.getNodeLabelExpression())) {
       // this is a reserved container, but we cannot allocate it now according
       // to label not match. This can be caused by node label changed
@@ -1752,8 +1752,7 @@ public class LeafQueue extends AbstractCSQueue {
         // Book-keeping
         if (removed) {
           releaseResource(clusterResource, application,
-              container.getResource(),
-              labelManager.getLabelsOnNode(node.getNodeID()));
+              container.getResource(), node.getLabels());
           LOG.info("completedContainer" +
               " container=" + container +
               " queue=" + this +
@@ -1950,9 +1949,10 @@ public class LeafQueue extends AbstractCSQueue {
     }
     // Careful! Locking order is important! 
     synchronized (this) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1989,9 +1989,10 @@ public class LeafQueue extends AbstractCSQueue {
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -2006,9 +2007,10 @@ public class LeafQueue extends AbstractCSQueue {
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
-          .getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 6ffaf4c..fd598f2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
   private final boolean rootQueue;
   final Comparator<CSQueue> queueComparator;
   volatile int numApplications;
+  private final CapacitySchedulerContext scheduler;
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
@@ -80,7 +81,7 @@ public class ParentQueue extends AbstractCSQueue {
   public ParentQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
-    
+    this.scheduler = cs;
     this.queueComparator = cs.getQueueComparator();
 
     this.rootQueue = (parent == null);
@@ -420,10 +421,10 @@ public class ParentQueue extends AbstractCSQueue {
       Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+    Set<String> nodeLabels = node.getLabels();
     
     // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        labelManager.getLabelsOnNode(node.getNodeID()))) {
+    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
       return assignment;
     }
     
@@ -434,7 +435,6 @@ public class ParentQueue extends AbstractCSQueue {
       }
       
       boolean localNeedToUnreserve = false;
-      Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID()); 
       
       // Are we over maximum-capacity for this queue?
       if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
@@ -641,7 +641,7 @@ public class ParentQueue extends AbstractCSQueue {
       // Book keeping
       synchronized (this) {
         super.releaseResource(clusterResource, rmContainer.getContainer()
-            .getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
+            .getResource(), node.getLabels());
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -703,9 +703,10 @@ public class ParentQueue extends AbstractCSQueue {
     }
     // Careful! Locking order is important! 
     synchronized (this) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
     }
     if (parent != null) {
       parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -730,9 +731,10 @@ public class ParentQueue extends AbstractCSQueue {
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -748,9 +750,11 @@ public class ParentQueue extends AbstractCSQueue {
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.releaseResource(clusterResource,
           rmContainer.getContainer().getResource(),
-          labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
+          node.getLabels());
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 5227aac..fe6db47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
 
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -32,9 +34,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 public class FiCaSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
+  
+  public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> nodeLabels) {
+    super(node, usePortForNodeName, nodeLabels);
+  }
 
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
-    super(node, usePortForNodeName);
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java
new file mode 100644
index 0000000..7723e25
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class NodeLabelsUpdateSchedulerEvent extends SchedulerEvent {
+  private Map<NodeId, Set<String>> nodeToLabels;
+
+  public NodeLabelsUpdateSchedulerEvent(Map<NodeId, Set<String>> nodeToLabels) {
+    super(SchedulerEventType.NODE_LABELS_UPDATE);
+    this.nodeToLabels = nodeToLabels;
+  }
+  
+  public Map<NodeId, Set<String>> getUpdatedNodeToLabels() {
+    return nodeToLabels;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 062f831..13aecb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -25,6 +25,7 @@ public enum SchedulerEventType {
   NODE_REMOVED,
   NODE_UPDATE,
   NODE_RESOURCE_UPDATE,
+  NODE_LABELS_UPDATE,
 
   // Source: RMApp
   APP_ADDED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 228f200..278c151 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
@@ -206,7 +207,7 @@ public class MockNodes {
 
     @Override
     public Set<String> getNodeLabels() {
-      return null;
+      return RMNodeLabelsManager.EMPTY_STRING_SET;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88f022da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
new file mode 100644
index 0000000..261fa01
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestCapacitySchedulerNodeLabelUpdate {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+  
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    mgr = new MemoryRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+  
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 100);
+    conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
+    conf.setCapacityByLabel(A, "x", 100);
+    conf.setCapacityByLabel(A, "y", 100);
+    conf.setCapacityByLabel(A, "z", 100);
+    
+    return conf;
+  }
+  
+  private Set<String> toSet(String... elements) {
+    Set<String> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  private void checkUsedResource(MockRM rm, String queueName, int memory) {
+    checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
+  }
+  
+  private void checkUsedResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = scheduler.getQueue(queueName);
+    Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory());
+  }
+
+  @Test (timeout = 30000)
+  public void testNodeUpdate() throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+    
+    // set mapping:
+    // h1 -> x
+    // h2 -> y
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 8000);
+    MockNM nm2 = rm.registerNode("h2:1234", 8000);
+    MockNM nm3 = rm.registerNode("h3:1234", 8000);
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
+
+    // request a container.
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=1G, ""=1G
+    checkUsedResource(rm, "a", 1024, "x");
+    checkUsedResource(rm, "a", 1024);
+    
+    // change h1's label to z, container should be killed
+    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("z")));
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.KILLED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=0G, ""=1G ("" not changed)
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 1024);
+    
+    // request a container with label = y
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    Assert.assertTrue(rm.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used y=1G, ""=1G
+    checkUsedResource(rm, "a", 1024, "y");
+    checkUsedResource(rm, "a", 1024);
+    
+    // change h2's label to no label, container should be killed
+    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
+        CommonNodeLabelsManager.EMPTY_STRING_SET));
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.KILLED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "y");
+    checkUsedResource(rm, "a", 1024);
+    
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    
+    // change h3's label to z, AM container should be killed
+    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
+        toSet("z")));
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.KILLED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "y");
+    checkUsedResource(rm, "a", 0);
+
+    rm.close();
+  }
+}


Mime
View raw message