hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject hadoop git commit: YARN-4719. Add a helper library to maintain node state and allows common queries. (kasha)
Date Mon, 14 Mar 2016 21:25:08 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 dd462bc32 -> b4c869309


YARN-4719. Add a helper library to maintain node state and allows common queries. (kasha)

(cherry picked from commit 20d389ce61eaacb5ddfb329015f50e96ad894f8d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b4c86930
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b4c86930
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b4c86930

Branch: refs/heads/branch-2
Commit: b4c869309694969cd3f9fda59a6218b32e4d9ece
Parents: dd462bc
Author: Karthik Kambatla <kasha@cloudera.com>
Authored: Mon Mar 14 14:19:05 2016 -0700
Committer: Karthik Kambatla <kasha@cloudera.com>
Committed: Mon Mar 14 14:22:21 2016 -0700

----------------------------------------------------------------------
 .../scheduler/AbstractYarnScheduler.java        | 170 ++---------
 .../scheduler/ClusterNodeTracker.java           | 300 +++++++++++++++++++
 .../resourcemanager/scheduler/NodeFilter.java   |  33 ++
 .../scheduler/capacity/CapacityScheduler.java   |  86 +++---
 .../scheduler/fair/FSAppAttempt.java            |  13 +-
 .../scheduler/fair/FairScheduler.java           | 104 +++----
 .../scheduler/fifo/FifoScheduler.java           |  32 +-
 .../scheduler/TestAbstractYarnScheduler.java    |  14 +-
 .../scheduler/capacity/TestReservations.java    |  19 +-
 .../scheduler/fair/TestFairScheduler.java       |   7 +-
 .../scheduler/fifo/TestFifoScheduler.java       |  19 +-
 11 files changed, 478 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 7ca8671..7d12301 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -27,11 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,22 +88,10 @@ public abstract class AbstractYarnScheduler
 
   private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
 
-  // Nodes in the cluster, indexed by NodeId
-  protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
-
-  // Whole capacity of the cluster
-  protected Resource clusterResource = Resource.newInstance(0, 0);
+  protected final ClusterNodeTracker<N> nodeTracker =
+      new ClusterNodeTracker<>();
 
   protected Resource minimumAllocation;
-  protected Resource maximumAllocation;
-  private Resource configuredMaximumAllocation;
-  private int maxNodeMemory = -1;
-  private int maxNodeVCores = -1;
-  private final ReadLock maxAllocReadLock;
-  private final WriteLock maxAllocWriteLock;
-
-  private boolean useConfiguredMaximumAllocationOnly = true;
-  private long configuredMaximumAllocationWaitTime;
 
   protected RMContext rmContext;
   
@@ -132,9 +116,6 @@ public abstract class AbstractYarnScheduler
    */
   public AbstractYarnScheduler(String name) {
     super(name);
-    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    this.maxAllocReadLock = lock.readLock();
-    this.maxAllocWriteLock = lock.writeLock();
   }
 
   @Override
@@ -142,14 +123,21 @@ public abstract class AbstractYarnScheduler
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
-    configuredMaximumAllocationWaitTime =
+    long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
+    nodeTracker.setConfiguredMaxAllocationWaitTime(
+        configuredMaximumAllocationWaitTime);
     maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
     createReleaseCache();
     super.serviceInit(conf);
   }
 
+  @VisibleForTesting
+  public ClusterNodeTracker getNodeTracker() {
+    return nodeTracker;
+  }
+
   public List<Container> getTransferredContainers(
       ApplicationAttemptId currentAttempt) {
     ApplicationId appId = currentAttempt.getApplicationId();
@@ -184,20 +172,21 @@ public abstract class AbstractYarnScheduler
    * Add blacklisted NodeIds to the list that is passed.
    *
    * @param app application attempt.
-   * @param blacklistNodeIdList the list to store blacklisted NodeIds.
    */
-  public void addBlacklistedNodeIdsToList(SchedulerApplicationAttempt app,
-      List<NodeId> blacklistNodeIdList) {
-    for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
-      if (SchedulerAppUtils.isBlacklisted(app, nodeEntry.getValue(), LOG)) {
-        blacklistNodeIdList.add(nodeEntry.getKey());
+  public List<N> getBlacklistedNodes(final SchedulerApplicationAttempt app) {
+
+    NodeFilter nodeFilter = new NodeFilter() {
+      @Override
+      public boolean accept(SchedulerNode node) {
+        return SchedulerAppUtils.isBlacklisted(app, node, LOG);
       }
-    }
+    };
+    return nodeTracker.getNodes(nodeFilter);
   }
 
   @Override
   public Resource getClusterResource() {
-    return clusterResource;
+    return nodeTracker.getClusterCapacity();
   }
 
   @Override
@@ -207,22 +196,7 @@ public abstract class AbstractYarnScheduler
 
   @Override
   public Resource getMaximumResourceCapability() {
-    Resource maxResource;
-    maxAllocReadLock.lock();
-    try {
-      if (useConfiguredMaximumAllocationOnly) {
-        if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
-            > configuredMaximumAllocationWaitTime) {
-          useConfiguredMaximumAllocationOnly = false;
-        }
-        maxResource = Resources.clone(configuredMaximumAllocation);
-      } else {
-        maxResource = Resources.clone(maximumAllocation);
-      }
-    } finally {
-      maxAllocReadLock.unlock();
-    }
-    return maxResource;
+    return nodeTracker.getMaxAllowedAllocation();
   }
 
   @Override
@@ -231,15 +205,7 @@ public abstract class AbstractYarnScheduler
   }
 
   protected void initMaximumResourceCapability(Resource maximumAllocation) {
-    maxAllocWriteLock.lock();
-    try {
-      if (this.configuredMaximumAllocation == null) {
-        this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
-        this.maximumAllocation = Resources.clone(maximumAllocation);
-      }
-    } finally {
-      maxAllocWriteLock.unlock();
-    }
+    nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
   }
 
   protected synchronized void containerLaunchedOnNode(
@@ -332,8 +298,7 @@ public abstract class AbstractYarnScheduler
 
   @Override
   public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    N node = nodes.get(nodeId);
-    return node == null ? null : new SchedulerNodeReport(node);
+    return nodeTracker.getNodeReport(nodeId);
   }
 
   @Override
