hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkarana...@apache.org
Subject [34/50] [abbrv] hadoop git commit: YARN-6483. Add nodes transitioning to DECOMMISSIONING state to the list of updated nodes returned to the AM. (Juan Rodriguez Hortala via asuresh)
Date Tue, 28 Nov 2017 21:47:52 GMT
YARN-6483. Add nodes transitioning to DECOMMISSIONING state to the list of updated nodes returned to the AM. (Juan Rodriguez Hortala via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b46ca7e7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b46ca7e7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b46ca7e7

Branch: refs/heads/YARN-6592
Commit: b46ca7e73b8bac3fdbff0b13afe009308078acf2
Parents: aab4395
Author: Arun Suresh <asuresh@apache.org>
Authored: Wed Nov 22 19:16:44 2017 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Wed Nov 22 19:18:30 2017 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/api/records/NodeReport.java     |  47 ++++++--
 .../hadoop/yarn/api/records/NodeUpdateType.java |  29 +++++
 .../src/main/proto/yarn_protos.proto            |   8 ++
 .../hadoop/yarn/client/ProtocolHATestBase.java  |  14 +--
 .../hadoop/yarn/client/cli/TestYarnCLI.java     |   2 +-
 .../api/records/impl/pb/NodeReportPBImpl.java   |  50 +++++++-
 .../yarn/api/records/impl/pb/ProtoUtils.java    |  12 ++
 .../hadoop/yarn/server/utils/BuilderUtils.java  |  14 ++-
 .../server/resourcemanager/ClientRMService.java |   5 +-
 .../DecommissioningNodesWatcher.java            |  38 +-----
 .../resourcemanager/DefaultAMSProcessor.java    |  12 +-
 .../resourcemanager/NodesListManager.java       |  78 +++++++++----
 .../NodesListManagerEventType.java              |   3 +-
 .../server/resourcemanager/rmapp/RMApp.java     |  10 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  11 +-
 .../rmapp/RMAppNodeUpdateEvent.java             |   9 +-
 .../server/resourcemanager/rmnode/RMNode.java   |   2 +-
 .../resourcemanager/rmnode/RMNodeImpl.java      |   5 +
 .../yarn/server/resourcemanager/MockRM.java     |  15 +++
 .../resourcemanager/TestClientRMService.java    |  50 ++++++++
 .../TestDecommissioningNodesWatcher.java        |   4 +-
 .../resourcemanager/TestRMNodeTransitions.java  |  13 ++-
 .../TestResourceTrackerService.java             | 116 ++++++++++++++++++-
 .../applicationsmanager/MockAsm.java            |   4 +-
 .../TestAMRMRPCNodeUpdates.java                 |  51 ++++++++
 .../server/resourcemanager/rmapp/MockRMApp.java |   4 +-
 26 files changed, 495 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
index 885a3b4..3a80641 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeReport.java
@@ -53,7 +53,8 @@ public abstract class NodeReport {
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime) {
     return newInstance(nodeId, nodeState, httpAddress, rackName, used,
-        capability, numContainers, healthReport, lastHealthReportTime, null);
+        capability, numContainers, healthReport, lastHealthReportTime,
+        null, null, null);
   }
 
   @Private
