hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject [2/2] hadoop git commit: YARN-4676. Automatic and Asynchronous Decommissioning Nodes Status Tracking. Contributed by Daniel Zhi. (cherry picked from commit d464483bf7f0b3e3be3ba32cd6c3eee546747ab5)
Date Thu, 18 Aug 2016 14:25:01 GMT
YARN-4676. Automatic and Asynchronous Decommissioning Nodes Status Tracking. Contributed by Daniel Zhi.
(cherry picked from commit d464483bf7f0b3e3be3ba32cd6c3eee546747ab5)

Conflicts:

	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0da69c32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0da69c32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0da69c32

Branch: refs/heads/trunk
Commit: 0da69c324dee9baab0f0b9700db1cc5b623f8421
Parents: 040c185
Author: Junping Du <junping_du@apache.org>
Authored: Thu Aug 18 07:23:29 2016 -0700
Committer: Junping Du <junping_du@apache.org>
Committed: Thu Aug 18 07:27:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/util/HostsFileReader.java | 111 ++++-
 .../apache/hadoop/util/TestHostsFileReader.java |  64 ++-
 hadoop-project/src/site/site.xml                |   1 +
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   5 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   5 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  14 +
 .../protocolrecords/RefreshNodesRequest.java    |  26 +-
 ..._server_resourcemanager_service_protos.proto |   1 +
 .../hadoop/yarn/client/cli/RMAdminCLI.java      | 166 ++++---
 .../hadoop/yarn/client/cli/TestRMAdminCLI.java  |  24 +-
 .../impl/pb/RefreshNodesRequestPBImpl.java      |  17 +-
 .../src/main/resources/yarn-default.xml         |  18 +
 .../server/resourcemanager/AdminService.java    |   3 +-
 .../DecommissioningNodesWatcher.java            | 439 +++++++++++++++++++
 .../resourcemanager/NodesListManager.java       | 166 +++++--
 .../server/resourcemanager/RMServerUtils.java   |   2 +-
 .../resourcemanager/ResourceTrackerService.java |  19 +
 .../server/resourcemanager/rmnode/RMNode.java   |   6 +
 .../rmnode/RMNodeDecommissioningEvent.java      |  41 ++
 .../resourcemanager/rmnode/RMNodeImpl.java      |  54 ++-
 .../webapp/dao/ClusterMetricsInfo.java          |   2 +-
 .../yarn/server/resourcemanager/MockNodes.java  |   5 +
 .../yarn/server/resourcemanager/MockRM.java     |  11 +-
 .../TestDecommissioningNodesWatcher.java        | 131 ++++++
 .../resourcemanager/TestRMNodeTransitions.java  |  11 -
 .../TestResourceTrackerService.java             | 199 +++++++--
 .../resourcetracker/TestNMReconnect.java        |   2 -
 .../src/site/markdown/YarnCommands.md           |   2 +-
 28 files changed, 1326 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
index 1cba426..2ef1ead 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
@@ -21,16 +21,27 @@ package org.apache.hadoop.util;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
 
 // Keeps track of which datanodes/tasktrackers are allowed to connect to the 
 // namenode/jobtracker.