@@ -431,12 +396,13 @@ public abstract class AbstractYarnScheduler
         container));
 
       // recover scheduler node
-      SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
+      SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID());
       schedulerNode.recoverContainer(rmContainer);
 
       // recover queue: update headroom etc.
       Queue queue = schedulerAttempt.getQueue();
-      queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+      queue.recoverContainer(
+          getClusterResource(), schedulerAttempt, rmContainer);
 
       // recover scheduler attempt
       schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
@@ -621,7 +587,7 @@ public abstract class AbstractYarnScheduler
 
   @Override
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
-    return nodes.get(nodeId);
+    return nodeTracker.getNode(nodeId);
   }
 
   @Override
@@ -690,18 +656,12 @@ public abstract class AbstractYarnScheduler
           + " from: " + oldResource + ", to: "
           + newResource);
 
-      nodes.remove(nm.getNodeID());
-      updateMaximumAllocation(node, false);
+      nodeTracker.removeNode(nm.getNodeID());
 
       // update resource to node
       node.setTotalResource(newResource);
 
-      nodes.put(nm.getNodeID(), (N)node);
-      updateMaximumAllocation(node, true);
-
-      // update resource to clusterResource
-      Resources.subtractFrom(clusterResource, oldResource);
-      Resources.addTo(clusterResource, newResource);
+      nodeTracker.addNode((N) node);
     } else {
       // Log resource change
       LOG.warn("Update resource on node: " + node.getNodeName() 
@@ -721,80 +681,8 @@ public abstract class AbstractYarnScheduler
         + " does not support reservations");
   }
 