@@ -61,7 +62,8 @@ public abstract class NodeReport {
   public static NodeReport newInstance(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime,
-      Set<String> nodeLabels) {
+      Set<String> nodeLabels, Integer decommissioningTimeout,
+      NodeUpdateType nodeUpdateType) {
     NodeReport nodeReport = Records.newRecord(NodeReport.class);
     nodeReport.setNodeId(nodeId);
     nodeReport.setNodeState(nodeState);
@@ -73,6 +75,8 @@ public abstract class NodeReport {
     nodeReport.setHealthReport(healthReport);
     nodeReport.setLastHealthReportTime(lastHealthReportTime);
     nodeReport.setNodeLabels(nodeLabels);
+    nodeReport.setDecommissioningTimeout(decommissioningTimeout);
+    nodeReport.setNodeUpdateType(nodeUpdateType);
     return nodeReport;
   }
 
@@ -186,8 +190,8 @@ public abstract class NodeReport {
   public abstract void setLastHealthReportTime(long lastHealthReport);
   
   /**
-   * Get labels of this node
-   * @return labels of this node
+   * Get labels of this node.
+   * @return labels of this node.
    */
   @Public
   @Stable
@@ -198,8 +202,8 @@ public abstract class NodeReport {
   public abstract void setNodeLabels(Set<String> nodeLabels);
 
   /**
-   * Get containers aggregated resource utilization in a node
-   * @return containers resource utilization
+   * Get containers aggregated resource utilization in a node.
+   * @return containers resource utilization.
    */
   @Public
   @Stable
@@ -217,8 +221,8 @@ public abstract class NodeReport {
   }
 
   /**
-   * Get node resource utilization
-   * @return node resource utilization
+   * Get node resource utilization.
+   * @return node resource utilization.
    */
   @Public
   @Stable
@@ -227,4 +231,31 @@ public abstract class NodeReport {
   @Private
   @Unstable
   public abstract void setNodeUtilization(ResourceUtilization nodeUtilization);
+
+  /**
+   * Optional decommissioning timeout in seconds (null indicates absent
+   * timeout).
+   * @return the decommissioning timeout in seconds.
+   */
+  public Integer getDecommissioningTimeout() {
+    return null;
+  }
+
+  /**
+   * Set the decommissioning timeout in seconds (null indicates absent timeout).
+   * */
+  public void setDecommissioningTimeout(Integer decommissioningTimeout) {}
+
+  /**
+   * Optional node update type (null indicates absent update type).
+   * @return the node update type.
+   */
+  public NodeUpdateType getNodeUpdateType() {
+    return NodeUpdateType.NODE_UNUSABLE;
+  }
+
+  /**
+   * Set the node update type (null indicates absent node update type).
+   * */
+  public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
new file mode 100644
index 0000000..9152a6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeUpdateType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+/**
+ * <p>Taxonomy of the <code>NodeState</code> that a
+ * <code>Node</code> might transition into.</p>
+ * */
+public enum NodeUpdateType {
+  NODE_USABLE,
+  NODE_UNUSABLE,
+  NODE_DECOMMISSIONING
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 7769c48..fdbe2d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -332,6 +332,12 @@ message NodeIdProto {
   optional int32 port = 2;
 }
 
+enum NodeUpdateTypeProto {
+  NODE_USABLE = 0;
+  NODE_UNUSABLE = 1;
+  NODE_DECOMMISSIONING = 2;
+}
+
 message NodeReportProto {
   optional NodeIdProto nodeId = 1;
   optional string httpAddress = 2;
@@ -345,6 +351,8 @@ message NodeReportProto {
   repeated string node_labels = 10;
   optional ResourceUtilizationProto containers_utilization = 11;
   optional ResourceUtilizationProto node_utilization = 12;
+  optional uint32 decommissioning_timeout = 13;
+  optional NodeUpdateTypeProto node_update_type = 14;
 }
 
 message NodeIdToLabelsProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index f4005e9..54537ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -618,7 +618,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     }
 
     public ApplicationReport createFakeAppReport() {
-      ApplicationId appId = ApplicationId.newInstance(1000l, 1);
+      ApplicationId appId = ApplicationId.newInstance(1000L, 1);
       ApplicationAttemptId attemptId =
           ApplicationAttemptId.newInstance(appId, 1);
       // create a fake application report
@@ -626,7 +626,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
           ApplicationReport.newInstance(appId, attemptId, "fakeUser",
               "fakeQueue", "fakeApplicationName", "localhost", 0, null,
               YarnApplicationState.FINISHED, "fake an application report", "",
-              1000l, 1200l, FinalApplicationStatus.FAILED, null, "", 50f,
+              1000L, 1200L, FinalApplicationStatus.FAILED, null, "", 50f,
               "fakeApplicationType", null);
       return report;
     }
@@ -638,7 +638,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     }
 
     public ApplicationId createFakeAppId() {
-      return ApplicationId.newInstance(1000l, 1);
+      return ApplicationId.newInstance(1000L, 1);
     }
 
     public ApplicationAttemptId createFakeApplicationAttemptId() {
@@ -657,7 +657,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
       NodeId nodeId = NodeId.newInstance("localhost", 0);
       NodeReport report =
           NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
-              "rack1", null, null, 4, null, 1000l, null);
+              "rack1", null, null, 4, null, 1000L);
       List<NodeReport> reports = new ArrayList<NodeReport>();
       reports.add(report);
       return reports;
@@ -680,8 +680,8 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     public ApplicationAttemptReport createFakeApplicationAttemptReport() {
       return ApplicationAttemptReport.newInstance(
           createFakeApplicationAttemptId(), "localhost", 0, "", "", "",
-          YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000l,
-          1200l);
+          YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000L,
+          1200L);
     }
 
     public List<ApplicationAttemptReport>
@@ -694,7 +694,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
 
     public ContainerReport createFakeContainerReport() {
       return ContainerReport.newInstance(createFakeContainerId(), null,
-          NodeId.newInstance("localhost", 0), null, 1000l, 1200l, "", "", 0,
+          NodeId.newInstance("localhost", 0), null, 1000L, 1200L, "", "", 0,
           ContainerState.COMPLETE,
           "http://" + NodeId.newInstance("localhost", 0).toString());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
index da1659f..54bd71e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
@@ -1992,7 +1992,7 @@ public class TestYarnCLI {
       NodeReport nodeReport = NodeReport.newInstance(NodeId
         .newInstance("host" + i, 0), state, "host" + 1 + ":8888",
           "rack1", Records.newRecord(Resource.class), Records
-              .newRecord(Resource.class), 0, "", 0, nodeLabels);
+              .newRecord(Resource.class), 0, "", 0, nodeLabels, null, null);
       if (!emptyResourceUtilization) {
         ResourceUtilization containersUtilization = ResourceUtilization
             .newInstance(1024, 2048, 4);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
index 0d205e9..ced588d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NodeReportPBImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -150,8 +151,9 @@ public class NodeReportPBImpl extends NodeReport {
   @Override
   public void setNodeId(NodeId nodeId) {
     maybeInitBuilder();
-    if (nodeId == null)
+    if (nodeId == null) {
       builder.clearNodeId();
+    }
     this.nodeId = nodeId;
   }
   
@@ -177,8 +179,9 @@ public class NodeReportPBImpl extends NodeReport {
   @Override
   public void setCapability(Resource capability) {
     maybeInitBuilder();
-    if (capability == null)
+    if (capability == null) {
       builder.clearCapability();
+    }
     this.capability = capability;
   }
 
@@ -215,8 +218,9 @@ public class NodeReportPBImpl extends NodeReport {
   @Override
   public void setUsed(Resource used) {
     maybeInitBuilder();
-    if (used == null)
+    if (used == null) {
       builder.clearUsed();
+    }
     this.used = used;
   }
 
@@ -234,8 +238,9 @@ public class NodeReportPBImpl extends NodeReport {
 
   @Override
   public boolean equals(Object other) {
-    if (other == null)
+    if (other == null) {
       return false;
+    }
     if (other.getClass().isAssignableFrom(this.getClass())) {
       return this.getProto().equals(this.getClass().cast(other).getProto());
     }
@@ -278,8 +283,9 @@ public class NodeReportPBImpl extends NodeReport {
   }
 
   private void mergeLocalToProto() {
-    if (viaProto)
+    if (viaProto) {
       maybeInitBuilder();
+    }
     mergeLocalToBuilder();
     proto = builder.build();
     viaProto = true;
@@ -387,4 +393,38 @@ public class NodeReportPBImpl extends NodeReport {
     }
     this.nodeUtilization = nodeResourceUtilization;
   }
+
+  @Override
+  public Integer getDecommissioningTimeout() {
+    NodeReportProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasDecommissioningTimeout())
+        ? p.getDecommissioningTimeout() : null;
+  }
+
+  @Override
+  public void setDecommissioningTimeout(Integer decommissioningTimeout) {
+    maybeInitBuilder();
+    if (decommissioningTimeout == null || decommissioningTimeout < 0) {
+      builder.clearDecommissioningTimeout();
+      return;
+    }
+    builder.setDecommissioningTimeout(decommissioningTimeout);
+  }
+
+  @Override
+  public NodeUpdateType getNodeUpdateType() {
+    NodeReportProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasNodeUpdateType()) ?
+        ProtoUtils.convertFromProtoFormat(p.getNodeUpdateType()) : null;
+  }
+
+  @Override
+  public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {
+    maybeInitBuilder();
+    if (nodeUpdateType == null) {
+      builder.clearNodeUpdateType();
+      return;
+    }
+    builder.setNodeUpdateType(ProtoUtils.convertToProtoFormat(nodeUpdateType));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 528cf8e..f3e665b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeUpdateTypeProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto;
 import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -344,6 +346,16 @@ public class ProtoUtils {
   }
 
   /*
+  * NodeUpdateType
+  */
+  public static NodeUpdateTypeProto convertToProtoFormat(NodeUpdateType e) {
+    return NodeUpdateTypeProto.valueOf(e.name());
+  }
+  public static NodeUpdateType convertFromProtoFormat(NodeUpdateTypeProto e) {
+    return NodeUpdateType.valueOf(e.name());
+  }
+
+  /*
    * ExecutionType
    */
   public static ExecutionTypeProto convertToProtoFormat(ExecutionType e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index fec2681..83f912f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -187,23 +188,26 @@ public class BuilderUtils {
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime) {
     return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
-        capability, numContainers, healthReport, lastHealthReportTime, null);
+        capability, numContainers, healthReport, lastHealthReportTime,
+        null, null, null);
   }
 
   public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime,
-      Set<String> nodeLabels) {
+      Set<String> nodeLabels, Integer decommissioningTimeout,
+      NodeUpdateType nodeUpdateType) {
     return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
         capability, numContainers, healthReport, lastHealthReportTime,
-        nodeLabels, null, null);
+        nodeLabels, null, null, decommissioningTimeout, nodeUpdateType);
   }
 
   public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
       String httpAddress, String rackName, Resource used, Resource capability,
       int numContainers, String healthReport, long lastHealthReportTime,
       Set<String> nodeLabels, ResourceUtilization containersUtilization,
-      ResourceUtilization nodeUtilization) {
+      ResourceUtilization nodeUtilization, Integer decommissioningTimeout,
+      NodeUpdateType nodeUpdateType) {
     NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
     nodeReport.setNodeId(nodeId);
     nodeReport.setNodeState(nodeState);
@@ -217,6 +221,8 @@ public class BuilderUtils {
     nodeReport.setNodeLabels(nodeLabels);
     nodeReport.setAggregatedContainersUtilization(containersUtilization);
     nodeReport.setNodeUtilization(nodeUtilization);
+    nodeReport.setDecommissioningTimeout(decommissioningTimeout);
+    nodeReport.setNodeUpdateType(nodeUpdateType);
     return nodeReport;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index e9c6eac..55a3f0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -1024,7 +1024,7 @@ public class ClientRMService extends AbstractService implements
     return response;
   }
 
-  private NodeReport createNodeReports(RMNode rmNode) {    
+  private NodeReport createNodeReports(RMNode rmNode) {
     SchedulerNodeReport schedulerNodeReport = 
         scheduler.getNodeReport(rmNode.getNodeID());
     Resource used = BuilderUtils.newResource(0, 0);
@@ -1040,7 +1040,8 @@ public class ClientRMService extends AbstractService implements
             rmNode.getTotalCapability(), numContainers,
             rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
             rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(),
-            rmNode.getNodeUtilization());
+            rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(),
+            null);
 
     return report;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
index 9631803..ca3eb79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
@@ -72,11 +72,6 @@ public class DecommissioningNodesWatcher {
 
   private final RMContext rmContext;
 
-  // Default timeout value in mills.
-  // Negative value indicates no timeout. 0 means immediate.
-  private long defaultTimeoutMs =
-      1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
-
   // Once a RMNode is observed in DECOMMISSIONING state,
   // All its ContainerStatus update are tracked inside DecomNodeContext.
   class DecommissioningNodeContext {
@@ -105,16 +100,15 @@ public class DecommissioningNodesWatcher {
 
     private long lastUpdateTime;
 
-    public DecommissioningNodeContext(NodeId nodeId) {
+    public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) {
       this.nodeId = nodeId;
       this.appIds = new HashSet<ApplicationId>();
       this.decommissioningStartTime = mclock.getTime();
-      this.timeoutMs = defaultTimeoutMs;
+      this.timeoutMs = 1000L * timeoutSec;
     }
 
-    void updateTimeout(Integer timeoutSec) {
-      this.timeoutMs = (timeoutSec == null)?
-          defaultTimeoutMs : (1000L * timeoutSec);
+    void updateTimeout(int timeoutSec) {
+      this.timeoutMs = 1000L * timeoutSec;
     }
   }
 
@@ -132,7 +126,6 @@ public class DecommissioningNodesWatcher {
   }
 
   public void init(Configuration conf) {
-    readDecommissioningTimeout(conf);
     int v = conf.getInt(
         YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
         YarnConfiguration
@@ -162,7 +155,8 @@ public class DecommissioningNodesWatcher {
       }
     } else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
       if (context == null) {
-        context = new DecommissioningNodeContext(rmNode.getNodeID());
+        context = new DecommissioningNodeContext(rmNode.getNodeID(),
+            rmNode.getDecommissioningTimeout());
         decomNodes.put(rmNode.getNodeID(), context);
         context.nodeState = rmNode.getState();
         context.decommissionedTime = 0;
@@ -416,24 +410,4 @@ public class DecommissioningNodesWatcher {
       LOG.debug("Decommissioning node: " + sb.toString());
     }
   }
-
-  // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
-  // This enables DecommissioningNodesWatcher to pick up new value
-  // without ResourceManager restart.
-  private void readDecommissioningTimeout(Configuration conf) {
-    try {
-      if (conf == null) {
-        conf = new YarnConfiguration();
-      }
-      int v = conf.getInt(
-          YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
-          YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
-      if (defaultTimeoutMs != 1000L * v) {
-        defaultTimeoutMs = 1000L * v;
-        LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
-      }
-    } catch (Exception e) {
-      LOG.info("Error readDecommissioningTimeout ", e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
index 9774a1a..4c2531e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.PreemptionContainer;
 import org.apache.hadoop.yarn.api.records.PreemptionContract;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -83,6 +84,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -326,10 +329,12 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
   }
 
   private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {
-    List<RMNode> updatedNodes = new ArrayList<>();
+    Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
     if(app.pullRMNodeUpdates(updatedNodes) > 0) {
       List<NodeReport> updatedNodeReports = new ArrayList<>();
-      for(RMNode rmNode: updatedNodes) {
+      for(Map.Entry<RMNode, NodeUpdateType> rmNodeEntry :
+          updatedNodes.entrySet()) {
+        RMNode rmNode = rmNodeEntry.getKey();
         SchedulerNodeReport schedulerNodeReport =
             getScheduler().getNodeReport(rmNode.getNodeID());
         Resource used = BuilderUtils.newResource(0, 0);
@@ -344,7 +349,8 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
                 rmNode.getHttpAddress(), rmNode.getRackName(), used,
                 rmNode.getTotalCapability(), numContainers,
                 rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
-                rmNode.getNodeLabels());
+                rmNode.getNodeLabels(), rmNode.getDecommissioningTimeout(),
+                rmNodeEntry.getValue());
 
         updatedNodeReports.add(report);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index edd173b..647dfa3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -72,6 +72,11 @@ public class NodesListManager extends CompositeService implements
   private Configuration conf;
   private final RMContext rmContext;
 
+  // Default decommissioning timeout value, in seconds.
+  // A negative value means no timeout; 0 means immediate.
+  private int defaultDecTimeoutSecs =
+      YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
+
   private String includesFile;
   private String excludesFile;
 
@@ -214,6 +219,11 @@ public class NodesListManager extends CompositeService implements
   private void refreshHostsReader(
       Configuration yarnConf, boolean graceful, Integer timeout)
           throws IOException, YarnException {
+    // resolve the default timeout to the decommission timeout that is
+    // configured at this moment
+    if (null == timeout) {
+      timeout = readDecommissioningTimeout(yarnConf);
+    }
     if (null == yarnConf) {
       yarnConf = new YarnConfiguration();
     }
@@ -252,7 +262,7 @@ public class NodesListManager extends CompositeService implements
   // Gracefully decommission excluded nodes that are not already
   // DECOMMISSIONED nor DECOMMISSIONING; Take no action for excluded nodes
   // that are already DECOMMISSIONED or DECOMMISSIONING.
-  private void handleExcludeNodeList(boolean graceful, Integer timeout) {
+  private void handleExcludeNodeList(boolean graceful, int timeout) {
     // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
     List<RMNode> nodesToRecom = new ArrayList<RMNode>();
 
@@ -463,36 +473,40 @@ public class NodesListManager extends CompositeService implements
         && !(excludeList.contains(hostName) || excludeList.contains(ip));
   }
 
+  private void sendRMAppNodeUpdateEventToNonFinalizedApps(
+      RMNode eventNode, RMAppNodeUpdateType appNodeUpdateType) {
+    for(RMApp app : rmContext.getRMApps().values()) {
+      if (!app.isAppFinalStateStored()) {
+        this.rmContext
+            .getDispatcher()
+            .getEventHandler()
+            .handle(
+                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                    appNodeUpdateType));
+      }
+    }
+  }
+
   @Override
   public void handle(NodesListManagerEvent event) {
     RMNode eventNode = event.getNode();
     switch (event.getType()) {
     case NODE_UNUSABLE:
       LOG.debug(eventNode + " reported unusable");
-      for(RMApp app: rmContext.getRMApps().values()) {
-        if (!app.isAppFinalStateStored()) {
-          this.rmContext
-              .getDispatcher()
-              .getEventHandler()
-              .handle(
-                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                      RMAppNodeUpdateType.NODE_UNUSABLE));
-        }
-      }
+      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+          RMAppNodeUpdateType.NODE_UNUSABLE);
       break;
     case NODE_USABLE:
       LOG.debug(eventNode + " reported usable");
-      for (RMApp app : rmContext.getRMApps().values()) {
-        if (!app.isAppFinalStateStored()) {
-          this.rmContext
-              .getDispatcher()
-              .getEventHandler()
-              .handle(
-                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                      RMAppNodeUpdateType.NODE_USABLE));
-        }
-      }
+      sendRMAppNodeUpdateEventToNonFinalizedApps(eventNode,
+          RMAppNodeUpdateType.NODE_USABLE);
+      break;
+    case NODE_DECOMMISSIONING:
+      LOG.debug(eventNode + " reported decommissioning");
+      sendRMAppNodeUpdateEventToNonFinalizedApps(
+          eventNode, RMAppNodeUpdateType.NODE_DECOMMISSIONING);
       break;
+
     default:
       LOG.error("Ignoring invalid eventtype " + event.getType());
     }
@@ -611,6 +625,28 @@ public class NodesListManager extends CompositeService implements
     }
   }
 
+  // Read a possibly updated RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT value from
+  // the configuration (e.g. yarn-site.xml) so that NodesListManager picks up
+  // a new value without a ResourceManager restart.
+  private int readDecommissioningTimeout(Configuration pConf) {
+    try {
+      if (pConf == null) {
+        pConf = new YarnConfiguration();
+      }
+      int configuredDefaultDecTimeoutSecs =
+          pConf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+              YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+      if (defaultDecTimeoutSecs != configuredDefaultDecTimeoutSecs) {
+        defaultDecTimeoutSecs = configuredDefaultDecTimeoutSecs;
+        LOG.info("Use new decommissioningTimeoutSecs: "
+            + defaultDecTimeoutSecs);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error readDecommissioningTimeout " + e.getMessage());
+    }
+    return defaultDecTimeoutSecs;
+  }
+
   /**
    * A NodeId instance needed upon startup for populating inactive nodes Map.
    * It only knows the hostname/ip and marks the port to -1 or invalid.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
index 2afc8e6..db16bc4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManagerEventType.java
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 public enum NodesListManagerEventType {
   NODE_USABLE,
-  NODE_UNUSABLE
+  NODE_UNUSABLE,
+  NODE_DECOMMISSIONING
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 93c41b6..8583789 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -154,10 +154,12 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * received by the RMApp. Updates can be node becoming lost or becoming
    * healthy etc. The method clears the information from the {@link RMApp}. So
    * each call to this method gives the delta from the previous call.
-   * @param updatedNodes Collection into which the updates are transferred
-   * @return the number of nodes added to the {@link Collection}
+   * @param updatedNodes Map into which the updates are transferred, with each
+   * updated node as the key, and the {@link NodeUpdateType} for that update
+   * as the corresponding value.
+   * @return the number of nodes added to the {@link Map}
    */
-  int pullRMNodeUpdates(Collection<RMNode> updatedNodes);
+  int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes);
 
   /**
    * The finish time of the {@link RMApp}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 6896254..0266b83 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -20,11 +20,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -149,7 +148,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final Map<ApplicationAttemptId, RMAppAttempt> attempts
       = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
   private final long submitTime;
-  private final Set<RMNode> updatedNodes = new HashSet<RMNode>();
+  private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
   private final String applicationType;
   private final Set<String> applicationTags;
 
@@ -677,11 +676,11 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
 
   @Override
-  public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+  public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) {
     this.writeLock.lock();
     try {
       int updatedNodeCount = this.updatedNodes.size();
-      updatedNodes.addAll(this.updatedNodes);
+      upNodes.putAll(this.updatedNodes);
       this.updatedNodes.clear();
       return updatedNodeCount;
     } finally {
@@ -987,7 +986,7 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
     NodeState nodeState = node.getState();
-    updatedNodes.add(node);
+    updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type));
     LOG.debug("Received node update event:" + type + " for node:" + node
         + " with state:" + nodeState);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
index ba8af98..245d9a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeUpdateEvent.java
@@ -19,13 +19,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class RMAppNodeUpdateEvent extends RMAppEvent {
 
   public enum RMAppNodeUpdateType {
     NODE_USABLE, 
-    NODE_UNUSABLE
+    NODE_UNUSABLE,
+    NODE_DECOMMISSIONING;
+
+    public static NodeUpdateType convertToNodeUpdateType(
+        RMAppNodeUpdateType rmAppNodeUpdateType) {
+      return NodeUpdateType.valueOf(rmAppNodeUpdateType.name());
+    }
   }
 
   private final RMNode node;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index ab15c95..328c040 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -183,7 +183,7 @@ public interface RMNode {
   void setUntrackedTimeStamp(long timeStamp);
   /*
    * Optional decommissioning timeout in second
-   * (null indicates default timeout).
+   * (null indicates that no timeout is set).
    * @return the decommissioning timeout in second.
    */
   Integer getDecommissioningTimeout();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d0bfecf..2b013a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -1160,6 +1160,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       // Update NM metrics during graceful decommissioning.
       rmNode.updateMetricsForGracefulDecommission(initState, finalState);
       rmNode.decommissioningTimeout = timeout;
+      // Notify NodesListManager so that it notifies all RMApps and each
+      // Application Master can take any required actions.
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodesListManagerEvent(
+              NodesListManagerEventType.NODE_DECOMMISSIONING, rmNode));
       if (rmNode.originalTotalCapability == null){
         rmNode.originalTotalCapability =
             Resources.clone(rmNode.totalCapability);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index d128b02..711f008 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@@ -917,12 +918,26 @@ public class MockRM extends ResourceManager {
         node.getState());
   }
 
+  public void sendNodeGracefulDecommission(
+      MockNM nm, int timeout) throws Exception {
+    RMNodeImpl node = (RMNodeImpl)
+        getRMContext().getRMNodes().get(nm.getNodeId());
+    Assert.assertNotNull("node shouldn't be null", node);
+    node.handle(new RMNodeDecommissioningEvent(nm.getNodeId(), timeout));
+  }
+
   public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
     RMNodeImpl node = (RMNodeImpl)
         getRMContext().getRMNodes().get(nm.getNodeId());
+    Assert.assertNotNull("node shouldn't be null", node);
     node.handle(new RMNodeEvent(nm.getNodeId(), event));
   }
 
+  public Integer getDecommissioningTimeout(NodeId nodeid) {
+    return this.getRMContext().getRMNodes()
+        .get(nodeid).getDecommissioningTimeout();
+  }
+
   public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     KillApplicationRequest req = KillApplicationRequest.newInstance(appId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index ae57dfb..1c50dd3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -181,6 +181,50 @@ public class TestClientRMService {
   private final static String QUEUE_2 = "Q-2";
 
   @Test
+  public void testGetDecommissioningClusterNodes() throws Exception {
+    MockRM rm = new MockRM() {
+      protected ClientRMService createClientRMService() {
+        return new ClientRMService(this.rmContext, scheduler,
+            this.rmAppManager, this.applicationACLsManager,
+            this.queueACLsManager,
+            this.getRMContext().getRMDelegationTokenSecretManager());
+      };
+    };
+    rm.start();
+
+    int nodeMemory = 1024;
+    MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
+    rm.sendNodeStarted(nm1);
+    nm1.nodeHeartbeat(true);
+    rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+    Integer decommissioningTimeout = 600;
+    rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout);
+    rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
+
+    // Create a client.
+    Configuration conf = new Configuration();
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    ApplicationClientProtocol client =
+        (ApplicationClientProtocol) rpc
+            .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
+
+    // Make call
+    List<NodeReport> nodeReports = client.getClusterNodes(
+        GetClusterNodesRequest.newInstance(
+            EnumSet.of(NodeState.DECOMMISSIONING)))
+        .getNodeReports();
+    Assert.assertEquals(1, nodeReports.size());
+    NodeReport nr = nodeReports.iterator().next();
+    Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
+    Assert.assertNull(nr.getNodeUpdateType());
+
+    rpc.stopProxy(client, conf);
+    rm.close();
+  }
+
+  @Test
   public void testGetClusterNodes() throws Exception {
     MockRM rm = new MockRM() {
       protected ClientRMService createClientRMService() {
@@ -228,6 +272,8 @@ public class TestClientRMService {
     
     // Check node's label = x
     Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
+    Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
+    Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
 
     // Now make the node unhealthy.
     node.nodeHeartbeat(false);
@@ -251,6 +297,8 @@ public class TestClientRMService {
         nodeReports.get(0).getNodeState());
     
     Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
+    Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout());
+    Assert.assertNull(nodeReports.get(0).getNodeUpdateType());
     
     // Remove labels of host1
     map = new HashMap<NodeId, Set<String>>();
@@ -267,6 +315,8 @@ public class TestClientRMService {
     for (NodeReport report : nodeReports) {
       Assert.assertTrue(report.getNodeLabels() != null
           && report.getNodeLabels().isEmpty());
+      Assert.assertNull(report.getDecommissioningTimeout());
+      Assert.assertNull(report.getNodeUpdateType());
     }
 
     rpc.stopProxy(client, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
index 690de30..4371156 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -69,7 +68,8 @@ public class TestDecommissioningNodesWatcher {
     MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
 
     // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
-    rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION);
+    rm.sendNodeGracefulDecommission(nm1,
+        YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
     rm.waitForState(id1, NodeState.DECOMMISSIONING);
 
     // Update status with decreasing number of running containers until 0.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index ba806ab..3657123 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -27,7 +27,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
@@ -98,13 +100,16 @@ public class TestRMNodeTransitions {
   }
 
   private NodesListManagerEvent nodesListManagerEvent = null;
-  
+  private List<NodeState> nodesListManagerEventsNodeStateSequence =
+      new LinkedList<>();
+
   private class TestNodeListManagerEventDispatcher implements
       EventHandler<NodesListManagerEvent> {
     
     @Override
     public void handle(NodesListManagerEvent event) {
       nodesListManagerEvent = event;
+      nodesListManagerEventsNodeStateSequence.add(event.getNode().getState());
     }
 
   }
@@ -150,7 +155,7 @@ public class TestRMNodeTransitions {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
     nodesListManagerEvent =  null;
-
+    nodesListManagerEventsNodeStateSequence.clear();
   }
   
   @After
@@ -721,6 +726,8 @@ public class TestRMNodeTransitions {
     node.handle(new RMNodeEvent(node.getNodeID(),
         RMNodeEventType.GRACEFUL_DECOMMISSION));
     Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
+    Assert.assertEquals(Arrays.asList(NodeState.NEW, NodeState.RUNNING),
+        nodesListManagerEventsNodeStateSequence);
     Assert
         .assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
     Assert.assertEquals("Decommissioning Nodes", initialDecommissioning + 1,
@@ -1008,7 +1015,7 @@ public class TestRMNodeTransitions {
 
     Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
     Assert.assertNotNull(nodesListManagerEvent);
-    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+    Assert.assertEquals(NodesListManagerEventType.NODE_DECOMMISSIONING,
         nodesListManagerEvent.getType());
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 1cb2d0d..fc6326e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -39,7 +39,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.OutputKeys;
+
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.MetricsSystem;
@@ -99,14 +106,19 @@ import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
 
 public class TestResourceTrackerService extends NodeLabelTestBase {
 
   private final static File TEMP_DIR = new File(System.getProperty(
       "test.build.data", "/tmp"), "decommision");
-  private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
+  private final File hostFile =
+      new File(TEMP_DIR + File.separator + "hostFile.txt");
   private final File excludeHostFile = new File(TEMP_DIR + File.separator +
       "excludeHostFile.txt");
+  private final File excludeHostXmlFile =
+      new File(TEMP_DIR + File.separator + "excludeHostFile.xml");
 
   private MockRM rm;
 
@@ -291,6 +303,67 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
   }
 
+  @Test
+  public void testGracefulDecommissionDefaultTimeoutResolution()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile
+        .getAbsolutePath());
+
+    writeToHostsXmlFile(excludeHostXmlFile, Pair.of("", null));
+    rm = new MockRM(conf);
+    rm.start();
+
+    int nodeMemory = 1024;
+    MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
+    MockNM nm2 = rm.registerNode("host2:5678", nodeMemory);
+    MockNM nm3 = rm.registerNode("host3:9101", nodeMemory);
+
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
+    Assert.assertTrue(
+        NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
+
+    rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
+    rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
+    rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
+
+    // Gracefully decommission both host1 and host2, with a
+    // non-default timeout for host1
+    final Integer nm1DecommissionTimeout = 20;
+    writeToHostsXmlFile(
+        excludeHostXmlFile,
+        Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
+        Pair.of(nm2.getNodeId().getHost(), null));
+    rm.getNodesListManager().refreshNodes(conf, true);
+    rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
+    Assert.assertEquals(
+        nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
+    Integer defaultDecTimeout =
+        conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+            YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+    Assert.assertEquals(
+        defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));
+
+    // Gracefully decommission host3 with a new default timeout
+    final Integer newDefaultDecTimeout = defaultDecTimeout + 10;
+    writeToHostsXmlFile(
+        excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null));
+    conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+        newDefaultDecTimeout);
+    rm.getNodesListManager().refreshNodes(conf, true);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
+    Assert.assertEquals(
+        newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
+  }
+
   /**
    * Graceful decommission node with running application.
    */
@@ -1967,16 +2040,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     rm.stop();
   }
 
+  private void ensureFileExists(File file) throws IOException {
+    if (!file.exists()) {
+      TEMP_DIR.mkdirs();
+      file.createNewFile();
+    }
+  }
+
   private void writeToHostsFile(String... hosts) throws IOException {
     writeToHostsFile(hostFile, hosts);
   }
 
   private void writeToHostsFile(File file, String... hosts)
       throws IOException {
-    if (!file.exists()) {
-      TEMP_DIR.mkdirs();
-      file.createNewFile();
-    }
+    ensureFileExists(file);
     FileOutputStream fStream = null;
     try {
       fStream = new FileOutputStream(file);
@@ -1992,6 +2069,33 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     }
   }
 
+  private void writeToHostsXmlFile(
+      File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
+    ensureFileExists(file);
+    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+    Document doc = dbFactory.newDocumentBuilder().newDocument();
+    Element hosts = doc.createElement("hosts");
+    doc.appendChild(hosts);
+    for (Pair<String, Integer> hostsAndTimeout : hostsAndTimeouts) {
+      Element host = doc.createElement("host");
+      hosts.appendChild(host);
+      Element name = doc.createElement("name");
+      host.appendChild(name);
+      name.appendChild(doc.createTextNode(hostsAndTimeout.getLeft()));
+      if (hostsAndTimeout.getRight() != null) {
+        Element timeout = doc.createElement("timeout");
+        host.appendChild(timeout);
+        timeout.appendChild(
+            doc.createTextNode(hostsAndTimeout.getRight().toString())
+        );
+      }
+    }
+    TransformerFactory transformerFactory = TransformerFactory.newInstance();
+    Transformer transformer = transformerFactory.newTransformer();
+    transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+    transformer.transform(new DOMSource(doc), new StreamResult(file));
+  }
+
   private void checkDecommissionedNMCount(MockRM rm, int count)
       throws InterruptedException {
     int waitCount = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 399df02..9ef48db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -154,7 +154,7 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
-    public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+    public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index f9f0b74..dd9bcd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -82,6 +83,13 @@ public class TestAMRMRPCNodeUpdates {
     rm.drainEvents();
   }
 
+  private void syncNodeGracefulDecommission(
+      MockNM nm, int timeout) throws Exception {
+    rm.sendNodeGracefulDecommission(nm, timeout);
+    rm.waitForState(nm.getNodeId(), NodeState.DECOMMISSIONING);
+    rm.drainEvents();
+  }
+
   private AllocateResponse allocate(final ApplicationAttemptId attemptId,
       final AllocateRequest req) throws Exception {
     UserGroupInformation ugi =
@@ -99,6 +107,39 @@ public class TestAMRMRPCNodeUpdates {
   }
 
   @Test
+  public void testAMRMDecommissioningNodes() throws Exception {
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
+    MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000);
+    rm.drainEvents();
+
+    RMApp app1 = rm.submitApp(2000);
+
+    // Trigger the scheduling so the AM gets 'launched' on nm1
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+
+    // Registering the AM returns no unusable nodes
+    am1.registerAppAttempt();
+
+    Integer decommissioningTimeout = 600;
+    syncNodeGracefulDecommission(nm2, decommissioningTimeout);
+
+    AllocateRequest allocateRequest1 =
+        AllocateRequest.newInstance(0, 0F, null, null, null);
+    AllocateResponse response1 =
+        allocate(attempt1.getAppAttemptId(), allocateRequest1);
+    List<NodeReport> updatedNodes = response1.getUpdatedNodes();
+    Assert.assertEquals(1, updatedNodes.size());
+    NodeReport nr = updatedNodes.iterator().next();
+    Assert.assertEquals(
+        decommissioningTimeout, nr.getDecommissioningTimeout());
+    Assert.assertEquals(
+        NodeUpdateType.NODE_DECOMMISSIONING, nr.getNodeUpdateType());
+  }
+
+  @Test
   public void testAMRMUnusableNodes() throws Exception {
     
     MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10000);
@@ -138,6 +179,8 @@ public class TestAMRMRPCNodeUpdates {
     NodeReport nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
     
     // resending the allocate request returns the same result
     response1 = allocate(attempt1.getAppAttemptId(), allocateRequest1);
@@ -146,6 +189,8 @@ public class TestAMRMRPCNodeUpdates {
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.UNHEALTHY, nr.getNodeState());
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
 
     syncNodeLost(nm3);
     
@@ -159,6 +204,8 @@ public class TestAMRMRPCNodeUpdates {
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm3.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.LOST, nr.getNodeState());
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_UNUSABLE, nr.getNodeUpdateType());
         
     // registering another AM gives it the complete failed list
     RMApp app2 = rm.submitApp(2000);
@@ -190,6 +237,8 @@ public class TestAMRMRPCNodeUpdates {
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
     
     allocateRequest2 =
         AllocateRequest.newInstance(response2.getResponseId(), 0F, null, null,
@@ -200,6 +249,8 @@ public class TestAMRMRPCNodeUpdates {
     nr = updatedNodes.iterator().next();
     Assert.assertEquals(nm4.getNodeId(), nr.getNodeId());
     Assert.assertEquals(NodeState.RUNNING, nr.getNodeState());
+    Assert.assertNull(nr.getDecommissioningTimeout());
+    Assert.assertEquals(NodeUpdateType.NODE_USABLE, nr.getNodeUpdateType());
 
     // subsequent allocate calls should return no updated nodes
     allocateRequest2 =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b46ca7e7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 39a7f99..6c64a67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeUpdateType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -235,7 +235,7 @@ public class MockRMApp implements RMApp {
   }
 
   @Override
-  public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
+  public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> updatedNodes) {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message