@@ -38,7 +49,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public class HostsFileReader {
   private Set<String> includes;
-  private Set<String> excludes;
+  // exclude host list with optional timeout.
+  // If the value is null, it indicates default timeout.
+  private Map<String, Integer> excludes;
   private String includesFile;
   private String excludesFile;
   private WriteLock writeLock;
@@ -49,7 +62,7 @@ public class HostsFileReader {
   public HostsFileReader(String inFile, 
                          String exFile) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     includesFile = inFile;
     excludesFile = exFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -62,7 +75,7 @@ public class HostsFileReader {
   public HostsFileReader(String includesFile, InputStream inFileInputStream,
       String excludesFile, InputStream exFileInputStream) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     this.includesFile = includesFile;
     this.excludesFile = excludesFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -121,6 +134,73 @@ public class HostsFileReader {
     }
   }
 
+  public static void readFileToMap(String type,
+      String filename, Map<String, Integer> map) throws IOException {
+    File file = new File(filename);
+    FileInputStream fis = new FileInputStream(file);
+    readFileToMapWithFileInputStream(type, filename, fis, map);
+  }
+
+  public static void readFileToMapWithFileInputStream(String type,
+      String filename, InputStream inputStream, Map<String, Integer> map)
+          throws IOException {
+    // The input file could be either simple text or XML.
+    boolean xmlInput = filename.toLowerCase().endsWith(".xml");
+    if (xmlInput) {
+      readXmlFileToMapWithFileInputStream(type, filename, inputStream, map);
+    } else {
+      HashSet<String> nodes = new HashSet<String>();
+      readFileToSetWithFileInputStream(type, filename, inputStream, nodes);
+      for (String node : nodes) {
+        map.put(node, null);
+      }
+    }
+  }
+
+  public static void readXmlFileToMapWithFileInputStream(String type,
+      String filename, InputStream fileInputStream, Map<String, Integer> map)
+          throws IOException {
+    Document dom;
+    DocumentBuilderFactory builder = DocumentBuilderFactory.newInstance();
+    try {
+      DocumentBuilder db = builder.newDocumentBuilder();
+      dom = db.parse(fileInputStream);
+      // Examples:
+      // <host><name>host1</name></host>
+      // <host><name>host2</name><timeout>123</timeout></host>
+      // <host><name>host3</name><timeout>-1</timeout></host>
+      // <host><name>host4, host5,host6</name><timeout>1800</timeout></host>
+      Element doc = dom.getDocumentElement();
+      NodeList nodes = doc.getElementsByTagName("host");
+      for (int i = 0; i < nodes.getLength(); i++) {
+        Node node = nodes.item(i);
+        if (node.getNodeType() == Node.ELEMENT_NODE) {
+          Element e= (Element) node;
+          // Support both single host and comma-separated list of hosts.
+          String v = readFirstTagValue(e, "name");
+          String[] hosts = StringUtils.getTrimmedStrings(v);
+          String str = readFirstTagValue(e, "timeout");
+          Integer timeout = (str == null)? null : Integer.parseInt(str);
+          for (String host : hosts) {
+            map.put(host, timeout);
+            LOG.info("Adding a node \"" + host + "\" to the list of "
+                + type + " hosts from " + filename);
+          }
+        }
+      }
+    } catch (IOException|SAXException|ParserConfigurationException e) {
+      LOG.fatal("error parsing " + filename, e);
+      throw new RuntimeException(e);
+    } finally {
+      fileInputStream.close();
+    }
+  }
+
+  static String readFirstTagValue(Element e, String tag) {
+    NodeList nodes = e.getElementsByTagName(tag);
+    return (nodes.getLength() == 0)? null : nodes.item(0).getTextContent();
+  }
+
   public void refresh(String includeFiles, String excludeFiles)
       throws IOException {
     LOG.info("Refreshing hosts (include/exclude) list");
@@ -129,7 +209,7 @@ public class HostsFileReader {
       // update instance variables
       updateFileNames(includeFiles, excludeFiles);
       Set<String> newIncludes = new HashSet<String>();
-      Set<String> newExcludes = new HashSet<String>();
+      Map<String, Integer> newExcludes = new HashMap<String, Integer>();
       boolean switchIncludes = false;
       boolean switchExcludes = false;
       if (includeFiles != null && !includeFiles.isEmpty()) {
@@ -137,7 +217,7 @@ public class HostsFileReader {
         switchIncludes = true;
       }
       if (excludeFiles != null && !excludeFiles.isEmpty()) {
-        readFileToSet("excluded", excludeFiles, newExcludes);
+        readFileToMap("excluded", excludeFiles, newExcludes);
         switchExcludes = true;
       }
 
@@ -161,7 +241,7 @@ public class HostsFileReader {
     this.writeLock.lock();
     try {
       Set<String> newIncludes = new HashSet<String>();
-      Set<String> newExcludes = new HashSet<String>();
+      Map<String, Integer> newExcludes = new HashMap<String, Integer>();
       boolean switchIncludes = false;
       boolean switchExcludes = false;
       if (inFileInputStream != null) {
@@ -170,7 +250,7 @@ public class HostsFileReader {
         switchIncludes = true;
       }
       if (exFileInputStream != null) {
-        readFileToSetWithFileInputStream("excluded", excludesFile,
+        readFileToMapWithFileInputStream("excluded", excludesFile,
             exFileInputStream, newExcludes);
         switchExcludes = true;
       }
@@ -199,7 +279,7 @@ public class HostsFileReader {
   public Set<String> getExcludedHosts() {
     this.readLock.lock();
     try {
-      return excludes;
+      return excludes.keySet();
     } finally {
       this.readLock.unlock();
     }
@@ -209,7 +289,18 @@ public class HostsFileReader {
     this.readLock.lock();
     try {
       includes.addAll(this.includes);
-      excludes.addAll(this.excludes);
+      excludes.addAll(this.excludes.keySet());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void getHostDetails(Set<String> includeHosts,
+                             Map<String, Integer> excludeHosts) {
+    this.readLock.lock();
+    try {
+      includeHosts.addAll(this.includes);
+      excludeHosts.putAll(this.excludes);
     } finally {
       this.readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
index 8015f7a..5766591 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
@@ -20,16 +20,19 @@ package org.apache.hadoop.util;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Map;
 
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.*;
+
 import static org.junit.Assert.*;
 
 /*
  * Test for HostsFileReader.java
- * 
+ *
  */
 public class TestHostsFileReader {
 
@@ -39,6 +42,7 @@ public class TestHostsFileReader {
   File INCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.include");
   String excludesFile = HOSTS_TEST_DIR + "/dfs.exclude";
   String includesFile = HOSTS_TEST_DIR + "/dfs.include";
+  private String excludesXmlFile = HOSTS_TEST_DIR + "/dfs.exclude.xml";
 
   @Before
   public void setUp() throws Exception {
@@ -288,4 +292,62 @@ public class TestHostsFileReader {
     assertFalse(hfp.getExcludedHosts().contains("somehost5"));
 
   }
+
+  /*
+   * Test if timeout values are provided in HostFile
+   */
+  @Test
+  public void testHostFileReaderWithTimeout() throws Exception {
+    FileWriter efw = new FileWriter(excludesXmlFile);
+    FileWriter ifw = new FileWriter(includesFile);
+
+    efw.write("<?xml version=\"1.0\"?>\n");
+    efw.write("<!-- yarn.nodes.exclude -->\n");
+    efw.write("<hosts>\n");
+    efw.write("<host><name>host1</name></host>\n");
+    efw.write("<host><name>host2</name><timeout>123</timeout></host>\n");
+    efw.write("<host><name>host3</name><timeout>-1</timeout></host>\n");
+    efw.write("<host><name>10000</name></host>\n");
+    efw.write("<host><name>10001</name><timeout>123</timeout></host>\n");
+    efw.write("<host><name>10002</name><timeout>-1</timeout></host>\n");
+    efw.write("<host><name>host4,host5, host6</name>" +
+              "<timeout>1800</timeout></host>\n");
+    efw.write("</hosts>\n");
+    efw.close();
+
+    ifw.write("#Hosts-in-DFS\n");
+    ifw.write("     \n");
+    ifw.write("   somehost \t  somehost2 \n somehost4");
+    ifw.write("   somehost3 \t # somehost5");
+    ifw.close();
+
+    HostsFileReader hfp = new HostsFileReader(includesFile, excludesXmlFile);
+
+    int includesLen = hfp.getHosts().size();
+    int excludesLen = hfp.getExcludedHosts().size();
+    assertEquals(4, includesLen);
+    assertEquals(9, excludesLen);
+
+    Set<String> includes = new HashSet<String>();
+    Map<String, Integer> excludes = new HashMap<String, Integer>();
+    hfp.getHostDetails(includes, excludes);
+    assertTrue(excludes.containsKey("host1"));
+    assertTrue(excludes.containsKey("host2"));
+    assertTrue(excludes.containsKey("host3"));
+    assertTrue(excludes.containsKey("10000"));
+    assertTrue(excludes.containsKey("10001"));
+    assertTrue(excludes.containsKey("10002"));
+    assertTrue(excludes.containsKey("host4"));
+    assertTrue(excludes.containsKey("host5"));
+    assertTrue(excludes.containsKey("host6"));
+    assertTrue(excludes.get("host1") == null);
+    assertTrue(excludes.get("host2") == 123);
+    assertTrue(excludes.get("host3") == -1);
+    assertTrue(excludes.get("10000") == null);
+    assertTrue(excludes.get("10001") == 123);
+    assertTrue(excludes.get("10002") == -1);
+    assertTrue(excludes.get("host4") == 1800);
+    assertTrue(excludes.get("host5") == 1800);
+    assertTrue(excludes.get("host6") == 1800);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 9fa1469..0d87973 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -136,6 +136,7 @@
       <item name="Secure Containers" href="hadoop-yarn/hadoop-yarn-site/SecureContainer.html"/>
       <item name="Registry" href="hadoop-yarn/hadoop-yarn-site/registry/index.html"/>
       <item name="Reservation System" href="hadoop-yarn/hadoop-yarn-site/ReservationSystem.html"/>
+      <item name="Graceful Decommission" href="hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html"/>
     </menu>
 
     <menu name="YARN REST APIs" inherit="top">

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index bd737bd..c598aa0 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -213,6 +213,11 @@ public class NodeInfo {
     @Override
     public void setUntrackedTimeStamp(long timeStamp) {
     }
+
+    @Override
+    public Integer getDecommissioningTimeout() {
+      return null;
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 5048978..6d0ffbd 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -202,4 +202,9 @@ public class RMNodeWrapper implements RMNode {
   @Override
   public void setUntrackedTimeStamp(long timeStamp) {
   }
+
+  @Override
+  public Integer getDecommissioningTimeout() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8899ccd..770d139 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -795,6 +795,20 @@ public class YarnConfiguration extends Configuration {
    */
   public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
 
+  /**
+   * Timeout in seconds for YARN node graceful decommission.
+   * This is the maximal time to wait for running containers and applications
+   * to complete before transition a DECOMMISSIONING node into DECOMMISSIONED.
+   */
+  public static final String RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT =
+      RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs";
+  public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600;
+
+  public static final String RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL =
+      RM_PREFIX + "decommissioning-nodes-watcher.poll-interval-secs";
+  public static final int
+      DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20;
+
   ////////////////////////////////
   // Node Manager Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
index 0333c3b..732d98e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
@@ -43,6 +43,16 @@ public abstract class RefreshNodesRequest {
     return request;
   }
 
+  @Private
+  @Unstable
+  public static RefreshNodesRequest newInstance(
+      DecommissionType decommissionType, Integer timeout) {
+    RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class);
+    request.setDecommissionType(decommissionType);
+    request.setDecommissionTimeout(timeout);
+    return request;
+  }
+
   /**
    * Set the DecommissionType
    * 
@@ -56,4 +66,18 @@ public abstract class RefreshNodesRequest {
    * @return decommissionType
    */
   public abstract DecommissionType getDecommissionType();
-}
+
+  /**
+   * Set the DecommissionTimeout.
+   *
+   * @param timeout graceful decommission timeout in seconds
+   */
+  public abstract void setDecommissionTimeout(Integer timeout);
+
+  /**
+   * Get the DecommissionTimeout.
+   *
+   * @return decommissionTimeout
+   */
+  public abstract Integer getDecommissionTimeout();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index eaf658f..b9f30db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -37,6 +37,7 @@ message RefreshQueuesResponseProto {
 
 message RefreshNodesRequestProto {
   optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
+  optional int32 decommissionTimeout = 2;
 }
 message RefreshNodesResponseProto {
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
index 4aa3a14..fcb9b74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
@@ -99,10 +99,10 @@ public class RMAdminCLI extends HAAdmin {
                   "properties. \n\t\tResourceManager will reload the " +
                   "mapred-queues configuration file."))
           .put("-refreshNodes",
-              new UsageInfo("[-g [timeout in seconds] -client|server]",
+              new UsageInfo("[-g|graceful [timeout in seconds] -client|server]",
               "Refresh the hosts information at the ResourceManager. Here "
-              + "[-g [timeout in seconds] -client|server] is optional, if we "
-              + "specify the timeout then ResourceManager will wait for "
+              + "[-g|graceful [timeout in seconds] -client|server] is optional,"
+              + " if we specify the timeout then ResourceManager will wait for "
               + "timeout before marking the NodeManager as decommissioned."
               + " The -client|server indicates if the timeout tracking should"
               + " be handled by the client or the ResourceManager. The client"
@@ -234,21 +234,23 @@ public class RMAdminCLI extends HAAdmin {
     summary.append("rmadmin is the command to execute YARN administrative " +
         "commands.\n");
     summary.append("The full syntax is: \n\n" +
-    "yarn rmadmin" +
-      " [-refreshQueues]" +
-      " [-refreshNodes [-g [timeout in seconds] -client|server]]" +
-      " [-refreshNodesResources]" +
-      " [-refreshSuperUserGroupsConfiguration]" +
-      " [-refreshUserToGroupsMappings]" +
-      " [-refreshAdminAcls]" +
-      " [-refreshServiceAcl]" +
-      " [-getGroup [username]]" +
-      " [-addToClusterNodeLabels <\"label1(exclusive=true),"
-                  + "label2(exclusive=false),label3\">]" +
-      " [-removeFromClusterNodeLabels <label1,label2,label3>]" +
-      " [-replaceLabelsOnNode <\"node1[:port]=label1,label2 node2[:port]=label1\">]" +
-      " [-directlyAccessNodeLabelStore]" +
-      " [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])");
+        "yarn rmadmin" +
+        " [-refreshQueues]" +
+        " [-refreshNodes [-g|graceful [timeout in seconds] -client|server]]" +
+        " [-refreshNodesResources]" +
+        " [-refreshSuperUserGroupsConfiguration]" +
+        " [-refreshUserToGroupsMappings]" +
+        " [-refreshAdminAcls]" +
+        " [-refreshServiceAcl]" +
+        " [-getGroup [username]]" +
+        " [-addToClusterNodeLabels <\"label1(exclusive=true),"
+            + "label2(exclusive=false),label3\">]" +
+        " [-removeFromClusterNodeLabels <label1,label2,label3>]" +
+        " [-replaceLabelsOnNode <\"node1[:port]=label1,label2" +
+        " node2[:port]=label1\">]" +
+        " [-directlyAccessNodeLabelStore]" +
+        " [-updateNodeResource [NodeID] [MemSize] [vCores]" +
+        " ([OvercommitTimeout])");
     if (isHAEnabled) {
       appendHAUsage(summary);
     }
@@ -309,33 +311,40 @@ public class RMAdminCLI extends HAAdmin {
     return 0;
   }
 
-  private int refreshNodes() throws IOException, YarnException {
+  private int refreshNodes(boolean graceful) throws IOException, YarnException {
     // Refresh the nodes
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
-    RefreshNodesRequest request = RefreshNodesRequest
-        .newInstance(DecommissionType.NORMAL);
+    RefreshNodesRequest request = RefreshNodesRequest.newInstance(
+        graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL);
     adminProtocol.refreshNodes(request);
     return 0;
   }
 
-  private int refreshNodes(long timeout, String trackingMode)
+  private int refreshNodes(int timeout, String trackingMode)
       throws IOException, YarnException {
-    if (!"client".equals(trackingMode)) {
-      throw new UnsupportedOperationException(
-          "Only client tracking mode is currently supported.");
-    }
+    boolean serverTracking = !"client".equals(trackingMode);
     // Graceful decommissioning with timeout
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshNodesRequest gracefulRequest = RefreshNodesRequest
-        .newInstance(DecommissionType.GRACEFUL);
+        .newInstance(DecommissionType.GRACEFUL, timeout);
     adminProtocol.refreshNodes(gracefulRequest);
+    if (serverTracking) {
+      return 0;
+    }
     CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory
         .newRecordInstance(CheckForDecommissioningNodesRequest.class);
     long waitingTime;
     boolean nodesDecommissioning = true;
+    // As RM enforces timeout automatically, client usually don't need
+    // to forcefully decommission nodes upon timeout.
+    // Here we let the client waits a small additional seconds so to avoid
+    // unnecessary double decommission.
+    final int gracePeriod = 5;
     // timeout=-1 means wait for all the nodes to be gracefully
     // decommissioned
-    for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) {
+    for (waitingTime = 0;
+        timeout == -1 || (timeout >= 0 && waitingTime < timeout + gracePeriod);
+        waitingTime++) {
       // wait for one second to check nodes decommissioning status
       try {
         Thread.sleep(1000);
@@ -380,6 +389,10 @@ public class RMAdminCLI extends HAAdmin {
     return 0;
   }
 
+  private int refreshNodes() throws IOException, YarnException {
+    return refreshNodes(false);
+  }
+
   private int refreshUserToGroupsMappings() throws IOException,
       YarnException {
     // Refresh the user-to-groups mappings
@@ -725,33 +738,12 @@ public class RMAdminCLI extends HAAdmin {
         return exitCode;
       }
     }
-    
+
     try {
       if ("-refreshQueues".equals(cmd)) {
         exitCode = refreshQueues();
       } else if ("-refreshNodes".equals(cmd)) {
-        if (args.length == 1) {
-          exitCode = refreshNodes();
-        } else if (args.length == 3 || args.length == 4) {
-          // if the graceful timeout specified
-          if ("-g".equals(args[1])) {
-            long timeout = -1;
-            String trackingMode;
-            if (args.length == 4) {
-              timeout = validateTimeout(args[2]);
-              trackingMode = validateTrackingMode(args[3]);
-            } else {
-              trackingMode = validateTrackingMode(args[2]);
-            }
-            exitCode = refreshNodes(timeout, trackingMode);
-          } else {
-            printUsage(cmd, isHAEnabled);
-            return -1;
-          }
-        } else {
-          printUsage(cmd, isHAEnabled);
-          return -1;
-        }
+        exitCode = handleRefreshNodes(args, cmd, isHAEnabled);
       } else if ("-refreshNodesResources".equals(cmd)) {
         exitCode = refreshNodesResources();
       } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
@@ -768,22 +760,7 @@ public class RMAdminCLI extends HAAdmin {
         String[] usernames = Arrays.copyOfRange(args, i, args.length);
         exitCode = getGroups(usernames);
       } else if ("-updateNodeResource".equals(cmd)) {
-        if (args.length < 4 || args.length > 5) {
-          System.err.println("Number of parameters specified for " +
-              "updateNodeResource is wrong.");
-          printUsage(cmd, isHAEnabled);
-          exitCode = -1;
-        } else {
-          String nodeID = args[i++];
-          String memSize = args[i++];
-          String cores = args[i++];
-          int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
-          if (i == args.length - 1) {
-            overCommitTimeout = Integer.parseInt(args[i]);
-          }
-          exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize),
-              Integer.parseInt(cores), overCommitTimeout);
-        }
+        exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled);
       } else if ("-addToClusterNodeLabels".equals(cmd)) {
         if (i >= args.length) {
           System.err.println(NO_LABEL_ERR_MSG);
@@ -843,10 +820,59 @@ public class RMAdminCLI extends HAAdmin {
     return exitCode;
   }
 
-  private long validateTimeout(String strTimeout) {
-    long timeout;
+  /**
+   * Handle the "-refreshNodes" admin command (extracted from run() to keep
+   * that method short). Supports the plain form and the graceful form
+   * "-refreshNodes -g|-graceful [timeout in seconds] trackingMode".
+   *
+   * @param args the full command-line arguments; args[0] is the command
+   * @param cmd the command name, used when printing usage
+   * @param isHAEnabled whether RM HA is enabled, used when printing usage
+   * @return 0 on success, -1 on invalid arguments
+   */
+  private int handleRefreshNodes(String[] args, String cmd, boolean isHAEnabled)
+      throws IOException, YarnException {
+    if (args.length == 1) {
+      return refreshNodes();
+    } else if (args.length == 3 || args.length == 4) {
+      // Graceful decommission requested; the timeout argument is optional
+      // and stays -1 (no explicit timeout) when only tracking mode is given.
+      if ("-g".equals(args[1]) || "-graceful".equals(args[1])) {
+        int timeout = -1;
+        String trackingMode;
+        if (args.length == 4) {
+          timeout = validateTimeout(args[2]);
+          trackingMode = validateTrackingMode(args[3]);
+        } else {
+          trackingMode = validateTrackingMode(args[2]);
+        }
+        return refreshNodes(timeout, trackingMode);
+      } else {
+        printUsage(cmd, isHAEnabled);
+        return -1;
+      }
+    } else {
+      printUsage(cmd, isHAEnabled);
+      return -1;
+    }
+  }
+
+  /**
+   * Handle the "-updateNodeResource" admin command (extracted from run()).
+   * Expected arguments: nodeID, memory, vcores, plus an optional
+   * overcommit timeout.
+   *
+   * @param args the full command-line arguments; args[0] is the command
+   * @param cmd the command name, used when printing usage
+   * @param isHAEnabled whether RM HA is enabled, used when printing usage
+   * @return 0 on success, -1 on an invalid number of arguments
+   * @throws NumberFormatException if memory, cores or timeout do not parse
+   */
+  private int handleUpdateNodeResource(
+      String[] args, String cmd, boolean isHAEnabled)
+          throws NumberFormatException, IOException, YarnException {
+    int i = 1;
+    if (args.length < 4 || args.length > 5) {
+      System.err.println("Number of parameters specified for " +
+          "updateNodeResource is wrong.");
+      printUsage(cmd, isHAEnabled);
+      return -1;
+    } else {
+      String nodeID = args[i++];
+      String memSize = args[i++];
+      String cores = args[i++];
+      // Optional 5th argument overrides the default overcommit timeout.
+      int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
+      if (i == args.length - 1) {
+        overCommitTimeout = Integer.parseInt(args[i]);
+      }
+      return updateNodeResource(nodeID, Integer.parseInt(memSize),
+          Integer.parseInt(cores), overCommitTimeout);
+    }
+  }
+
+  private int validateTimeout(String strTimeout) {
+    int timeout;
     try {
-      timeout = Long.parseLong(strTimeout);
+      timeout = Integer.parseInt(strTimeout);
     } catch (NumberFormatException ex) {
       throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
index d3161ba..60c7eac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
@@ -267,7 +267,7 @@ public class TestRMAdminCLI {
         CheckForDecommissioningNodesRequest.class))).thenReturn(response);
     assertEquals(0, rmAdminCLI.run(args));
     verify(admin).refreshNodes(
-        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1));
     verify(admin, never()).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
   }
@@ -327,7 +327,7 @@ public class TestRMAdminCLI {
           });
     assertEquals(0, rmAdminCLI.run(args));
     verify(admin, atLeastOnce()).refreshNodes(
-        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, -1));
     verify(admin, never()).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
   }
@@ -346,10 +346,6 @@ public class TestRMAdminCLI {
     String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
     assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
 
-    // server tracking mode
-    String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
-    assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
-
     // invalid tracking mode
     String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
     assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
@@ -465,8 +461,9 @@ public class TestRMAdminCLI {
       assertTrue(dataOut
           .toString()
           .contains(
-              "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
-              "seconds] -client|server]] [-refreshNodesResources] [-refresh" +
+              "yarn rmadmin [-refreshQueues] [-refreshNodes "+
+              "[-g|graceful [timeout in seconds] -client|server]] " +
+              "[-refreshNodesResources] [-refresh" +
               "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
               "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
               "[username]] [-addToClusterNodeLabels " +
@@ -485,7 +482,8 @@ public class TestRMAdminCLI {
       assertTrue(dataOut
           .toString()
           .contains(
-              "-refreshNodes [-g [timeout in seconds] -client|server]: " +
+              "-refreshNodes [-g|graceful [timeout in seconds]" +
+              " -client|server]: " +
               "Refresh the hosts information at the ResourceManager."));
       assertTrue(dataOut
           .toString()
@@ -518,8 +516,8 @@ public class TestRMAdminCLI {
       testError(new String[] { "-help", "-refreshQueues" },
           "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodes" },
-          "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
-          "-client|server]]", dataErr, 0);
+          "Usage: yarn rmadmin [-refreshNodes [-g|graceful " +
+          "[timeout in seconds] -client|server]]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodesResources" },
           "Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
       testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@@ -558,8 +556,8 @@ public class TestRMAdminCLI {
       assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
       oldOutPrintStream.println(dataOut);
       String expectedHelpMsg = 
-          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
-              + "seconds] -client|server]] "
+          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g|graceful "
+              + "[timeout in seconds] -client|server]] "
               + "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
               + "[-refreshUserToGroupsMappings] "
               + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
index 05f3230..c03a569 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
@@ -31,7 +31,6 @@ import com.google.protobuf.TextFormat;
 @Private
 @Unstable
 public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
-
   RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance();
   RefreshNodesRequestProto.Builder builder = null;
   boolean viaProto = false;
@@ -108,6 +107,22 @@ public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
     return convertFromProtoFormat(p.getDecommissionType());
   }
 
+  /**
+   * Set the decommission timeout on the underlying proto builder; a null
+   * value clears the optional field (getDecommissionTimeout() then
+   * returns null).
+   */
+  @Override
+  public synchronized void setDecommissionTimeout(Integer timeout) {
+    maybeInitBuilder();
+    if (timeout != null) {
+      builder.setDecommissionTimeout(timeout);
+    } else {
+      builder.clearDecommissionTimeout();
+    }
+  }
+
+  /**
+   * @return the decommission timeout carried by the request, or null when
+   *         the optional proto field has not been set.
+   */
+  @Override
+  public synchronized Integer getDecommissionTimeout() {
+    RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
+  }
+
   private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
     return DecommissionType.valueOf(p.name());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e77e990..e956507 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2586,6 +2586,24 @@
   </property>
 
   <property>
+    <description>
+    Timeout in seconds for YARN node graceful decommission.
+    This is the maximum time to wait for running containers and applications
+    to complete before transitioning a DECOMMISSIONING node into DECOMMISSIONED.
+    </description>
+    <name>yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs</name>
+    <value>3600</value>
+  </property>
+
+  <property>
+    <description>
+    Interval in seconds of DecommissioningNodesWatcher internal polling.
+    </description>
+    <name>yarn.resourcemanager.decommissioning-nodes-watcher.poll-interval-secs</name>
+    <value>20</value>
+  </property>
+
+  <property>
     <description>The Node Label script to run. Script output Line starting with
      "NODE_PARTITION:" will be considered as Node Label Partition. In case of
      multiple lines have this pattern, then last one will be considered

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 2ec03aa..db55264 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -447,7 +447,8 @@ public class AdminService extends CompositeService implements
         rmContext.getNodesListManager().refreshNodes(conf);
         break;
       case GRACEFUL:
-        rmContext.getNodesListManager().refreshNodesGracefully(conf);
+        rmContext.getNodesListManager().refreshNodesGracefully(
+            conf, request.getDecommissionTimeout());
         break;
       case FORCEFUL:
         rmContext.getNodesListManager().refreshNodesForcefully();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
new file mode 100644
index 0000000..376b503
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+/**
+ * DecommissioningNodesWatcher is used by ResourceTrackerService to track
+ * DECOMMISSIONING nodes and to decide when, after all running containers on
+ * a node have completed, the node will be transitioned into DECOMMISSIONED
+ * state (at which point the NodeManager will be told to shut down).
+ * Under a MapReduce application, a node that has completed all its
+ * containers may still serve its map output data to reducers for the
+ * duration of the application. A fully graceful mechanism would keep such
+ * DECOMMISSIONING nodes around until all involved applications complete.
+ * That could, however, be undesirable in long-running application scenarios
+ * where a bunch of "idle" nodes would stay around for a long period of time.
+ *
+ * DecommissioningNodesWatcher balances these concerns with a timeout
+ * policy --- a DECOMMISSIONING node will be DECOMMISSIONED no later than
+ * DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
+ *
+ * To be efficient, DecommissioningNodesWatcher skips tracking application
+ * containers on a particular node until the node enters the DECOMMISSIONING
+ * state, so it costs essentially nothing while no node is DECOMMISSIONING.
+ * This sacrifices the case where the node once hosted containers of an
+ * application that is still running (the affected map tasks will be
+ * rescheduled).
+ */
+public class DecommissioningNodesWatcher {
+  private static final Log LOG =
+      LogFactory.getLog(DecommissioningNodesWatcher.class);
+
+  private final RMContext rmContext;
+
+  // Default timeout value in millis.
+  // Negative value indicates no timeout. 0 means immediate.
+  private long defaultTimeoutMs =
+      1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
+
+  // Once a RMNode is observed in DECOMMISSIONING state, all its
+  // ContainerStatus updates are tracked inside DecommissioningNodeContext.
+  class DecommissioningNodeContext {
+    private final NodeId nodeId;
+
+    // Last known NodeState.
+    private NodeState nodeState;
+
+    // The moment node is observed in DECOMMISSIONING state.
+    private final long decommissioningStartTime;
+
+    // Moment the active-container count was first observed to reach zero
+    // (0 until that happens).
+    private long lastContainerFinishTime;
+
+    // number of running containers at the moment.
+    private int numActiveContainers;
+
+    // All applications run on the node at or after decommissioningStartTime.
+    private Set<ApplicationId> appIds;
+
+    // First moment the node is observed in DECOMMISSIONED state.
+    private long decommissionedTime;
+
+    // Timeout in millis for this decommissioning node.
+    // This value could be dynamically updated with new value from RMNode.
+    private long timeoutMs;
+
+    // Moment this context was last refreshed by update().
+    private long lastUpdateTime;
+
+    public DecommissioningNodeContext(NodeId nodeId) {
+      this.nodeId = nodeId;
+      this.appIds = new HashSet<ApplicationId>();
+      this.decommissioningStartTime = mclock.getTime();
+      this.timeoutMs = defaultTimeoutMs;
+    }
+
+    // Refresh the per-node timeout; null falls back to the default.
+    void updateTimeout(Integer timeoutSec) {
+      this.timeoutMs = (timeoutSec == null)?
+          defaultTimeoutMs : (1000L * timeoutSec);
+    }
+  }
+
+  // All DECOMMISSIONING nodes to track.
+  private HashMap<NodeId, DecommissioningNodeContext> decomNodes =
+      new HashMap<NodeId, DecommissioningNodeContext>();
+
+  // Daemon timer driving PollTimerTask; scheduled in init().
+  private Timer pollTimer;
+  private MonotonicClock mclock;
+
+  public DecommissioningNodesWatcher(RMContext rmContext) {
+    this.rmContext = rmContext;
+    pollTimer = new Timer(true);
+    mclock = new MonotonicClock();
+  }
+
+  /**
+   * Read configuration and start the periodic poll task.
+   * @param conf configuration supplying the graceful decommission timeout
+   *             and the watcher poll interval (both in seconds)
+   */
+  public void init(Configuration conf) {
+    readDecommissioningTimeout(conf);
+    int v = conf.getInt(
+        YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
+        YarnConfiguration
+          .DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL);
+    pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v));
+  }
+
+  /**
+   * Update rmNode decommissioning status based on NodeStatus.
+   * Creates or refreshes the tracking context for a DECOMMISSIONING node,
+   * ages out a DECOMMISSIONED entry after a minute, and drops the entry
+   * for a node in any other state.
+   * @param rmNode The node
+   * @param remoteNodeStatus latest NodeStatus
+   */
+  public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) {
+    DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID());
+    long now = mclock.getTime();
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      if (context == null) {
+        return;
+      }
+      context.nodeState = rmNode.getState();
+      // Keep DECOMMISSIONED node for a while for status log, so that such
+      // host will appear as DECOMMISSIONED instead of quietly disappearing.
+      if (context.decommissionedTime == 0) {
+        context.decommissionedTime = now;
+      } else if (now - context.decommissionedTime > 60000L) {
+        decomNodes.remove(rmNode.getNodeID());
+      }
+    } else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+      if (context == null) {
+        context = new DecommissioningNodeContext(rmNode.getNodeID());
+        decomNodes.put(rmNode.getNodeID(), context);
+        context.nodeState = rmNode.getState();
+        context.decommissionedTime = 0;
+      }
+      context.updateTimeout(rmNode.getDecommissioningTimeout());
+      context.lastUpdateTime = now;
+
+      if (remoteNodeStatus.getKeepAliveApplications() != null) {
+        context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
+      }
+
+      // Count number of active containers and collect their applications.
+      int numActiveContainers = 0;
+      for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) {
+        ContainerState newState = cs.getState();
+        if (newState == ContainerState.RUNNING ||
+            newState == ContainerState.NEW) {
+          numActiveContainers++;
+        }
+        // Set.add is a no-op for already-present ids; no contains() needed.
+        ApplicationId aid = cs.getContainerId()
+            .getApplicationAttemptId().getApplicationId();
+        context.appIds.add(aid);
+      }
+      // Publish the final count once, after the scan.
+      context.numActiveContainers = numActiveContainers;
+
+      // maintain lastContainerFinishTime.
+      if (context.numActiveContainers == 0 &&
+          context.lastContainerFinishTime == 0) {
+        context.lastContainerFinishTime = now;
+      }
+    } else {
+      // remove node in other states
+      if (context != null) {
+        decomNodes.remove(rmNode.getNodeID());
+      }
+    }
+  }
+
+  /**
+   * Stop tracking the given node, if it is currently tracked.
+   * @param nodeId the node to forget
+   */
+  public synchronized void remove(NodeId nodeId) {
+    DecommissioningNodeContext context = decomNodes.get(nodeId);
+    if (context != null) {
+      LOG.info("remove " + nodeId + " in " + context.nodeState);
+      decomNodes.remove(nodeId);
+    }
+  }
+
+  /**
+   * Status of a specific decommissioning node.
+   */
+  public enum DecommissioningNodeStatus {
+    // Node is not in DECOMMISSIONING state.
+    NONE,
+
+    // Waiting for running containers to complete.
+    WAIT_CONTAINER,
+
+    // Waiting for running applications to complete
+    // (after all containers complete).
+    WAIT_APP,
+
+    // Timeout waiting for either containers or applications to complete.
+    TIMEOUT,
+
+    // Nothing to wait for; ready to be decommissioned.
+    READY,
+
+    // The node has already been decommissioned.
+    DECOMMISSIONED,
+  }
+
+  /**
+   * @return true if the node may be decommissioned now: either it has
+   *         nothing left to wait for (READY) or its timeout has expired.
+   */
+  public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
+    DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId);
+    return (s == DecommissioningNodeStatus.READY ||
+            s == DecommissioningNodeStatus.TIMEOUT);
+  }
+
+  /**
+   * Compute the current decommissioning status of a node from its tracked
+   * context: running containers are checked first, then (once containers
+   * have drained) the applications that ran on the node. A negative
+   * timeoutMs means wait indefinitely.
+   * NOTE(review): unlike update()/remove() this method is not synchronized
+   * although it reads decomNodes and (via removeCompletedApps) mutates the
+   * context -- confirm callers serialize access.
+   */
+  public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) {
+    DecommissioningNodeContext context = decomNodes.get(nodeId);
+    if (context == null) {
+      return DecommissioningNodeStatus.NONE;
+    }
+
+    if (context.nodeState == NodeState.DECOMMISSIONED) {
+      return DecommissioningNodeStatus.DECOMMISSIONED;
+    }
+
+    long waitTime = mclock.getTime() - context.decommissioningStartTime;
+    if (context.numActiveContainers > 0) {
+      return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+          DecommissioningNodeStatus.WAIT_CONTAINER :
+          DecommissioningNodeStatus.TIMEOUT;
+    }
+
+    removeCompletedApps(context);
+    if (context.appIds.size() == 0) {
+      return DecommissioningNodeStatus.READY;
+    } else {
+      return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+          DecommissioningNodeStatus.WAIT_APP :
+          DecommissioningNodeStatus.TIMEOUT;
+    }
+  }
+
+  /**
+   * PollTimerTask periodically:
+   *   1. logs status of all DECOMMISSIONING nodes;
+   *   2. identifies and takes care of stale DECOMMISSIONING nodes
+   *      (for example, a node that has already terminated).
+   * NOTE(review): run() iterates decomNodes without the watcher lock while
+   * update()/remove() are synchronized -- confirm this is safe or
+   * synchronize the scan.
+   */
+  class PollTimerTask extends TimerTask {
+    private final RMContext rmContext;
+
+    public PollTimerTask(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    // Executed every poll interval (scheduled in init()).
+    public void run() {
+      logDecommissioningNodesStatus();
+      long now = mclock.getTime();
+      Set<NodeId> staleNodes = new HashSet<NodeId>();
+
+      for (Iterator<Map.Entry<NodeId, DecommissioningNodeContext>> it =
+          decomNodes.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<NodeId, DecommissioningNodeContext> e = it.next();
+        DecommissioningNodeContext d = e.getValue();
+        // Skip node recently updated (NM usually updates every second).
+        if (now - d.lastUpdateTime < 5000L) {
+          continue;
+        }
+        // Remove stale non-DECOMMISSIONING node
+        if (d.nodeState != NodeState.DECOMMISSIONING) {
+          LOG.debug("remove " + d.nodeState + " " + d.nodeId);
+          it.remove();
+          continue;
+        } else if (now - d.lastUpdateTime > 60000L) {
+          // Node DECOMMISSIONED could become stale, remove as necessary.
+          RMNode rmNode = getRmNode(d.nodeId);
+          if (rmNode != null &&
+              rmNode.getState() == NodeState.DECOMMISSIONED) {
+            LOG.debug("remove " + rmNode.getState() + " " + d.nodeId);
+            it.remove();
+            continue;
+          }
+        }
+        // Collect nodes whose decommissioning timeout has expired.
+        if (d.timeoutMs >= 0 &&
+            d.decommissioningStartTime + d.timeoutMs < now) {
+          staleNodes.add(d.nodeId);
+          LOG.debug("Identified stale and timeout node " + d.nodeId);
+        }
+      }
+
+      for (NodeId nodeId : staleNodes) {
+        RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+        if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) {
+          remove(nodeId);
+          continue;
+        }
+        // Timed-out and drained: fire DECOMMISSION to transition the node.
+        if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+            checkReadyToBeDecommissioned(rmNode.getNodeID())) {
+          LOG.info("DECOMMISSIONING " + nodeId + " timeout");
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+        }
+      }
+    }
+  }
+
+  // Look up a node among active RM nodes first, then inactive ones;
+  // returns null if the node is unknown to the RM.
+  private RMNode getRmNode(NodeId nodeId) {
+    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+    if (rmNode == null) {
+      rmNode = this.rmContext.getInactiveRMNodes().get(nodeId);
+    }
+    return rmNode;
+  }
+
+  // Drop from the node's tracked set any application that no longer exists
+  // in the RM or has reached a terminal state (FINISHED/FAILED/KILLED).
+  private void removeCompletedApps(DecommissioningNodeContext context) {
+    Iterator<ApplicationId> it = context.appIds.iterator();
+    while (it.hasNext()) {
+      ApplicationId appId = it.next();
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.debug("Consider non-existing app " + appId + " as completed");
+        it.remove();
+        continue;
+      }
+      if (rmApp.getState() == RMAppState.FINISHED ||
+          rmApp.getState() == RMAppState.FAILED ||
+          rmApp.getState() == RMAppState.KILLED) {
+        LOG.debug("Remove " + rmApp.getState() + " app " + appId);
+        it.remove();
+      }
+    }
+  }
+
+  // Remaining time in seconds until the node will be decommissioned:
+  // 0 when already decommissioned or nothing left to wait for,
+  // -1 when the node is not decommissioning or no timeout applies.
+  private int getTimeoutInSec(DecommissioningNodeContext context) {
+    if (context.nodeState == NodeState.DECOMMISSIONED) {
+      return 0;
+    } else if (context.nodeState != NodeState.DECOMMISSIONING) {
+      return -1;
+    }
+    if (context.appIds.size() == 0 && context.numActiveContainers == 0) {
+      return 0;
+    }
+    // negative timeout value means no timeout (infinite timeout).
+    if (context.timeoutMs < 0) {
+      return -1;
+    }
+
+    long now = mclock.getTime();
+    long timeout = context.decommissioningStartTime + context.timeoutMs - now;
+    return Math.max(0, (int)(timeout / 1000));
+  }
+
+  // Log a status summary of all decommissioning nodes and the applications
+  // still running on them. No-op unless DEBUG logging is enabled and at
+  // least one node is being tracked.
+  private void logDecommissioningNodesStatus() {
+    if (!LOG.isDebugEnabled() || decomNodes.size() == 0) {
+      return;
+    }
+    StringBuilder sb = new StringBuilder();
+    long now = mclock.getTime();
+    for (DecommissioningNodeContext d : decomNodes.values()) {
+      DecommissioningNodeStatus s = checkDecommissioningStatus(d.nodeId);
+      sb.append(String.format(
+          "%n  %-34s %4ds fresh:%3ds containers:%2d %14s",
+          d.nodeId.getHost(),
+          (now - d.decommissioningStartTime) / 1000,
+          (now - d.lastUpdateTime) / 1000,
+          d.numActiveContainers,
+          s));
+      if (s == DecommissioningNodeStatus.WAIT_APP ||
+          s == DecommissioningNodeStatus.WAIT_CONTAINER) {
+        sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d)));
+      }
+      for (ApplicationId aid : d.appIds) {
+        sb.append("\n    " + aid);
+        RMApp rmApp = rmContext.getRMApps().get(aid);
+        if (rmApp != null) {
+          sb.append(String.format(
+              " %s %9s %5.2f%% %5ds",
+              rmApp.getState(),
+              (rmApp.getApplicationType() == null)?
+                  "" : rmApp.getApplicationType(),
+              100.0 * rmApp.getProgress(),
+              (mclock.getTime() - rmApp.getStartTime()) / 1000));
+        }
+      }
+    }
+    // The whole method is guarded by isDebugEnabled() above, so emit at
+    // DEBUG rather than INFO to match the guard.
+    LOG.debug("Decommissioning Nodes: " + sb.toString());
+  }
+
+  // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
+  // This enables DecommissioningNodesWatcher to pick up new value
+  // without ResourceManager restart.
+  private void readDecommissioningTimeout(Configuration conf) {
+    try {
+      if (conf == null) {
+        conf = new YarnConfiguration();
+      }
+      int v = conf.getInt(
+          YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+          YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+      // Only update and log when the effective default actually changes.
+      if (defaultTimeoutMs != 1000L * v) {
+        defaultTimeoutMs = 1000L * v;
+        LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
+      }
+    } catch (Exception e) {
+      // Best-effort: a bad configuration keeps the previous timeout value.
+      LOG.info("Error readDecommissioningTimeout ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 7937383..99413bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -19,14 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +41,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -47,14 +52,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @SuppressWarnings("unchecked")
 public class NodesListManager extends CompositeService implements
     EventHandler<NodesListManagerEvent> {
@@ -178,10 +184,11 @@ public class NodesListManager extends CompositeService implements
     if (!LOG.isDebugEnabled()) {
       return;
     }
-    
-    LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, 
+
+    LOG.debug("hostsReader: in=" +
+        conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
         YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" +
-        conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, 
+        conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
             YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
 
     Set<String> hostsList = new HashSet<String>();
@@ -196,23 +203,19 @@ public class NodesListManager extends CompositeService implements
     }
   }
 
-  public void refreshNodes(Configuration yarnConf) throws IOException,
-      YarnException {
-    refreshHostsReader(yarnConf);
+  public void refreshNodes(Configuration yarnConf)
+      throws IOException, YarnException {
+    refreshNodes(yarnConf, false);
+  }
 
-    for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
-      if (!isValidNode(nodeId.getHost())) {
-        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
-            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, nodeEventType));
-      }
-    }
-    updateInactiveNodes();
+  public void refreshNodes(Configuration yarnConf, boolean graceful)
+      throws IOException, YarnException {
+    refreshHostsReader(yarnConf, graceful, null);
   }
 
-  private void refreshHostsReader(Configuration yarnConf) throws IOException,
-      YarnException {
+  private void refreshHostsReader(
+      Configuration yarnConf, boolean graceful, Integer timeout)
+          throws IOException, YarnException {
     if (null == yarnConf) {
       yarnConf = new YarnConfiguration();
     }
@@ -222,8 +225,16 @@ public class NodesListManager extends CompositeService implements
     excludesFile =
         yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
             YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
+    LOG.info("refreshNodes excludesFile " + excludesFile);
     hostsReader.refresh(includesFile, excludesFile);
     printConfiguredHosts();
+
+    LOG.info("hostsReader include:{" +
+        StringUtils.join(",", hostsReader.getHosts()) +
+        "} exclude:{" +
+        StringUtils.join(",", hostsReader.getExcludedHosts()) + "}");
+
+    handleExcludeNodeList(graceful, timeout);
   }
 
   private void setDecomissionedNMs() {
@@ -237,6 +248,86 @@ public class NodesListManager extends CompositeService implements
     }
   }
 
+  // Handle excluded nodes based on following rules:
+  // Recommission DECOMMISSIONED or DECOMMISSIONING nodes no longer excluded;
+  // Gracefully decommission excluded nodes that are not already
+  // DECOMMISSIONED or DECOMMISSIONING; Take no action for excluded nodes
+  // that are already DECOMMISSIONED or DECOMMISSIONING.
+  private void handleExcludeNodeList(boolean graceful, Integer timeout) {
+    // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
+    List<RMNode> nodesToRecom = new ArrayList<RMNode>();
+
+    // Nodes need to be decommissioned (graceful or forceful);
+    List<RMNode> nodesToDecom = new ArrayList<RMNode>();
+
+    Set<String> includes = new HashSet<String>();
+    Map<String, Integer> excludes = new HashMap<String, Integer>();
+    hostsReader.getHostDetails(includes, excludes);
+
+    for (RMNode n : this.rmContext.getRMNodes().values()) {
+      NodeState s = n.getState();
+      // An invalid node (either due to explicit exclude or not include)
+      // should be excluded.
+      boolean isExcluded = !isValidNode(
+          n.getHostName(), includes, excludes.keySet());
+      String nodeStr = "node " + n.getNodeID() + " with state " + s;
+      if (!isExcluded) {
+        // Note that no action is needed for DECOMMISSIONED node.
+        if (s == NodeState.DECOMMISSIONING) {
+          LOG.info("Recommission " + nodeStr);
+          nodesToRecom.add(n);
+        }
+        // Otherwise no-action needed.
+      } else {
+        // exclude is true.
+        if (graceful) {
+          // Use per node timeout if exist otherwise the request timeout.
+          Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
+              excludes.get(n.getHostName()) : timeout;
+          if (s != NodeState.DECOMMISSIONED &&
+              s != NodeState.DECOMMISSIONING) {
+            LOG.info("Gracefully decommission " + nodeStr);
+            nodesToDecom.add(n);
+          } else if (s == NodeState.DECOMMISSIONING &&
+                     !Objects.equals(n.getDecommissioningTimeout(),
+                         timeoutToUse)) {
+            LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
+            nodesToDecom.add(n);
+          } else {
+            LOG.info("No action for " + nodeStr);
+          }
+        } else {
+          if (s != NodeState.DECOMMISSIONED) {
+            LOG.info("Forcefully decommission " + nodeStr);
+            nodesToDecom.add(n);
+          }
+        }
+      }
+    }
+
+    for (RMNode n : nodesToRecom) {
+      RMNodeEvent e = new RMNodeEvent(
+          n.getNodeID(), RMNodeEventType.RECOMMISSION);
+      this.rmContext.getDispatcher().getEventHandler().handle(e);
+    }
+
+    for (RMNode n : nodesToDecom) {
+      RMNodeEvent e;
+      if (graceful) {
+        Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
+            excludes.get(n.getHostName()) : timeout;
+        e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse);
+      } else {
+        RMNodeEventType eventType = isUntrackedNode(n.getHostName())?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
+        e = new RMNodeEvent(n.getNodeID(), eventType);
+      }
+      this.rmContext.getDispatcher().getEventHandler().handle(e);
+    }
+
+    updateInactiveNodes();
+  }
+
   @VisibleForTesting
   public int getNodeRemovalCheckInterval() {
     return nodeRemovalCheckInterval;
@@ -360,11 +451,15 @@ public class NodesListManager extends CompositeService implements
   }
 
   public boolean isValidNode(String hostName) {
-    String ip = resolver.resolve(hostName);
     Set<String> hostsList = new HashSet<String>();
     Set<String> excludeList = new HashSet<String>();
     hostsReader.getHostDetails(hostsList, excludeList);
+    return isValidNode(hostName, hostsList, excludeList);
+  }
 
+  private boolean isValidNode(
+      String hostName, Set<String> hostsList, Set<String> excludeList) {
+    String ip = resolver.resolve(hostName);
     return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
         .contains(ip))
         && !(excludeList.contains(hostName) || excludeList.contains(ip));
@@ -478,29 +573,14 @@ public class NodesListManager extends CompositeService implements
   /**
    * Refresh the nodes gracefully
    *
-   * @param conf
+   * @param yarnConf configuration to refresh from; null means a fresh
+   *          default YarnConfiguration is used.
+   * @param timeout decommission timeout in seconds, null means default timeout.
    * @throws IOException
    * @throws YarnException
    */
-  public void refreshNodesGracefully(Configuration conf) throws IOException,
-      YarnException {
-    refreshHostsReader(conf);
-    for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
-      NodeId nodeId = entry.getKey();
-      if (!isValidNode(nodeId.getHost())) {
-        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
-            RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, nodeEventType));
-      } else {
-        // Recommissioning the nodes
-        if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
-          this.rmContext.getDispatcher().getEventHandler()
-              .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION));
-        }
-      }
-    }
-    updateInactiveNodes();
+  public void refreshNodesGracefully(Configuration yarnConf, Integer timeout)
+      throws IOException, YarnException {
+    refreshHostsReader(yarnConf, true, timeout);
   }
 
   /**
@@ -596,4 +676,4 @@ public class NodesListManager extends CompositeService implements
       this.host = hst;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 9b9b02e..5e9827a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -70,7 +70,7 @@ public class RMServerUtils {
 
   public static List<RMNode> queryRMNodes(RMContext context,
       EnumSet<NodeState> acceptedStates) {
-    // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY
+    // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING.
     ArrayList<RMNode> results = new ArrayList<RMNode>();
     if (acceptedStates.contains(NodeState.NEW) ||
         acceptedStates.contains(NodeState.RUNNING) ||

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 9d480f3..51fc0bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -113,6 +113,8 @@ public class ResourceTrackerService extends AbstractService implements
   private int minAllocMb;
   private int minAllocVcores;
 
+  private DecommissioningNodesWatcher decommissioningWatcher;
+
   private boolean isDistributedNodeLabelsConf;
   private boolean isDelegatedCentralizedNodeLabelsConf;
   private DynamicResourceConfiguration drConf;
@@ -131,6 +133,7 @@ public class ResourceTrackerService extends AbstractService implements
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
+    this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext);
   }
 
   @Override
@@ -170,6 +173,7 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     loadDynamicResourceConfiguration(conf);
+    decommissioningWatcher.init(conf);
     super.serviceInit(conf);
   }
 
@@ -494,6 +498,7 @@ public class ResourceTrackerService extends AbstractService implements
 
     // Send ping
     this.nmLivelinessMonitor.receivedPing(nodeId);
+    this.decommissioningWatcher.update(rmNode, remoteNodeStatus);
 
     // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
@@ -526,6 +531,20 @@ public class ResourceTrackerService extends AbstractService implements
       updateAppCollectorsMap(request);
     }
 
+    // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED.
+    if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+        decommissioningWatcher.checkReadyToBeDecommissioned(
+            rmNode.getNodeID())) {
+      String message = "DECOMMISSIONING " + nodeId +
+          " is ready to be decommissioned";
+      LOG.info(message);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+      this.nmLivelinessMonitor.unregister(nodeId);
+      return YarnServerBuilderUtils.newNodeHeartbeatResponse(
+          NodeAction.SHUTDOWN, message);
+    }
+
     // Heartbeat response
     NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
         .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 3a9cf54..10e2afa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -175,4 +175,10 @@ public interface RMNode {
   long getUntrackedTimeStamp();
 
   void setUntrackedTimeStamp(long timeStamp);
+  /**
+   * Optional decommissioning timeout in seconds
+   * (null indicates default timeout).
+   * @return the decommissioning timeout in seconds.
+   */
+  Integer getDecommissioningTimeout();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
new file mode 100644
index 0000000..9955e9e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * RMNode Decommissioning Event.
+ *
+ */
+public class RMNodeDecommissioningEvent extends RMNodeEvent {
+  // Optional decommissioning timeout in seconds.
+  private final Integer decommissioningTimeout;
+
+  // Create instance with optional timeout
+  // (timeout could be null which means use default).
+  public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) {
+    super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION);
+    this.decommissioningTimeout = timeout;
+  }
+
+  public Integer getDecommissioningTimeout() {
+    return this.decommissioningTimeout;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index a3a6b30..d1ccecb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -124,6 +125,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private String healthReport;
   private long lastHealthReportTime;
   private String nodeManagerVersion;
+  private Integer decommissioningTimeout;
 
   private long timeStamp;
   /* Aggregated resource utilization for the containers. */
@@ -179,7 +181,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            NodeState,
                                            RMNodeEventType,
                                            RMNodeEvent>(NodeState.NEW)
-
       //Transitions from NEW state
       .addTransition(NodeState.NEW, NodeState.RUNNING,
           RMNodeEventType.STARTED, new AddNodeTransition())
@@ -265,6 +266,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
           RMNodeEventType.REBOOTING,
           new DeactivateNodeTransition(NodeState.REBOOTED))
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new AddContainersToBeRemovedFromNMTransition())
 
       .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
           RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
@@ -633,7 +637,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       } catch (InvalidStateTransitionException e) {
         LOG.error("Can't handle this event at current state", e);
         LOG.error("Invalid event " + event.getType() + 
-            " on Node  " + this.nodeId);
+            " on Node  " + this.nodeId + " oldState " + oldState);
       }
       if (oldState != getState()) {
         LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
@@ -666,6 +670,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     case SHUTDOWN:
       metrics.decrNumShutdownNMs();
       break;
+    case DECOMMISSIONING:
+      metrics.decrDecommissioningNMs();
+      break;
     default:
       LOG.debug("Unexpected previous node state");
     }
@@ -712,6 +719,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     case DECOMMISSIONING:
       metrics.decrDecommissioningNMs();
       break;
+    case DECOMMISSIONED:
+      metrics.decrDecommisionedNMs();
+      break;
     case UNHEALTHY:
       metrics.decrNumUnhealthyNMs();
       break;
@@ -1087,9 +1097,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      Integer timeout = null;
+      if (RMNodeDecommissioningEvent.class.isInstance(event)) {
+        RMNodeDecommissioningEvent e = ((RMNodeDecommissioningEvent) event);
+        timeout = e.getDecommissioningTimeout();
+      }
+      // Pick up possible updates on decommissioningTimeout.
+      if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+        if (!Objects.equals(rmNode.getDecommissioningTimeout(), timeout)) {
+          LOG.info("Update " + rmNode.getNodeID() +
+                   " DecommissioningTimeout to be " + timeout);
+          rmNode.decommissioningTimeout = timeout;
+        } else {
+          LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING");
+        }
+        return;
+      }
       LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
       // Update NM metrics during graceful decommissioning.
       rmNode.updateMetricsForGracefulDecommission(initState, finalState);
+      rmNode.decommissioningTimeout = timeout;
       if (rmNode.originalTotalCapability == null){
         rmNode.originalTotalCapability =
             Resources.clone(rmNode.totalCapability);
@@ -1156,24 +1183,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           return NodeState.UNHEALTHY;
         }
       }
-      if (isNodeDecommissioning) {
-        List<ApplicationId> runningApps = rmNode.getRunningApps();
-
-        List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
-
-        // no running (and keeping alive) app on this node, get it
-        // decommissioned.
-        // TODO may need to check no container is being scheduled on this node
-        // as well.
-        if ((runningApps == null || runningApps.size() == 0)
-            && (keepAliveApps == null || keepAliveApps.size() == 0)) {
-          RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
-          return NodeState.DECOMMISSIONED;
-        }
-
-        // TODO (in YARN-3223) if node in decommissioning, get node resource
-        // updated if container get finished (keep available resource to be 0)
-      }
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
       rmNode.handleReportedIncreasedContainers(
@@ -1472,4 +1481,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   public void setUntrackedTimeStamp(long ts) {
     this.timeStamp = ts;
   }
+
+  @Override
+  public Integer getDecommissioningTimeout() {
+    return decommissioningTimeout;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0da69c32/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
index 3012d0d..1789e09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
@@ -97,7 +97,7 @@ public class ClusterMetricsInfo {
     this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
     this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
     this.totalNodes = activeNodes + lostNodes + decommissionedNodes
-        + rebootedNodes + unhealthyNodes + shutdownNodes;
+        + rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes;
   }
 
   public int getAppsSubmitted() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message