hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: YARN-3102. Decommisioned Nodes not listed in Web UI. Contributed by Kuhu Shukla
Date Mon, 01 Feb 2016 23:19:17 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 69c61fae0 -> 6b6167d40


YARN-3102. Decommisioned Nodes not listed in Web UI. Contributed by Kuhu Shukla


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6b6167d4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6b6167d4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6b6167d4

Branch: refs/heads/branch-2.7
Commit: 6b6167d40171cd185f81e1783e6be4c9c1ad6b43
Parents: 69c61fa
Author: Jason Lowe <jlowe@apache.org>
Authored: Mon Feb 1 23:18:44 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Mon Feb 1 23:18:44 2016 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/NodesListManager.java       | 119 +++++++++++++-
 .../server/resourcemanager/ResourceManager.java |   5 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  48 ++++--
 .../yarn/server/resourcemanager/MockRM.java     |  16 ++
 .../server/resourcemanager/TestRMRestart.java   |  11 +-
 .../TestResourceTrackerService.java             | 154 +++++++++++++++----
 7 files changed, 297 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6167d4/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 59ff9b8..0f0cdad 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -81,6 +81,9 @@ Release 2.7.3 - UNRELEASED
     YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is
     not available (Chang Li via jlowe)
 
+    YARN-3102. Decommisioned Nodes not listed in Web UI (Kuhu Shukla via
+    jlowe)
+
 Release 2.7.2 - 2016-01-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6167d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index dd50401..b153b0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -78,7 +80,7 @@ public class NodesListManager extends AbstractService implements
           YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
-      setDecomissionedNMsMetrics();
+      setDecomissionedNMs();
       printConfiguredHosts();
     } catch (YarnException ex) {
       disableHostsFileReader(ex);
@@ -135,9 +137,24 @@ public class NodesListManager extends AbstractService implements
     }
   }
 
-  private void setDecomissionedNMsMetrics() {
+  private void setDecomissionedNMs() {
     Set<String> excludeList = hostsReader.getExcludedHosts();
-    ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
+    for (final String host : excludeList) {
+      UnknownNodeId nodeId = new UnknownNodeId(host);
+      RMNodeImpl rmNode = new RMNodeImpl(nodeId,
+          rmContext, host, -1, -1, new UnknownNode(host), null, null);
+
+      RMNode prevRMNode =
+          rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
+      if (prevRMNode != null) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(prevRMNode.getNodeID(),
+                RMNodeEventType.DECOMMISSION));
+      } else {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+      }
+    }
   }
 
   public boolean isValidNode(String hostName) {
@@ -210,7 +227,7 @@ public class NodesListManager extends AbstractService implements
           conf.get(YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
       this.hostsReader =
           createHostsFileReader(this.includesFile, this.excludesFile);
-      setDecomissionedNMsMetrics();
+      setDecomissionedNMs();
     } catch (IOException ioe2) {
       // Should *never* happen
       this.hostsReader = null;
@@ -240,4 +257,98 @@ public class NodesListManager extends AbstractService implements
                     .getConfigurationInputStream(this.conf, excludesFile));
     return hostsReader;
   }