-  protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
-    Resource totalResource = node.getTotalResource();
-    maxAllocWriteLock.lock();
-    try {
-      if (add) { // added node
-        int nodeMemory = totalResource.getMemory();
-        if (nodeMemory > maxNodeMemory) {
-          maxNodeMemory = nodeMemory;
-          maximumAllocation.setMemory(Math.min(
-              configuredMaximumAllocation.getMemory(), maxNodeMemory));
-        }
-        int nodeVCores = totalResource.getVirtualCores();
-        if (nodeVCores > maxNodeVCores) {
-          maxNodeVCores = nodeVCores;
-          maximumAllocation.setVirtualCores(Math.min(
-              configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
-        }
-      } else {  // removed node
-        if (maxNodeMemory == totalResource.getMemory()) {
-          maxNodeMemory = -1;
-        }
-        if (maxNodeVCores == totalResource.getVirtualCores()) {
-          maxNodeVCores = -1;
-        }
-        // We only have to iterate through the nodes if the current max memory
-        // or vcores was equal to the removed node's
-        if (maxNodeMemory == -1 || maxNodeVCores == -1) {
-          for (Map.Entry<NodeId, N> nodeEntry : nodes.entrySet()) {
-            int nodeMemory =
-                nodeEntry.getValue().getTotalResource().getMemory();
-            if (nodeMemory > maxNodeMemory) {
-              maxNodeMemory = nodeMemory;
-            }
-            int nodeVCores =
-                nodeEntry.getValue().getTotalResource().getVirtualCores();
-            if (nodeVCores > maxNodeVCores) {
-              maxNodeVCores = nodeVCores;
-            }
-          }
-          if (maxNodeMemory == -1) {  // no nodes
-            maximumAllocation.setMemory(configuredMaximumAllocation.getMemory());
-          } else {
-            maximumAllocation.setMemory(
-                Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory));
-          }
-          if (maxNodeVCores == -1) {  // no nodes
-            maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores());
-          } else {
-            maximumAllocation.setVirtualCores(
-                Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
-          }
-        }
-      }
-    } finally {
-      maxAllocWriteLock.unlock();
-    }
-  }
-
   protected void refreshMaximumAllocation(Resource newMaxAlloc) {
-    maxAllocWriteLock.lock();
-    try {
-      configuredMaximumAllocation = Resources.clone(newMaxAlloc);
-      int maxMemory = newMaxAlloc.getMemory();
-      if (maxNodeMemory != -1) {
-        maxMemory = Math.min(maxMemory, maxNodeMemory);
-      }
-      int maxVcores = newMaxAlloc.getVirtualCores();
-      if (maxNodeVCores != -1) {
-        maxVcores = Math.min(maxVcores, maxNodeVCores);
-      }
-      maximumAllocation = Resources.createResource(maxMemory, maxVcores);
-    } finally {
-      maxAllocWriteLock.unlock();
-    }
+    nodeTracker.setConfiguredMaxAllocation(newMaxAlloc);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
new file mode 100644
index 0000000..34b4267
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Helper library that:
+ * - tracks the state of all cluster {@link SchedulerNode}s
+ * - provides convenience methods to filter and sort nodes
+ */
+@InterfaceAudience.Private
+public class ClusterNodeTracker<N extends SchedulerNode> {
+  private static final Log LOG = LogFactory.getLog(ClusterNodeTracker.class);
+
+  private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
+  private Lock readLock = readWriteLock.readLock();
+  private Lock writeLock = readWriteLock.writeLock();
+
+  private HashMap<NodeId, N> nodes = new HashMap<>();
+  private Map<String, Integer> nodesPerRack = new HashMap<>();
+
+  private Resource clusterCapacity = Resources.clone(Resources.none());
+  private Resource staleClusterCapacity = null;
+
+  // Max allocation
+  private int maxNodeMemory = -1;
+  private int maxNodeVCores = -1;
+  private Resource configuredMaxAllocation;
+  private boolean forceConfiguredMaxAllocation = true;
+  private long configuredMaxAllocationWaitTime;
+
+  public void addNode(N node) {
+    writeLock.lock();
+    try {
+      nodes.put(node.getNodeID(), node);
+
+      // Update nodes per rack as well
+      String rackName = node.getRackName();
+      Integer numNodes = nodesPerRack.get(rackName);
+      if (numNodes == null) {
+        numNodes = 0;
+      }
+      nodesPerRack.put(rackName, ++numNodes);
+
+      // Update cluster capacity
+      Resources.addTo(clusterCapacity, node.getTotalResource());
+
+      // Update maximumAllocation
+      updateMaxResources(node, true);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public boolean exists(NodeId nodeId) {
+    readLock.lock();
+    try {
+      return nodes.containsKey(nodeId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public N getNode(NodeId nodeId) {
+    readLock.lock();
+    try {
+      return nodes.get(nodeId);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
+    readLock.lock();
+    try {
+      N n = nodes.get(nodeId);
+      return n == null ? null : new SchedulerNodeReport(n);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public int nodeCount() {
+    readLock.lock();
+    try {
+      return nodes.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public int nodeCount(String rackName) {
+    readLock.lock();
+    String rName = rackName == null ? "NULL" : rackName;
+    try {
+      Integer nodeCount = nodesPerRack.get(rName);
+      return nodeCount == null ? 0 : nodeCount;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public Resource getClusterCapacity() {
+    readLock.lock();
+    try {
+      if (staleClusterCapacity == null ||
+          !Resources.equals(staleClusterCapacity, clusterCapacity)) {
+        staleClusterCapacity = Resources.clone(clusterCapacity);
+      }
+      return staleClusterCapacity;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public N removeNode(NodeId nodeId) {
+    writeLock.lock();
+    try {
+      N node = nodes.remove(nodeId);
+      if (node == null) {
+        LOG.warn("Attempting to remove a non-existent node " + nodeId);
+        return null;
+      }
+
+      // Update nodes per rack as well
+      String rackName = node.getRackName();
+      Integer numNodes = nodesPerRack.get(rackName);
+      if (numNodes > 0) {
+        nodesPerRack.put(rackName, --numNodes);
+      } else {
+        LOG.error("Attempting to remove node from an empty rack " + rackName);
+      }
+
+      // Update cluster capacity
+      Resources.subtractFrom(clusterCapacity, node.getTotalResource());
+
+      // Update maximumAllocation
+      updateMaxResources(node, false);
+
+      return node;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void setConfiguredMaxAllocation(Resource resource) {
+    writeLock.lock();
+    try {
+      configuredMaxAllocation = Resources.clone(resource);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void setConfiguredMaxAllocationWaitTime(
+      long configuredMaxAllocationWaitTime) {
+    writeLock.lock();
+    try {
+      this.configuredMaxAllocationWaitTime =
+          configuredMaxAllocationWaitTime;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public Resource getMaxAllowedAllocation() {
+    readLock.lock();
+    try {
+      if (forceConfiguredMaxAllocation &&
+          System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
+              > configuredMaxAllocationWaitTime) {
+        forceConfiguredMaxAllocation = false;
+      }
+
+      if (forceConfiguredMaxAllocation
+          || maxNodeMemory == -1 || maxNodeVCores == -1) {
+        return configuredMaxAllocation;
+      }
+
+      return Resources.createResource(
+          Math.min(configuredMaxAllocation.getMemory(), maxNodeMemory),
+          Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)
+      );
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private void updateMaxResources(SchedulerNode node, boolean add) {
+    Resource totalResource = node.getTotalResource();
+    writeLock.lock();
+    try {
+      if (add) { // added node
+        int nodeMemory = totalResource.getMemory();
+        if (nodeMemory > maxNodeMemory) {
+          maxNodeMemory = nodeMemory;
+        }
+        int nodeVCores = totalResource.getVirtualCores();
+        if (nodeVCores > maxNodeVCores) {
+          maxNodeVCores = nodeVCores;
+        }
+      } else {  // removed node
+        if (maxNodeMemory == totalResource.getMemory()) {
+          maxNodeMemory = -1;
+        }
+        if (maxNodeVCores == totalResource.getVirtualCores()) {
+          maxNodeVCores = -1;
+        }
+        // We only have to iterate through the nodes if the current max memory
+        // or vcores was equal to the removed node's
+        if (maxNodeMemory == -1 || maxNodeVCores == -1) {
+          // Treat it like an empty cluster and add nodes
+          for (N n : nodes.values()) {
+            updateMaxResources(n, true);
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public List<N> getAllNodes() {
+    return getNodes(null);
+  }
+
+  /**
+   * Convenience method to filter nodes based on a condition.
+   */
+  public List<N> getNodes(NodeFilter nodeFilter) {
+    List<N> nodeList = new ArrayList<>();
+    readLock.lock();
+    try {
+      if (nodeFilter == null) {
+        nodeList.addAll(nodes.values());
+      } else {
+        for (N node : nodes.values()) {
+          if (nodeFilter.accept(node)) {
+            nodeList.add(node);
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return nodeList;
+  }
+
+  /**
+   * Convenience method to sort nodes.
+   *
+   * Note that the sort is performed without holding a lock. We are sorting
+   * here instead of on the caller to allow for future optimizations (e.g.
+   * sort once every x milliseconds).
+   */
+  public List<N> sortedNodeList(Comparator<N> comparator) {
+    List<N> sortedList = null;
+    readLock.lock();
+    try {
+      sortedList = new ArrayList(nodes.values());
+    } finally {
+      readLock.unlock();
+    }
+    Collections.sort(sortedList, comparator);
+    return sortedList;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java
new file mode 100644
index 0000000..7b3e7a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeFilter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Convenience way to filter nodes based on a criteria. To be used in
+ * conjunction with {@link ClusterNodeTracker}
+ */
+@InterfaceAudience.Private
+public interface NodeFilter {
+
+  /**
+   * Criteria to accept node in the filtered list.
+   */
+  boolean accept(SchedulerNode node);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6a1091d..735306a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -34,7 +34,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -219,8 +218,6 @@ public class CapacityScheduler extends
 
   private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
 
-  private AtomicInteger numNodeManagers = new AtomicInteger(0);
-
   private ResourceCalculator calculator;
   private boolean usePortForNodeName;
 
@@ -280,7 +277,7 @@ public class CapacityScheduler extends
 
   @Override
   public int getNumClusterNodes() {
-    return numNodeManagers.get();
+    return nodeTracker.nodeCount();
   }
 
   @Override
@@ -387,7 +384,7 @@ public class CapacityScheduler extends
   static void schedule(CapacityScheduler cs) {
     // First randomize the start point
     int current = 0;
-    Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values();
+    Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
     int start = random.nextInt(nodes.size());
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
@@ -524,10 +521,11 @@ public class CapacityScheduler extends
     addNewQueues(queues, newQueues);
     
     // Re-configure queues
-    root.reinitialize(newRoot, clusterResource);
+    root.reinitialize(newRoot, getClusterResource());
     updatePlacementRules();
 
     // Re-calculate headroom for active applications
+    Resource clusterResource = getClusterResource();
     root.updateClusterResource(clusterResource, new ResourceLimits(
         clusterResource));
 
@@ -995,7 +993,7 @@ public class CapacityScheduler extends
       
 
       allocation = application.getAllocation(getResourceCalculator(),
-                   clusterResource, getMinimumResourceCapability());
+          getClusterResource(), getMinimumResourceCapability());
     }
 
     if (updateDemandForQueue != null && !application
@@ -1036,7 +1034,8 @@ public class CapacityScheduler extends
 
   private synchronized void nodeUpdate(RMNode nm) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
+      LOG.debug("nodeUpdate: " + nm +
+          " clusterResources: " + getClusterResource());
     }
 
     Resource releaseResources = Resource.newInstance(0, 0);
@@ -1119,6 +1118,7 @@ public class CapacityScheduler extends
   private synchronized void updateNodeAndQueueResource(RMNode nm, 
       ResourceOption resourceOption) {
     updateNodeResource(nm, resourceOption);
+    Resource clusterResource = getClusterResource();
     root.updateClusterResource(clusterResource, new ResourceLimits(
         clusterResource));
   }
@@ -1128,7 +1128,7 @@ public class CapacityScheduler extends
    */
   private synchronized void updateLabelsOnNode(NodeId nodeId,
       Set<String> newLabels) {
-    FiCaSchedulerNode node = nodes.get(nodeId);
+    FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
     if (null == node) {
       return;
     }
@@ -1230,12 +1230,12 @@ public class CapacityScheduler extends
       LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
       assignment =
           queue.assignContainers(
-              clusterResource,
+              getClusterResource(),
               node,
               // TODO, now we only consider limits for parent for non-labeled
               // resources, should consider labeled resources as well.
               new ResourceLimits(labelManager.getResourceByLabel(
-                  RMNodeLabelsManager.NO_LABEL, clusterResource)),
+                  RMNodeLabelsManager.NO_LABEL, getClusterResource())),
               SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       if (assignment.isFulfilledReservation()) {
         CSAssignment tmp =
@@ -1261,14 +1261,14 @@ public class CapacityScheduler extends
         }
 
         assignment = root.assignContainers(
-            clusterResource,
+            getClusterResource(),
             node,
             // TODO, now we only consider limits for parent for non-labeled
             // resources, should consider labeled resources as well.
             new ResourceLimits(labelManager.getResourceByLabel(
-                RMNodeLabelsManager.NO_LABEL, clusterResource)),
+                RMNodeLabelsManager.NO_LABEL, getClusterResource())),
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-        if (Resources.greaterThan(calculator, clusterResource,
+        if (Resources.greaterThan(calculator, getClusterResource(),
             assignment.getResource(), Resources.none())) {
           updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
           return;
@@ -1294,12 +1294,12 @@ public class CapacityScheduler extends
         
         // Try to use NON_EXCLUSIVE
         assignment = root.assignContainers(
-            clusterResource,
+            getClusterResource(),
             node,
             // TODO, now we only consider limits for parent for non-labeled
             // resources, should consider labeled resources as well.
             new ResourceLimits(labelManager.getResourceByLabel(
-                RMNodeLabelsManager.NO_LABEL, clusterResource)),
+                RMNodeLabelsManager.NO_LABEL, getClusterResource())),
             SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
         updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
       }
@@ -1451,24 +1451,22 @@ public class CapacityScheduler extends
   private synchronized void addNode(RMNode nodeManager) {
     FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
         usePortForNodeName, nodeManager.getNodeLabels());
-    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
-    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
+    nodeTracker.addNode(schedulerNode);
 
     // update this node to node label manager
     if (labelManager != null) {
       labelManager.activateNode(nodeManager.getNodeID(),
           schedulerNode.getTotalResource());
     }
-    
+
+    Resource clusterResource = getClusterResource();
     root.updateClusterResource(clusterResource, new ResourceLimits(
         clusterResource));
-    int numNodes = numNodeManagers.incrementAndGet();
-    updateMaximumAllocation(schedulerNode, true);
-    
+
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
 
-    if (scheduleAsynchronously && numNodes == 1) {
+    if (scheduleAsynchronously && getNumClusterNodes() == 1) {
       asyncSchedulerThread.beginSchedule();
     }
   }
@@ -1478,20 +1476,14 @@ public class CapacityScheduler extends
     if (labelManager != null) {
       labelManager.deactivateNode(nodeInfo.getNodeID());
     }
-    
-    FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
+
+    NodeId nodeId = nodeInfo.getNodeID();
+    FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
     if (node == null) {
+      LOG.error("Attempting to remove non-existent node " + nodeId);
       return;
     }
-    Resources.subtractFrom(clusterResource, node.getTotalResource());
-    root.updateClusterResource(clusterResource, new ResourceLimits(
-        clusterResource));
-    int numNodes = numNodeManagers.decrementAndGet();
 
-    if (scheduleAsynchronously && numNodes == 0) {
-      asyncSchedulerThread.suspendSchedule();
-    }
-    
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
     for (RMContainer container : runningContainers) {
@@ -1512,11 +1504,18 @@ public class CapacityScheduler extends
           RMContainerEventType.KILL);
     }
 
-    this.nodes.remove(nodeInfo.getNodeID());
-    updateMaximumAllocation(node, false);
+    nodeTracker.removeNode(nodeId);
+    Resource clusterResource = getClusterResource();
+    root.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
+    int numNodes = nodeTracker.nodeCount();
+
+    if (scheduleAsynchronously && numNodes == 0) {
+      asyncSchedulerThread.suspendSchedule();
+    }
 
     LOG.info("Removed node " + nodeInfo.getNodeAddress() + 
-        " clusterResource: " + clusterResource);
+        " clusterResource: " + getClusterResource());
   }
 
   private void rollbackContainerResource(
@@ -1568,7 +1567,7 @@ public class CapacityScheduler extends
     
     // Inform the queue
     LeafQueue queue = (LeafQueue)application.getQueue();
-    queue.completedContainer(clusterResource, application, node, 
+    queue.completedContainer(getClusterResource(), application, node,
         rmContainer, containerStatus, event, null, true);
 
     if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
@@ -1594,7 +1593,7 @@ public class CapacityScheduler extends
     FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
     LeafQueue queue = (LeafQueue) attempt.getQueue();
     try {
-      queue.decreaseContainer(clusterResource, decreaseRequest, app);
+      queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
       // Notify RMNode that the container can be pulled by NodeManager in the
       // next heartbeat
       this.rmContext.getDispatcher().getEventHandler()
@@ -1617,14 +1616,9 @@ public class CapacityScheduler extends
   
   @Lock(Lock.NoLock.class)
   public FiCaSchedulerNode getNode(NodeId nodeId) {
-    return nodes.get(nodeId);
+    return nodeTracker.getNode(nodeId);
   }
   
-  @Lock(Lock.NoLock.class)
-  Map<NodeId, FiCaSchedulerNode> getAllNodes() {
-    return nodes;
-  }
-
   @Override
   @Lock(Lock.NoLock.class)
   public void recover(RMState state) throws Exception {
@@ -1869,9 +1863,9 @@ public class CapacityScheduler extends
     }
     // Move all live containers
     for (RMContainer rmContainer : app.getLiveContainers()) {
-      source.detachContainer(clusterResource, app, rmContainer);
+      source.detachContainer(getClusterResource(), app, rmContainer);
       // attach the Container to another queue
-      dest.attachContainer(clusterResource, app, rmContainer);
+      dest.attachContainer(getClusterResource(), app, rmContainer);
     }
     // Detach the application..
     source.finishApplicationAttempt(app, sourceQueueName);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index f1cefad..e426da6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -86,7 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   // Key = RackName, Value = Set of Nodes reserved by app on rack
   private Map<String, Set<String>> reservations = new HashMap<>();
 
-  private List<NodeId> blacklistNodeIds = new ArrayList<NodeId>();
+  private List<FSSchedulerNode> blacklistNodeIds = new ArrayList<>();
   /**
    * Delay scheduling: We often want to prioritize scheduling of node-local
    * containers over rack-local or off-switch containers. To achieve this
@@ -185,14 +185,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       Resource availableResources) {
     if (appSchedulingInfo.getAndResetBlacklistChanged()) {
       blacklistNodeIds.clear();
-      scheduler.addBlacklistedNodeIdsToList(this, blacklistNodeIds);
+      blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
     }
-    for (NodeId nodeId: blacklistNodeIds) {
-      SchedulerNode node = scheduler.getSchedulerNode(nodeId);
-      if (node != null) {
-        Resources.subtractFrom(availableResources,
-            node.getUnallocatedResource());
-      }
+    for (FSSchedulerNode node: blacklistNodeIds) {
+      Resources.subtractFrom(availableResources,
+          node.getUnallocatedResource());
     }
     if (availableResources.getMemory() < 0) {
       availableResources.setMemory(0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 917fc8a..ba90e21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -186,14 +184,11 @@ public class FairScheduler extends
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
 
-  // Count of number of nodes per rack
-  private Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();
-
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
-  private Comparator<NodeId> nodeAvailableResourceComparator =
+  private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
           new NodeAvailableResourceComparator(); // Node available resource comparator
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
@@ -225,8 +220,8 @@ public class FairScheduler extends
 
   public boolean isAtLeastReservationThreshold(
       ResourceCalculator resourceCalculator, Resource resource) {
-    return Resources.greaterThanOrEqual(
-        resourceCalculator, clusterResource, resource, reservationThreshold);
+    return Resources.greaterThanOrEqual(resourceCalculator,
+        getClusterResource(), resource, reservationThreshold);
   }
 
   private void validateConf(Configuration conf) {
@@ -272,11 +267,7 @@ public class FairScheduler extends
   }
 
   public int getNumNodesInRack(String rackName) {
-    String rName = rackName == null ? "NULL" : rackName;
-    if (nodesPerRack.containsKey(rName)) {
-      return nodesPerRack.get(rName);
-    }
-    return 0;
+    return nodeTracker.nodeCount(rackName);
   }
 
   public QueueManager getQueueManager() {
@@ -352,6 +343,7 @@ public class FairScheduler extends
     // Recursively update demands for all queues
     rootQueue.updateDemand();
 
+    Resource clusterResource = getClusterResource();
     rootQueue.setFairShare(clusterResource);
     // Recursively compute fair shares for all queues
     // and update metrics
@@ -526,6 +518,7 @@ public class FairScheduler extends
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
     ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
+    Resource clusterResource = getClusterResource();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
       Resource target = Resources.componentwiseMin(
           sched.getMinShare(), sched.getDemand());
@@ -577,7 +570,7 @@ public class FairScheduler extends
   }
 
   private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
-    return nodes.get(nodeId);
+    return nodeTracker.getNode(nodeId);
   }
 
   public double getNodeLocalityThreshold() {
@@ -882,18 +875,11 @@ public class FairScheduler extends
   private synchronized void addNode(List<NMContainerStatus> containerReports,
       RMNode node) {
     FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
-    nodes.put(node.getNodeID(), schedulerNode);
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    if (nodesPerRack.containsKey(rackName)) {
-      nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1);
-    } else {
-      nodesPerRack.put(rackName, 1);
-    }
-    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
-    updateMaximumAllocation(schedulerNode, true);
+    nodeTracker.addNode(schedulerNode);
 
     triggerUpdate();
 
+    Resource clusterResource = getClusterResource();
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
     LOG.info("Added node " + node.getNodeAddress() +
@@ -904,15 +890,12 @@ public class FairScheduler extends
   }
 
   private synchronized void removeNode(RMNode rmNode) {
-    FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
-    // This can occur when an UNHEALTHY node reconnects
+    NodeId nodeId = rmNode.getNodeID();
+    FSSchedulerNode node = nodeTracker.getNode(nodeId);
     if (node == null) {
+      LOG.error("Attempting to remove non-existent node " + nodeId);
       return;
     }
-    Resources.subtractFrom(clusterResource, node.getTotalResource());
-    updateRootQueueMetrics();
-
-    triggerUpdate();
 
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
@@ -934,18 +917,13 @@ public class FairScheduler extends
           RMContainerEventType.KILL);
     }
 
-    nodes.remove(rmNode.getNodeID());
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    if (nodesPerRack.containsKey(rackName)
-            && (nodesPerRack.get(rackName) > 0)) {
-      nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1);
-    } else {
-      LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" +
-              " unknown rack [" + rackName + "] !!");
-    }
+    nodeTracker.removeNode(nodeId);
+    Resource clusterResource = getClusterResource();
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
-    updateMaximumAllocation(node, false);
+    updateRootQueueMetrics();
+    triggerUpdate();
+
     LOG.info("Removed node " + rmNode.getNodeAddress() +
         " cluster capacity: " + clusterResource);
   }
@@ -967,7 +945,7 @@ public class FairScheduler extends
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
-        clusterResource, minimumAllocation, getMaximumResourceCapability(),
+        getClusterResource(), minimumAllocation, getMaximumResourceCapability(),
         incrAllocation);
 
     // Record container allocation start time
@@ -1034,7 +1012,8 @@ public class FairScheduler extends
   private synchronized void nodeUpdate(RMNode nm) {
     long start = getClock().getTime();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
+      LOG.debug("nodeUpdate: " + nm +
+          " cluster capacity: " + getClusterResource());
     }
     eventLog.log("HEARTBEAT", nm.getHostName());
     FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
@@ -1091,20 +1070,13 @@ public class FairScheduler extends
 
   void continuousSchedulingAttempt() throws InterruptedException {
     long start = getClock().getTime();
-    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-    // Sort the nodes by space available on them, so that we offer
-    // containers on emptier nodes first, facilitating an even spread. This
-    // requires holding the scheduler lock, so that the space available on a
-    // node doesn't change during the sort.
-    synchronized (this) {
-      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-    }
+    List<FSSchedulerNode> nodeIdList =
+        nodeTracker.sortedNodeList(nodeAvailableResourceComparator);
 
     // iterate all nodes
-    for (NodeId nodeId : nodeIdList) {
-      FSSchedulerNode node = getFSSchedulerNode(nodeId);
+    for (FSSchedulerNode node : nodeIdList) {
       try {
-        if (node != null && Resources.fitsIn(minimumAllocation,
+        if (Resources.fitsIn(minimumAllocation,
             node.getUnallocatedResource())) {
           attemptScheduling(node);
         }
@@ -1126,19 +1098,14 @@ public class FairScheduler extends
   }
 
   /** Sort nodes by available resource */
-  private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+  private class NodeAvailableResourceComparator
+      implements Comparator<FSSchedulerNode> {
 
     @Override
-    public int compare(NodeId n1, NodeId n2) {
-      if (!nodes.containsKey(n1)) {
-        return 1;
-      }
-      if (!nodes.containsKey(n2)) {
-        return -1;
-      }
-      return RESOURCE_CALCULATOR.compare(clusterResource,
-              nodes.get(n2).getUnallocatedResource(),
-              nodes.get(n1).getUnallocatedResource());
+    public int compare(FSSchedulerNode n1, FSSchedulerNode n2) {
+      return RESOURCE_CALCULATOR.compare(getClusterResource(),
+          n2.getUnallocatedResource(),
+          n1.getUnallocatedResource());
     }
   }
 
@@ -1150,7 +1117,7 @@ public class FairScheduler extends
     }
 
     final NodeId nodeID = node.getNodeID();
-    if (!nodes.containsKey(nodeID)) {
+    if (!nodeTracker.exists(nodeID)) {
       // The node might have just been removed while this thread was waiting
       // on the synchronized lock before it entered this synchronized method
       LOG.info("Skipping scheduling as the node " + nodeID +
@@ -1203,7 +1170,7 @@ public class FairScheduler extends
   private void updateRootQueueMetrics() {
     rootMetrics.setAvailableResourcesToQueue(
         Resources.subtract(
-            clusterResource, rootMetrics.getAllocatedResources()));
+            getClusterResource(), rootMetrics.getAllocatedResources()));
   }
 
   /**
@@ -1214,6 +1181,7 @@ public class FairScheduler extends
    */
   private boolean shouldAttemptPreemption() {
     if (preemptionEnabled) {
+      Resource clusterResource = getClusterResource();
       return (preemptionUtilizationThreshold < Math.max(
           (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
           (float) rootMetrics.getAllocatedVirtualCores() /
@@ -1547,7 +1515,7 @@ public class FairScheduler extends
 
   @Override
   public int getNumClusterNodes() {
-    return nodes.size();
+    return nodeTracker.nodeCount();
   }
 
   @Override
@@ -1577,7 +1545,7 @@ public class FairScheduler extends
       // if it does not already exist, so it can be displayed on the web UI.
       synchronized (FairScheduler.this) {
         allocConf = queueInfo;
-        allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
+        allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
         queueMgr.updateAllocationConfiguration(allocConf);
         maxRunningEnforcer.updateRunnabilityOnReload();
       }
@@ -1721,7 +1689,7 @@ public class FairScheduler extends
       ResourceOption resourceOption) {
     super.updateNodeResource(nm, resourceOption);
     updateRootQueueMetrics();
-    queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+    queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
     queueMgr.getRootQueue().recomputeSteadyShares();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 147c3f3..cf12501 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -142,6 +142,7 @@ public class FifoScheduler extends
       QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
       queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
       queueInfo.setCapacity(1.0f);
+      Resource clusterResource = getClusterResource();
       if (clusterResource.getMemory() == 0) {
         queueInfo.setCurrentCapacity(0.0f);
       } else {
@@ -297,7 +298,7 @@ public class FifoScheduler extends
 
   @Override
   public int getNumClusterNodes() {
-    return nodes.size();
+    return nodeTracker.nodeCount();
   }
 
   @Override
@@ -327,7 +328,8 @@ public class FifoScheduler extends
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, resourceCalculator, 
-        clusterResource, minimumAllocation, getMaximumResourceCapability());
+        getClusterResource(), minimumAllocation,
+        getMaximumResourceCapability());
 
     // Release containers
     releaseContainers(release, application);
@@ -377,7 +379,7 @@ public class FifoScheduler extends
   }
 
   private FiCaSchedulerNode getNode(NodeId nodeId) {
-    return nodes.get(nodeId);
+    return nodeTracker.getNode(nodeId);
   }
 
   @VisibleForTesting
@@ -526,7 +528,7 @@ public class FifoScheduler extends
       application.showRequests();
 
       // Done
-      if (Resources.lessThan(resourceCalculator, clusterResource,
+      if (Resources.lessThan(resourceCalculator, getClusterResource(),
               node.getUnallocatedResource(), minimumAllocation)) {
         break;
       }
@@ -764,7 +766,7 @@ public class FifoScheduler extends
       return;
     }
 
-    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
+    if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
             node.getUnallocatedResource(), minimumAllocation)) {
       LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
           " available resource = " + node.getUnallocatedResource());
@@ -783,13 +785,13 @@ public class FifoScheduler extends
   }
 
   private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
-    schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+    schedulerAttempt.setHeadroom(Resources.subtract(getClusterResource(),
       usedResource));
   }
 
   private void updateAvailableResourcesMetrics() {
-    metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
-      usedResource));
+    metrics.setAvailableResourcesToQueue(
+        Resources.subtract(getClusterResource(), usedResource));
   }
 
   @Override
@@ -925,7 +927,7 @@ public class FifoScheduler extends
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
   private synchronized void removeNode(RMNode nodeInfo) {
-    FiCaSchedulerNode node = getNode(nodeInfo.getNodeID());
+    FiCaSchedulerNode node = nodeTracker.getNode(nodeInfo.getNodeID());
     if (node == null) {
       return;
     }
@@ -937,13 +939,7 @@ public class FifoScheduler extends
               SchedulerUtils.LOST_CONTAINER),
               RMContainerEventType.KILL);
     }
-    
-    //Remove the node
-    this.nodes.remove(nodeInfo.getNodeID());
-    updateMaximumAllocation(node, false);
-    
-    // Update cluster metrics
-    Resources.subtractFrom(clusterResource, node.getTotalResource());
+    nodeTracker.removeNode(nodeInfo.getNodeID());
   }
 
   @Override
@@ -965,9 +961,7 @@ public class FifoScheduler extends
   private synchronized void addNode(RMNode nodeManager) {
     FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
         usePortForNodeName);
-    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
-    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
-    updateMaximumAllocation(schedulerNode, true);
+    nodeTracker.addNode(schedulerNode);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index e7ba58d..81c8fe6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -300,22 +300,16 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
 
       verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
 
-      scheduler.nodes = new HashMap<NodeId, SchedulerNode>();
-
-      scheduler.nodes.put(mockNode1.getNodeID(), mockNode1);
-      scheduler.updateMaximumAllocation(mockNode1, true);
+      scheduler.nodeTracker.addNode(mockNode1);
       verifyMaximumResourceCapability(fullResource1, scheduler);
 
-      scheduler.nodes.put(mockNode2.getNodeID(), mockNode2);
-      scheduler.updateMaximumAllocation(mockNode2, true);
+      scheduler.nodeTracker.addNode(mockNode2);
       verifyMaximumResourceCapability(fullResource2, scheduler);
 
-      scheduler.nodes.remove(mockNode2.getNodeID());
-      scheduler.updateMaximumAllocation(mockNode2, false);
+      scheduler.nodeTracker.removeNode(mockNode2.getNodeID());
       verifyMaximumResourceCapability(fullResource1, scheduler);
 
-      scheduler.nodes.remove(mockNode1.getNodeID());
-      scheduler.updateMaximumAllocation(mockNode1, false);
+      scheduler.nodeTracker.removeNode(mockNode1.getNodeID());
       verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
     } finally {
       rm.stop();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 9047138..2ef5e39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -183,6 +183,7 @@ public class TestReservations {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testReservation() throws Exception {
     // Test that we now unreserve and use a node that has space
 
@@ -231,9 +232,9 @@ public class TestReservations {
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
 
-    cs.getAllNodes().put(node_0.getNodeID(), node_0);
-    cs.getAllNodes().put(node_1.getNodeID(), node_1);
-    cs.getAllNodes().put(node_2.getNodeID(), node_2);
+    cs.getNodeTracker().addNode(node_0);
+    cs.getNodeTracker().addNode(node_1);
+    cs.getNodeTracker().addNode(node_2);
 
     final int numNodes = 3;
     Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
@@ -346,6 +347,7 @@ public class TestReservations {
   // Test that hitting a reservation limit and needing to unreserve
   // does not affect assigning containers for other users
   @Test
+  @SuppressWarnings("unchecked")
   public void testReservationLimitOtherUsers() throws Exception {
     CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
     setup(csConf, true);
@@ -395,9 +397,9 @@ public class TestReservations {
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
     when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
 
-    cs.getAllNodes().put(node_0.getNodeID(), node_0);
-    cs.getAllNodes().put(node_1.getNodeID(), node_1);
-    cs.getAllNodes().put(node_2.getNodeID(), node_2);
+    cs.getNodeTracker().addNode(node_0);
+    cs.getNodeTracker().addNode(node_1);
+    cs.getNodeTracker().addNode(node_2);
 
     final int numNodes = 3;
     Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
@@ -641,6 +643,7 @@ public class TestReservations {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testAssignContainersNeedToUnreserve() throws Exception {
     // Test that we now unreserve and use a node that has space
     Logger rootLogger = LogManager.getRootLogger();
@@ -684,8 +687,8 @@ public class TestReservations {
     FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
         8 * GB);
 
-    cs.getAllNodes().put(node_0.getNodeID(), node_0);
-    cs.getAllNodes().put(node_1.getNodeID(), node_1);
+    cs.getNodeTracker().addNode(node_0);
+    cs.getNodeTracker().addNode(node_1);
 
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index a196576..c7154d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -75,12 +75,10 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -108,7 +106,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -2751,8 +2748,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1");
     RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 2, "127.0.0.2");
-    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
 
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
         "user1", 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b4c86930/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 9bfc283..44877fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -306,12 +306,7 @@ public class TestFifoScheduler {
     nmTokenSecretManager.rollMasterKey();
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     
-    FifoScheduler scheduler = new FifoScheduler(){
-      @SuppressWarnings("unused")
-      public Map<NodeId, FiCaSchedulerNode> getNodes(){
-        return nodes;
-      }
-    };
+    FifoScheduler scheduler = new FifoScheduler();
     RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
         null, containerTokenSecretManager, nmTokenSecretManager, null, scheduler);
     rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
@@ -331,11 +326,7 @@ public class TestFifoScheduler {
     NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node0);
     scheduler.handle(nodeEvent1);
     
-    Method method = scheduler.getClass().getDeclaredMethod("getNodes");
-    @SuppressWarnings("unchecked")
-    Map<NodeId, FiCaSchedulerNode> schedulerNodes = 
-        (Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
-    assertEquals(schedulerNodes.values().size(), 1);
+    assertEquals(scheduler.getNumClusterNodes(), 1);
     
     Resource newResource = Resources.createResource(1024, 4);
     
@@ -345,9 +336,9 @@ public class TestFifoScheduler {
     scheduler.handle(node0ResourceUpdate);
     
     // SchedulerNode's total resource and available resource are changed.
-    assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource()
-        .getMemory(), 1024);
-    assertEquals(schedulerNodes.get(node0.getNodeID()).
+    assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID())
+        .getTotalResource().getMemory());
+    assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()).
         getUnallocatedResource().getMemory(), 1024);
     QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
     Assert.assertEquals(0.0f, queueInfo.getCurrentCapacity(), 0.0f);


Mime
View raw message