+
+  /**
+   * A NodeId instance needed upon startup for populating inactive nodes Map.
+   * It only knows the hostname/ip and marks the port to -1 or invalid.
+   */
+  public static class UnknownNodeId extends NodeId {
+
+    private String host;
+
+    public UnknownNodeId(String host) {
+      this.host = host;
+    }
+
+    @Override
+    public String getHost() {
+      return this.host;
+    }
+
+    @Override
+    protected void setHost(String hst) {
+
+    }
+
+    @Override
+    public int getPort() {
+      return -1;
+    }
+
+    @Override
+    protected void setPort(int port) {
+
+    }
+
+    @Override
+    protected void build() {
+
+    }
+  }
+
+  /**
+   * A Node instance needed upon startup for populating RMNode Map.
+   * It only knows its hostname/ip.
+   */
+  private static class UnknownNode implements Node {
+
+    private String host;
+
+    public UnknownNode(String host) {
+      this.host = host;
+    }
+
+    @Override
+    public String getNetworkLocation() {
+      return null;
+    }
+
+    @Override
+    public void setNetworkLocation(String location) {
+
+    }
+
+    @Override
+    public String getName() {
+      return host;
+    }
+
+    @Override
+    public Node getParent() {
+      return null;
+    }
+
+    @Override
+    public void setParent(Node parent) {
+
+    }
+
+    @Override
+    public int getLevel() {
+      return 0;
+    }
+
+    @Override
+    public void setLevel(int i) {
+
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public void setHost(String hst) {
+      this.host = hst;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6167d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 9e59081..711b69c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -179,6 +179,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
     clusterTimeStamp = timestamp;
   }
 
+  @VisibleForTesting
+  Dispatcher getRmDispatcher() {
+    return rmDispatcher;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6167d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 4556dad..673264f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -141,6 +142,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
      .addTransition(NodeState.NEW, NodeState.NEW,
          RMNodeEventType.RESOURCE_UPDATE, 
          new UpdateNodeResourceWhenUnusableTransition())
+     .addTransition(NodeState.NEW, NodeState.DECOMMISSIONED,
+         RMNodeEventType.DECOMMISSION,
+         new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
 
      //Transitions from RUNNING state
      .addTransition(NodeState.RUNNING,
@@ -491,6 +495,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     case UNHEALTHY:
       metrics.incrNumUnhealthyNMs();
       break;
+    case NEW:
+      break;
     default:
       LOG.debug("Unexpected final state");
     }
@@ -531,24 +537,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       List<NMContainerStatus> containers = null;
 
       String host = rmNode.nodeId.getHost();
-      if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
-        // Old node rejoining
-        RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host);
-        rmNode.context.getInactiveRMNodes().remove(host);
-        rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
+      RMNode previousRMNode = rmNode.context.getInactiveRMNodes().remove(host);
+      if (previousRMNode != null) {
+        if (previousRMNode.getNodeID().getPort() != -1) {
+          // Old node rejoining
+          rmNode.updateMetricsForRejoinedNode(previousRMNode.getState());
+        } else {
+          // An old excluded node rejoining
+          ClusterMetrics.getMetrics().decrDecommisionedNMs();
+          containers = updateNewNodeMetricsAndContainers(rmNode, startEvent);
+        }
       } else {
         // Increment activeNodes explicitly because this is a new node.
-        ClusterMetrics.getMetrics().incrNumActiveNodes();
-        containers = startEvent.getNMContainerStatuses();
-        if (containers != null && !containers.isEmpty()) {
-          for (NMContainerStatus container : containers) {
-            if (container.getContainerState() == ContainerState.RUNNING) {
-              rmNode.launchedContainers.add(container.getContainerId());
-            }
-          }
-        }
+        containers = updateNewNodeMetricsAndContainers(rmNode, startEvent);
       }
-      
+
       if (null != startEvent.getRunningApplications()) {
         for (ApplicationId appId : startEvent.getRunningApplications()) {
           handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
@@ -563,6 +566,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  private static List<NMContainerStatus> updateNewNodeMetricsAndContainers(
+      RMNodeImpl rmNode, RMNodeStartedEvent startEvent) {
+    List<NMContainerStatus> containers;
+    ClusterMetrics.getMetrics().incrNumActiveNodes();
+    containers = startEvent.getNMContainerStatuses();
+    if (containers != null && !containers.isEmpty()) {
+      for (NMContainerStatus container : containers) {
+        if (container.getContainerState() == ContainerState.RUNNING) {
+          rmNode.launchedContainers.add(container.getContainerId());
+        }
+      }
+    }
+    return containers;
+  }
+
   public static class ReconnectNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6167d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index b1ce0f1..5bd952e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -132,6 +134,20 @@ public class MockRM extends ResourceManager {
     }
   }
 
+  @Override
+  protected Dispatcher createDispatcher() {
+    return new DrainDispatcher();
+  }
+
+  public void drainEvents() {
+    Dispatcher rmDispatcher = getRmDispatcher();
+    if (rmDispatcher instanceof DrainDispatcher) {
+      ((DrainDispatcher) rmDispatcher).await();
+    } else {
+      throw new UnsupportedOperationException("Not a Drain Dispatcher!");
+    }
+  }
+
   public void waitForState(ApplicationId appId, RMAppState finalState)
       throws Exception {
     RMApp app = getRMContext().getRMApps().get(appId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6167d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index d057498..469135b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1850,15 +1850,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
       hostFile.getAbsolutePath());
     writeToHostsFile("");
-    final DrainDispatcher dispatcher = new DrainDispatcher();
     MockRM rm1 = null, rm2 = null;
     try {
-      rm1 = new MockRM(conf) {
-        @Override
-        protected Dispatcher createDispatcher() {
-          return dispatcher;
-        }
-      };
+      rm1 = new MockRM(conf);
       rm1.start();
       MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
       MockNM nm2 = rm1.registerNode("host2:1234", 8000);
@@ -1879,7 +1873,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       Assert.assertTrue("The decommisioned metrics are not updated",
           NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
 
-      dispatcher.await();
+      rm1.drainEvents();
       Assert
           .assertEquals(2,
               ClusterMetrics.getMetrics().getNumDecommisionedNMs());
@@ -1892,6 +1886,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       // restart RM.
       rm2 = new MockRM(conf);
       rm2.start();
+      rm2.drainEvents();
       Assert
           .assertEquals(2,
               ClusterMetrics.getMetrics().getNumDecommisionedNMs());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6167d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index a904dc0..83a3934 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -158,27 +158,21 @@ public class TestResourceTrackerService {
         .getAbsolutePath());
 
     writeToHostsFile("");
-    final DrainDispatcher dispatcher = new DrainDispatcher();
-    rm = new MockRM(conf) {
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
-    };
+    rm = new MockRM(conf);
     rm.start();
 
     MockNM nm1 = rm.registerNode("host1:1234", 5120);
     MockNM nm2 = rm.registerNode("host2:5678", 10240);
     MockNM nm3 = rm.registerNode("localhost:4433", 1024);
 
-    dispatcher.await();
+    rm.drainEvents();
 
     int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
     NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     nodeHeartbeat = nm2.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
 
     // To test that IPs also work
     String ip = NetUtils.normalizeHostName("localhost");
@@ -197,15 +191,15 @@ public class TestResourceTrackerService {
     nodeHeartbeat = nm3.nodeHeartbeat(true);
     Assert.assertTrue("The decommisioned metrics are not updated",
         NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
 
     writeToHostsFile("");
     rm.getNodesListManager().refreshNodes(conf);
 
     nm3 = rm.registerNode("localhost:4433", 1024);
-    dispatcher.await();
+    rm.drainEvents();
     nodeHeartbeat = nm3.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
     // decommissined node is 1 since 1 node is rejoined after updating exclude
     // file
@@ -563,7 +557,6 @@ public class TestResourceTrackerService {
 
   @Test
   public void testReconnectNode() throws Exception {
-    final DrainDispatcher dispatcher = new DrainDispatcher();
     rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
@@ -574,11 +567,6 @@ public class TestResourceTrackerService {
           }
         };
       }
-
-      @Override
-      protected Dispatcher createDispatcher() {
-        return dispatcher;
-      }
     };
     rm.start();
 
@@ -586,7 +574,7 @@ public class TestResourceTrackerService {
     MockNM nm2 = rm.registerNode("host2:5678", 5120);
     nm1.nodeHeartbeat(true);
     nm2.nodeHeartbeat(false);
-    dispatcher.await();
+    rm.drainEvents();
     checkUnealthyNMCount(rm, nm2, true, 1);
     final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
     QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@@ -597,7 +585,7 @@ public class TestResourceTrackerService {
     nm1 = rm.registerNode("host1:1234", 5120);
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
     checkUnealthyNMCount(rm, nm2, true, 1);
 
@@ -605,23 +593,23 @@ public class TestResourceTrackerService {
     nm2 = rm.registerNode("host2:5678", 5120);
     response = nm2.nodeHeartbeat(false);
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
     checkUnealthyNMCount(rm, nm2, true, 1);
     
     // unhealthy node changed back to healthy
     nm2 = rm.registerNode("host2:5678", 5120);
-    dispatcher.await();
+    rm.drainEvents();
     response = nm2.nodeHeartbeat(true);
     response = nm2.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertEquals(5120 + 5120, metrics.getAvailableMB());
 
     // reconnect of node with changed capability
     nm1 = rm.registerNode("host2:5678", 10240);
-    dispatcher.await();
+    rm.drainEvents();
     response = nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 10240, metrics.getAvailableMB());
 
@@ -629,9 +617,9 @@ public class TestResourceTrackerService {
     List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
     runningApps.add(ApplicationId.newInstance(1, 0));
     nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
-    dispatcher.await();
+    rm.drainEvents();
     response = nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
     Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
     
@@ -639,10 +627,10 @@ public class TestResourceTrackerService {
     nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
     nm1.setHttpPort(3);
     nm1.registerNode();
-    dispatcher.await();
+    rm.drainEvents();
     response = nm1.nodeHeartbeat(true);
     response = nm1.nodeHeartbeat(true);
-    dispatcher.await();
+    rm.drainEvents();
     RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
     Assert.assertEquals(3, rmNode.getHttpPort());
     Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
@@ -650,14 +638,116 @@ public class TestResourceTrackerService {
 
   }
 
+  @Test(timeout = 30000)
+  public void testInitDecommMetric() throws Exception {
+    testInitDecommMetricHelper(true);
+    testInitDecommMetricHelper(false);
+  }
+
+  public void testInitDecommMetricHelper(boolean hasIncludeList)
+      throws Exception {
+    Configuration conf = new Configuration();
+    rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    File excludeHostFile =
+        new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
+    writeToHostsFile(excludeHostFile, "host1");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+        excludeHostFile.getAbsolutePath());
+
+    if (hasIncludeList) {
+      writeToHostsFile(hostFile, "host1", "host2");
+      conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+          hostFile.getAbsolutePath());
+    }
+    rm.getNodesListManager().refreshNodes(conf);
+    rm.drainEvents();
+    rm.stop();
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    nm1 = rm1.registerNode("host1:1234", 5120);
+    nm2 = rm1.registerNode("host2:5678", 10240);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    rm1.drainEvents();
+    Assert.assertEquals("Number of Decommissioned nodes should be 1",
+        1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    Assert.assertEquals("The inactiveRMNodes should contain an entry for the" +
+        "decommissioned node",
+        1, rm1.getRMContext().getInactiveRMNodes().size());
+    excludeHostFile =
+        new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
+    writeToHostsFile(excludeHostFile, "");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+        excludeHostFile.getAbsolutePath());
+    rm1.getNodesListManager().refreshNodes(conf);
+    nm1 = rm1.registerNode("host1:1234", 5120);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    rm1.drainEvents();
+    Assert.assertEquals("The decommissioned nodes metric should have " +
+            "decremented to 0",
+        0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    Assert.assertEquals("The active nodes metric should be 2",
+        2, ClusterMetrics.getMetrics().getNumActiveNMs());
+    Assert.assertEquals("The inactive RMNodes entry should have been removed",
+        0, rm1.getRMContext().getInactiveRMNodes().size());
+    rm1.drainEvents();
+    rm1.stop();
+  }
+
+  @Test(timeout = 30000)
+  public void testInitDecommMetricNoRegistration() throws Exception {
+    Configuration conf = new Configuration();
+    rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    //host3 will not register or heartbeat
+    File excludeHostFile =
+        new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
+    writeToHostsFile(excludeHostFile, "host3", "host2");
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
+        excludeHostFile.getAbsolutePath());
+    writeToHostsFile(hostFile, "host1", "host2");
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    rm.getNodesListManager().refreshNodes(conf);
+    rm.drainEvents();
+    Assert.assertEquals("The decommissioned nodes metric should be 1 ",
+        1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    rm.stop();
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    rm1.getNodesListManager().refreshNodes(conf);
+    rm1.drainEvents();
+    Assert.assertEquals("The decommissioned nodes metric should be 2 ",
+        2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+    rm1.stop();
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
-    if (!hostFile.exists()) {
+   writeToHostsFile(hostFile, hosts);
+  }
+
+  private void writeToHostsFile(File file, String... hosts)
+      throws IOException {
+    if (!file.exists()) {
       TEMP_DIR.mkdirs();
-      hostFile.createNewFile();
+      file.createNewFile();
     }
     FileOutputStream fStream = null;
     try {
-      fStream = new FileOutputStream(hostFile);
+      fStream = new FileOutputStream(file);
       for (int i = 0; i < hosts.length; i++) {
         fStream.write(hosts[i].getBytes());
         fStream.write("\n".getBytes());


Mime
View raw message