hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [2/2] hadoop git commit: YARN-2495. Allow admin to specify labels from each NM (Distributed configuration for node label). (Naganarasimha G R via wangda)
Date Mon, 30 Mar 2015 19:10:16 GMT
YARN-2495. Allow admin to specify labels from each NM (Distributed configuration for node label). (Naganarasimha G R via wangda)

(cherry picked from commit 2a945d24f7de1a7ae6e7bd6636188ce3b55c7f52)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cba4ed16
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cba4ed16
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cba4ed16

Branch: refs/heads/branch-2
Commit: cba4ed1678b70745f5f03be9a8129fdf26bccc72
Parents: dd5b2da
Author: Wangda Tan <wangda@apache.org>
Authored: Mon Mar 30 12:04:51 2015 -0700
Committer: Wangda Tan <wangda@apache.org>
Committed: Mon Mar 30 12:05:54 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  12 +
 .../src/main/proto/yarn_protos.proto            |   4 +
 .../yarn/client/TestResourceTrackerOnHA.java    |   2 +-
 .../protocolrecords/NodeHeartbeatRequest.java   |   8 +-
 .../protocolrecords/NodeHeartbeatResponse.java  |   3 +
 .../RegisterNodeManagerRequest.java             |  12 +
 .../RegisterNodeManagerResponse.java            |   3 +
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  37 ++
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  13 +
 .../pb/RegisterNodeManagerRequestPBImpl.java    |  48 ++-
 .../pb/RegisterNodeManagerResponsePBImpl.java   |  13 +
 .../yarn_server_common_service_protos.proto     |   4 +
 .../hadoop/yarn/TestYarnServerApiClasses.java   |  94 ++++
 .../yarn/server/nodemanager/NodeManager.java    |  34 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      | 114 ++++-
 .../nodelabels/NodeLabelsProvider.java          |  43 ++
 .../nodemanager/TestNodeStatusUpdater.java      |   2 +-
 .../TestNodeStatusUpdaterForLabels.java         | 281 ++++++++++++
 .../resourcemanager/ResourceTrackerService.java |  80 +++-
 .../TestResourceTrackerService.java             | 430 ++++++++++++++++++-
 21 files changed, 1199 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index c36649e..d2850ad 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -35,6 +35,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3288. Document and fix indentation in the DockerContainerExecutor code
 
+    YARN-2495. Allow admin specify labels from each NM (Distributed 
+    configuration for node label). (Naganarasimha G R via wangda)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index be5471d..a25cfe9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1719,6 +1719,18 @@ public class YarnConfiguration extends Configuration {
   public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX
       + "enabled";
   public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;
+  
+  public static final String NODELABEL_CONFIGURATION_TYPE =
+      NODE_LABELS_PREFIX + "configuration-type";
+  
+  public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
+      "centralized";
+  
+  public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
+      "distributed";
+  
+  public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
+      CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
 
   public YarnConfiguration() {
     super();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 194be82..b396f4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -239,6 +239,10 @@ message NodeIdToLabelsProto {
   repeated string nodeLabels = 2;
 }
 
+message StringArrayProto {
+  repeated string elements = 1;
+}
+
 message LabelsToNodeIdsProto {
   optional string nodeLabels = 1;
   repeated NodeIdProto nodeId = 2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
index 8885769..8167a58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -70,7 +70,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
         NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
             null, null);
     NodeHeartbeatRequest request2 =
-        NodeHeartbeatRequest.newInstance(status, null, null);
+        NodeHeartbeatRequest.newInstance(status, null, null,null);
     resourceTracker.nodeHeartbeat(request2);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index addd3fe..b80d9ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.util.Set;
+
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -26,7 +28,7 @@ public abstract class NodeHeartbeatRequest {
   
   public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
       MasterKey lastKnownContainerTokenMasterKey,
-      MasterKey lastKnownNMTokenMasterKey) {
+      MasterKey lastKnownNMTokenMasterKey, Set<String> nodeLabels) {
     NodeHeartbeatRequest nodeHeartbeatRequest =
         Records.newRecord(NodeHeartbeatRequest.class);
     nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@@ -34,6 +36,7 @@ public abstract class NodeHeartbeatRequest {
         .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
     nodeHeartbeatRequest
         .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+    nodeHeartbeatRequest.setNodeLabels(nodeLabels);
     return nodeHeartbeatRequest;
   }
 
@@ -45,4 +48,7 @@ public abstract class NodeHeartbeatRequest {
   
   public abstract MasterKey getLastKnownNMTokenMasterKey();
   public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
+  
+  public abstract Set<String> getNodeLabels();
+  public abstract void setNodeLabels(Set<String> nodeLabels);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 9fb44ca..1498a0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -67,4 +67,7 @@ public interface NodeHeartbeatResponse {
 
   void setSystemCredentialsForApps(
       Map<ApplicationId, ByteBuffer> systemCredentials);
+  
+  boolean getAreNodeLabelsAcceptedByRM();
+  void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index 366c32c..bf09b33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -31,6 +32,14 @@ public abstract class RegisterNodeManagerRequest {
       int httpPort, Resource resource, String nodeManagerVersionId,
       List<NMContainerStatus> containerStatuses,
       List<ApplicationId> runningApplications) {
+    return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
+        containerStatuses, runningApplications, null);
+  }
+
+  public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
+      int httpPort, Resource resource, String nodeManagerVersionId,
+      List<NMContainerStatus> containerStatuses,
+      List<ApplicationId> runningApplications, Set<String> nodeLabels) {
     RegisterNodeManagerRequest request =
         Records.newRecord(RegisterNodeManagerRequest.class);
     request.setHttpPort(httpPort);
@@ -39,6 +48,7 @@ public abstract class RegisterNodeManagerRequest {
     request.setNMVersion(nodeManagerVersionId);
     request.setContainerStatuses(containerStatuses);
     request.setRunningApplications(runningApplications);
+    request.setNodeLabels(nodeLabels);
     return request;
   }
   
@@ -47,6 +57,8 @@ public abstract class RegisterNodeManagerRequest {
   public abstract Resource getResource();
   public abstract String getNMVersion();
   public abstract List<NMContainerStatus> getNMContainerStatuses();
+  public abstract Set<String> getNodeLabels();
+  public abstract void setNodeLabels(Set<String> nodeLabels);
   
   /**
    * We introduce this here because currently YARN RM doesn't persist nodes info

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
index b20803f..c8678f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
@@ -45,4 +45,7 @@ public interface RegisterNodeManagerResponse {
   void setRMVersion(String version);
 
   String getRMVersion();
+  
+  boolean getAreNodeLabelsAcceptedByRM();
+  void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 26d1f19..16d47f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@@ -36,6 +41,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private NodeStatus nodeStatus = null;
   private MasterKey lastKnownContainerTokenMasterKey = null;
   private MasterKey lastKnownNMTokenMasterKey = null;
+  private Set<String> labels = null;
   
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
@@ -80,6 +86,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       builder.setLastKnownNmTokenMasterKey(
           convertToProtoFormat(this.lastKnownNMTokenMasterKey));
     }
+    if (this.labels != null) {
+      builder.clearNodeLabels();
+      builder.setNodeLabels(StringArrayProto.newBuilder()
+          .addAllElements(this.labels).build());
+    }
   }
 
   private void mergeLocalToProto() {
@@ -178,4 +189,30 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private MasterKeyProto convertToProtoFormat(MasterKey t) {
     return ((MasterKeyPBImpl)t).getProto();
   }
+
+  @Override
+  public Set<String> getNodeLabels() {
+    initNodeLabels();
+    return this.labels;
+  }
+
+  @Override
+  public void setNodeLabels(Set<String> nodeLabels) {
+    maybeInitBuilder();
+    builder.clearNodeLabels();
+    this.labels = nodeLabels;
+  }
+  
+  private void initNodeLabels() {
+    if (this.labels != null) {
+      return;
+    }
+    NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeLabels()) {
+      labels = null;
+      return;
+    }
+    StringArrayProto nodeLabels = p.getNodeLabels();
+    labels = new HashSet<String>(nodeLabels.getElementsList());
+  }
 }  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 630a5bf..e27d8ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -483,5 +483,18 @@ public class NodeHeartbeatResponsePBImpl extends
   private MasterKeyProto convertToProtoFormat(MasterKey t) {
     return ((MasterKeyPBImpl) t).getProto();
   }
+
+  @Override
+  public boolean getAreNodeLabelsAcceptedByRM() {
+    NodeHeartbeatResponseProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    return p.getAreNodeLabelsAcceptedByRM();
+  }
+
+  @Override
+  public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
+    maybeInitBuilder();
+    this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index ce4faec..1d2bb82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -20,32 +20,27 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-
-
     
 public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
   RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
@@ -56,7 +51,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
   private NodeId nodeId = null;
   private List<NMContainerStatus> containerStatuses = null;
   private List<ApplicationId> runningApplications = null;
-  
+  private Set<String> labels = null;
+
   public RegisterNodeManagerRequestPBImpl() {
     builder = RegisterNodeManagerRequestProto.newBuilder();
   }
@@ -86,7 +82,11 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     if (this.nodeId != null) {
       builder.setNodeId(convertToProtoFormat(this.nodeId));
     }
-
+    if (this.labels != null) {
+      builder.clearNodeLabels();
+      builder.setNodeLabels(StringArrayProto.newBuilder()
+          .addAllElements(this.labels).build());
+    }
   }
 
   private synchronized void addNMContainerStatusesToProto() {
@@ -292,6 +292,32 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     builder.setNmVersion(version);
   }
   
+  @Override
+  public Set<String> getNodeLabels() {
+    initNodeLabels();
+    return this.labels;
+  }
+
+  @Override
+  public void setNodeLabels(Set<String> nodeLabels) {
+    maybeInitBuilder();
+    builder.clearNodeLabels();
+    this.labels = nodeLabels;
+  }
+  
+  private void initNodeLabels() {
+    if (this.labels != null) {
+      return;
+    }
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeLabels()) {
+      labels=null;
+      return;
+    }
+    StringArrayProto nodeLabels = p.getNodeLabels();
+    labels = new HashSet<String>(nodeLabels.getElementsList());
+  }
+
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
index ac329ed..391d00d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
@@ -216,4 +216,17 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
   private MasterKeyProto convertToProtoFormat(MasterKey t) {
     return ((MasterKeyPBImpl)t).getProto();
   }
+
+  @Override
+  public boolean getAreNodeLabelsAcceptedByRM() {
+    RegisterNodeManagerResponseProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    return p.getAreNodeLabelsAcceptedByRM();
+  }
+
+  @Override
+  public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
+    maybeInitBuilder();
+    this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
+  }
 }  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 91473c5..d8c92c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto {
   optional string nm_version = 5;
   repeated NMContainerStatusProto container_statuses = 6;
   repeated ApplicationIdProto runningApplications = 7;
+  optional StringArrayProto nodeLabels = 8;
 }
 
 message RegisterNodeManagerResponseProto {
@@ -41,12 +42,14 @@ message RegisterNodeManagerResponseProto {
   optional int64 rm_identifier = 4;
   optional string diagnostics_message = 5;
   optional string rm_version = 6;
+  optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
 }
 
 message NodeHeartbeatRequestProto {
   optional NodeStatusProto node_status = 1;
   optional MasterKeyProto last_known_container_token_master_key = 2;
   optional MasterKeyProto last_known_nm_token_master_key = 3;
+  optional StringArrayProto nodeLabels = 4;
 }
 
 message NodeHeartbeatResponseProto {
@@ -60,6 +63,7 @@ message NodeHeartbeatResponseProto {
   optional string diagnostics_message = 8;
   repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
   repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
+  optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
 }
 
 message SystemCredentialsForAppsProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index 20983b6..d42b2c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.yarn;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
@@ -46,6 +49,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -77,7 +81,17 @@ public class TestYarnServerApiClasses {
     assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
     assertEquals(NodeAction.NORMAL, copy.getNodeAction());
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+    assertFalse(copy.getAreNodeLabelsAcceptedByRM());
+  }
 
+  @Test
+  public void testRegisterNodeManagerResponsePBImplWithRMAcceptLbls() {
+    RegisterNodeManagerResponsePBImpl original =
+        new RegisterNodeManagerResponsePBImpl();
+    original.setAreNodeLabelsAcceptedByRM(true);
+    RegisterNodeManagerResponsePBImpl copy =
+        new RegisterNodeManagerResponsePBImpl(original.getProto());
+    assertTrue(copy.getAreNodeLabelsAcceptedByRM());
   }
 
   /**
@@ -89,11 +103,32 @@ public class TestYarnServerApiClasses {
     original.setLastKnownContainerTokenMasterKey(getMasterKey());
     original.setLastKnownNMTokenMasterKey(getMasterKey());
     original.setNodeStatus(getNodeStatus());
+    original.setNodeLabels(getValidNodeLabels());
     NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
         original.getProto());
     assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
     assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
+    // check labels are coming with valid values
+    Assert.assertTrue(original.getNodeLabels()
+        .containsAll(copy.getNodeLabels()));
+    // check for empty labels
+    original.setNodeLabels(new HashSet<String> ());
+    copy = new NodeHeartbeatRequestPBImpl(
+        original.getProto());
+    Assert.assertNotNull(copy.getNodeLabels());
+    Assert.assertEquals(0, copy.getNodeLabels().size());
+  }
+
+  /**
+   * Test NodeHeartbeatRequestPBImpl when no node labels have been set.
+   */
+  @Test
+  public void testNodeHeartbeatRequestPBImplWithNullLabels() {
+    NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();
+    NodeHeartbeatRequestPBImpl copy =
+        new NodeHeartbeatRequestPBImpl(original.getProto());
+    Assert.assertNull(copy.getNodeLabels());
   }
 
   /**
@@ -119,6 +154,16 @@ public class TestYarnServerApiClasses {
     assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+    assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
+   }
+
+  @Test
+  public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() {
+    NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
+    original.setAreNodeLabelsAcceptedByRM(true);
+    NodeHeartbeatResponsePBImpl copy =
+        new NodeHeartbeatResponsePBImpl(original.getProto());
+    assertTrue(copy.getAreNodeLabelsAcceptedByRM());
   }
 
   /**
@@ -208,6 +253,55 @@ public class TestYarnServerApiClasses {
 
   }
 
+  @Test
+  public void testRegisterNodeManagerRequestWithNullLabels() {
+    RegisterNodeManagerRequest request =
+        RegisterNodeManagerRequest.newInstance(
+            NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
+            "version", null, null);
+
+    // serialize to proto, and get request from proto
+    RegisterNodeManagerRequest request1 =
+        new RegisterNodeManagerRequestPBImpl(
+            ((RegisterNodeManagerRequestPBImpl) request).getProto());
+
+    // check labels are coming with no values
+    Assert.assertNull(request1.getNodeLabels());
+  }
+
+  @Test
+  public void testRegisterNodeManagerRequestWithValidLabels() {
+    HashSet<String> nodeLabels = getValidNodeLabels();
+    RegisterNodeManagerRequest request =
+        RegisterNodeManagerRequest.newInstance(
+            NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
+            "version", null, null, nodeLabels);
+
+    // serialize to proto, and get request from proto
+    RegisterNodeManagerRequest copy =
+        new RegisterNodeManagerRequestPBImpl(
+            ((RegisterNodeManagerRequestPBImpl) request).getProto());
+
+    // check labels are coming with valid values
+    Assert.assertEquals(true, nodeLabels.containsAll(copy.getNodeLabels()));
+
+    // check for empty labels
+    request.setNodeLabels(new HashSet<String> ());
+    copy = new RegisterNodeManagerRequestPBImpl(
+        ((RegisterNodeManagerRequestPBImpl) request).getProto());
+    Assert.assertNotNull(copy.getNodeLabels());
+    Assert.assertEquals(0, copy.getNodeLabels().size());
+  }
+
+  private HashSet<String> getValidNodeLabels() {
+    HashSet<String> nodeLabels = new HashSet<String>();
+    nodeLabels.add("java");
+    nodeLabels.add("windows");
+    nodeLabels.add("gpu");
+    nodeLabels.add("x86");
+    return nodeLabels;
+  }
+
   private ContainerStatus getContainerStatus(int applicationId,
       int containerID, int appAttemptId) {
     ContainerStatus status = recordFactory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 016447c..5727f10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -79,6 +80,7 @@ public class NodeManager extends CompositeService
   protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
   private ApplicationACLsManager aclsManager;
   private NodeHealthCheckerService nodeHealthChecker;
+  private NodeLabelsProvider nodeLabelsProvider;
   private LocalDirsHandlerService dirsHandler;
   private Context context;
   private AsyncDispatcher dispatcher;
@@ -97,7 +99,22 @@ public class NodeManager extends CompositeService
   protected NodeStatusUpdater createNodeStatusUpdater(Context context,
       Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
     return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
-      metrics);
+        metrics, nodeLabelsProvider);
+  }
+
+  protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+      Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+      NodeLabelsProvider nodeLabelsProvider) {
+    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
+        metrics, nodeLabelsProvider);
+  }
+
+  @VisibleForTesting
+  protected NodeLabelsProvider createNodeLabelsProvider(
+      Configuration conf) throws IOException {
+    // TODO as part of YARN-2729
+    // Need to get the implementation of provider service and return
+    return null;
   }
 
   protected NodeResourceMonitor createNodeResourceMonitor() {
@@ -223,9 +240,18 @@ public class NodeManager extends CompositeService
 
     this.context = createNMContext(containerTokenSecretManager,
         nmTokenSecretManager, nmStore);
-    
-    nodeStatusUpdater =
-        createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
+
+    nodeLabelsProvider = createNodeLabelsProvider(conf);
+
+    if (null == nodeLabelsProvider) {
+      nodeStatusUpdater =
+          createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
+    } else {
+      addService(nodeLabelsProvider);
+      nodeStatusUpdater =
+          createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
+              nodeLabelsProvider);
+    }
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
     addService(nodeResourceMonitor);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 6ddd7e4..2549e0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -120,15 +123,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
   Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
 
+  private final NodeLabelsProvider nodeLabelsProvider;
+  private final boolean hasNodeLabelsProvider;
+
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+    this(context, dispatcher, healthChecker, metrics, null);
+  }
+
+  public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+      NodeLabelsProvider nodeLabelsProvider) {
     super(NodeStatusUpdaterImpl.class.getName());
     this.healthChecker = healthChecker;
+    this.nodeLabelsProvider = nodeLabelsProvider;
+    this.hasNodeLabelsProvider = (nodeLabelsProvider != null);
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
-    this.recentlyStoppedContainers =
-        new LinkedHashMap<ContainerId, Long>();
+    this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
     this.pendingCompletedContainers =
         new HashMap<ContainerId, ContainerStatus>();
   }
@@ -253,22 +266,30 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   protected void registerWithRM()
       throws YarnException, IOException {
     List<NMContainerStatus> containerReports = getNMContainerStatuses();
+    Set<String> nodeLabels = null;
+    if (hasNodeLabelsProvider) {
+      nodeLabels = nodeLabelsProvider.getNodeLabels();
+      nodeLabels =
+          (null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_STRING_SET
+              : nodeLabels;
+    }
     RegisterNodeManagerRequest request =
         RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
-          nodeManagerVersionId, containerReports, getRunningApplications());
+            nodeManagerVersionId, containerReports, getRunningApplications(),
+            nodeLabels);
     if (containerReports != null) {
       LOG.info("Registering with RM using containers :" + containerReports);
     }
     RegisterNodeManagerResponse regNMResponse =
         resourceTracker.registerNodeManager(request);
     this.rmIdentifier = regNMResponse.getRMIdentifier();
-    // if the Resourcemanager instructs NM to shutdown.
+    // if the Resource Manager instructs NM to shutdown.
     if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
       String message =
           "Message from ResourceManager: "
               + regNMResponse.getDiagnosticsMessage();
       throw new YarnRuntimeException(
-        "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
+        "Recieved SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, "
             + message);
     }
 
@@ -306,8 +327,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       this.context.getNMTokenSecretManager().setMasterKey(masterKey);
     }
 
-    LOG.info("Registered with ResourceManager as " + this.nodeId
-        + " with total resource of " + this.totalResource);
+    StringBuilder successfullRegistrationMsg = new StringBuilder();
+    successfullRegistrationMsg.append("Registered with ResourceManager as ")
+        .append(this.nodeId).append(" with total resource of ")
+        .append(this.totalResource);
+
+    if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
+      successfullRegistrationMsg
+          .append(" and with following Node label(s) : {")
+          .append(StringUtils.join(",", nodeLabels)).append("}");
+    } else if (hasNodeLabelsProvider) {
+      //case where provider is set but RM did not accept the Node Labels
+      LOG.error(regNMResponse.getDiagnosticsMessage());
+    }
+
+    LOG.info(successfullRegistrationMsg);
     LOG.info("Notifying ContainerManager to unblock new container-requests");
     ((ContainerManagerImpl) this.context.getContainerManager())
       .setBlockNewContainerRequests(false);
@@ -580,19 +614,41 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       @Override
       @SuppressWarnings("unchecked")
       public void run() {
-        int lastHeartBeatID = 0;
+        int lastHeartbeatID = 0;
+        Set<String> lastUpdatedNodeLabelsToRM = null;
+        if (hasNodeLabelsProvider) {
+          lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
+          lastUpdatedNodeLabelsToRM =
+              (null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_STRING_SET
+                  : lastUpdatedNodeLabelsToRM;
+        }
         while (!isStopped) {
           // Send heartbeat
           try {
             NodeHeartbeatResponse response = null;
-            NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
-            
+            Set<String> nodeLabelsForHeartbeat = null;
+            NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
+
+            if (hasNodeLabelsProvider) {
+              nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
+              //if the provider returns null then consider empty labels are set
+              nodeLabelsForHeartbeat =
+                  (nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_STRING_SET
+                      : nodeLabelsForHeartbeat;
+              if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
+                  lastUpdatedNodeLabelsToRM)) {
+                //if nodelabels have not changed then no need to send
+                nodeLabelsForHeartbeat = null;
+              }
+            }
+
             NodeHeartbeatRequest request =
                 NodeHeartbeatRequest.newInstance(nodeStatus,
-                  NodeStatusUpdaterImpl.this.context
-                    .getContainerTokenSecretManager().getCurrentKey(),
-                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
-                    .getCurrentKey());
+                    NodeStatusUpdaterImpl.this.context
+                        .getContainerTokenSecretManager().getCurrentKey(),
+                    NodeStatusUpdaterImpl.this.context
+                        .getNMTokenSecretManager().getCurrentKey(),
+                    nodeLabelsForHeartbeat);
             response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -623,6 +679,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               break;
             }
 
+            if (response.getAreNodeLabelsAcceptedByRM()) {
+              lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat;
+              LOG.info("Node Labels {"
+                  + StringUtils.join(",", nodeLabelsForHeartbeat)
+                  + "} were Accepted by RM ");
+            } else if (nodeLabelsForHeartbeat != null) {
+              // case where NodeLabelsProvider is set and updated labels were
+              // sent to RM and RM rejected the labels
+              LOG.error(response.getDiagnosticsMessage());
+            }
+
             // Explicitly put this method after checking the resync response. We
             // don't want to remove the completed containers before resync
             // because these completed containers will be reported back to RM
@@ -631,7 +698,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
 
-            lastHeartBeatID = response.getResponseId();
+            lastHeartbeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanup();
             if (!containersToCleanup.isEmpty()) {
@@ -680,6 +747,23 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         }
       }
 
+      /**
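+       * Compares two node label sets. The caller must pass non-null sets for
+       * both arguments.
+       * 
+       * @param nodeLabelsNew the newly fetched set of node labels
+       * @param nodeLabelsOld the previous set of node labels to compare against
+       * @return true if the new node labels differ from the older ones.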
+       */
+      private boolean areNodeLabelsUpdated(Set<String> nodeLabelsNew,
+          Set<String> nodeLabelsOld) {
+        if (nodeLabelsNew.size() != nodeLabelsOld.size()
+            || !nodeLabelsOld.containsAll(nodeLabelsNew)) {
+          return true;
+        }
+        return false;
+      }
+
       private void updateMasterKeys(NodeHeartbeatResponse response) {
         // See if the master-key has rolled over
         MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java
new file mode 100644
index 0000000..4b34d76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import java.util.Set;
+
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Abstract service which is responsible for fetching the node labels
+ * 
+ */
+public abstract class NodeLabelsProvider extends AbstractService {
+
+  public NodeLabelsProvider(String name) {
+    super(name);
+  }
+
+  /**
+   * Provides the labels. LabelProvider is expected to give same Labels
+   * continuously until there is a change in labels. 
+   * If null is returned then Empty label set is assumed by the caller.
+   * 
+   * @return Set of node label strings applicable for a node
+   */
+  public abstract Set<String> getNodeLabels();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 71a420e..fc404de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -1182,7 +1182,7 @@ public class TestNodeStatusUpdater {
       }
     };
     verifyNodeStartFailure(
-          "Recieved SHUTDOWN signal from Resourcemanager ,"
+          "Recieved SHUTDOWN signal from Resourcemanager, "
         + "Registration of NodeManager failed, "
         + "Message from ResourceManager: RM Shutting Down Node");
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
new file mode 100644
index 0000000..437e4c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private NodeManager nm;
+  protected DummyNodeLabelsProvider dummyLabelsProviderRef;
+
+  @Before
+  public void setup() {
+    dummyLabelsProviderRef = new DummyNodeLabelsProvider();
+  }
+
+  @After
+  public void tearDown() {
+    if (null != nm) {
+      ServiceOperations.stop(nm);
+    }
+  }
+
+  private class ResourceTrackerForLabels implements ResourceTracker {
+    int heartbeatID = 0;
+    Set<String> labels;
+
+    private boolean receivedNMHeartbeat = false;
+    private boolean receivedNMRegister = false;
+
+    private MasterKey createMasterKey() {
+      MasterKey masterKey = new MasterKeyPBImpl();
+      masterKey.setKeyId(123);
+      masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
+          .byteValue() }));
+      return masterKey;
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnException, IOException {
+      labels = request.getNodeLabels();
+      RegisterNodeManagerResponse response =
+          recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(NodeAction.NORMAL);
+      response.setContainerTokenMasterKey(createMasterKey());
+      response.setNMTokenMasterKey(createMasterKey());
+      response.setAreNodeLabelsAcceptedByRM(labels != null);
+      synchronized (ResourceTrackerForLabels.class) {
+        receivedNMRegister = true;
+        ResourceTrackerForLabels.class.notifyAll();
+      }
+      return response;
+    }
+
+    public void waitTillHeartbeat() {
+      if (receivedNMHeartbeat) {
+        return;
+      }
+      int i = 500;
+      while (!receivedNMHeartbeat && i > 0) {
+        synchronized (ResourceTrackerForLabels.class) {
+          if (!receivedNMHeartbeat) {
+            try {
+              System.out
+                  .println("In ResourceTrackerForLabels waiting for heartbeat : "
+                      + System.currentTimeMillis());
+              ResourceTrackerForLabels.class.wait(500l);
+              // to avoid race condition, i.e. sendOutofBandHeartBeat can be
+              // sent before NSU thread has gone to sleep, hence we wait and try
+              // to resend heartbeat again
+              nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+              ResourceTrackerForLabels.class.wait(500l);
+              i--;
+            } catch (InterruptedException e) {
+              Assert.fail("Exception caught while waiting for Heartbeat");
+              e.printStackTrace();
+            }
+          }
+        }
+      }
+      if (!receivedNMHeartbeat) {
+        Assert.fail("Heartbeat dint receive even after waiting");
+      }
+    }
+
+    public void waitTillRegister() {
+      if (receivedNMRegister) {
+        return;
+      }
+      while (!receivedNMRegister) {
+        synchronized (ResourceTrackerForLabels.class) {
+          try {
+            ResourceTrackerForLabels.class.wait();
+          } catch (InterruptedException e) {
+            Assert.fail("Exception caught while waiting for register");
+            e.printStackTrace();
+          }
+        }
+      }
+    }
+
+    /**
+     * Resets the flag that indicates whether any NM heartbeat was received.
+     */
+    public void resetNMHeartbeatReceiveFlag() {
+      synchronized (ResourceTrackerForLabels.class) {
+        receivedNMHeartbeat = false;
+      }
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnException, IOException {
+      System.out.println("RTS receive heartbeat : "
+          + System.currentTimeMillis());
+      labels = request.getNodeLabels();
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartbeatID++);
+
+      NodeHeartbeatResponse nhResponse =
+          YarnServerBuilderUtils.newNodeHeartbeatResponse(heartbeatID,
+              NodeAction.NORMAL, null, null, null, null, 1000L);
+
+      // to ensure that heartbeats are sent only when required.
+      nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE);
+      nhResponse.setAreNodeLabelsAcceptedByRM(labels != null);
+
+      synchronized (ResourceTrackerForLabels.class) {
+        receivedNMHeartbeat = true;
+        ResourceTrackerForLabels.class.notifyAll();
+      }
+      return nhResponse;
+    }
+  }
+
+  public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
+
+    @SuppressWarnings("unchecked")
+    private Set<String> nodeLabels = Collections.EMPTY_SET;
+
+    public DummyNodeLabelsProvider() {
+      super(DummyNodeLabelsProvider.class.getName());
+    }
+
+    @Override
+    public synchronized Set<String> getNodeLabels() {
+      return nodeLabels;
+    }
+
+    synchronized void setNodeLabels(Set<String> nodeLabels) {
+      this.nodeLabels = nodeLabels;
+    }
+  }
+
+  private YarnConfiguration createNMConfigForDistributeNodeLabels() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
+        YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
+    return conf;
+  }
+
+  @Test
+  public void testNodeStatusUpdaterForNodeLabels() throws InterruptedException,
+      IOException {
+    final ResourceTrackerForLabels resourceTracker =
+        new ResourceTrackerForLabels();
+    nm = new NodeManager() {
+      @Override
+      protected NodeLabelsProvider createNodeLabelsProvider(
+          Configuration conf) throws IOException {
+        return dummyLabelsProviderRef;
+      }
+
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+          NodeLabelsProvider labelsProvider) {
+
+        return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
+            metrics, labelsProvider) {
+          @Override
+          protected ResourceTracker getRMClient() {
+            return resourceTracker;
+          }
+
+          @Override
+          protected void stopRMProxy() {
+            return;
+          }
+        };
+      }
+    };
+
+    YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
+    nm.init(conf);
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    nm.start();
+    resourceTracker.waitTillRegister();
+    assertCollectionEquals(resourceTracker.labels,
+        dummyLabelsProviderRef.getNodeLabels());
+
+    resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // heartbeat with updated labels
+    dummyLabelsProviderRef.setNodeLabels(toSet("P"));
+
+    nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertCollectionEquals(resourceTracker.labels,
+        dummyLabelsProviderRef.getNodeLabels());
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // heartbeat without updating labels
+    nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    assertNull(
+        "If no change in labels then null should be sent as part of request",
+        resourceTracker.labels);
+    
+    // provider returns null labels
+    dummyLabelsProviderRef.setNodeLabels(null);    
+    nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertTrue("If provider sends null then empty labels should be sent",
+        resourceTracker.labels.isEmpty());
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    nm.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cba4ed16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 0de556b..22efe25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -31,6 +34,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -100,6 +104,8 @@ public class ResourceTrackerService extends AbstractService implements
   private int minAllocMb;
   private int minAllocVcores;
 
+  private boolean isDistributesNodeLabelsConf;
+
   static {
     resync.setNodeAction(NodeAction.RESYNC);
 
@@ -149,6 +155,14 @@ public class ResourceTrackerService extends AbstractService implements
         YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
         YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
 
+    String nodeLabelConfigurationType =
+        conf.get(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
+            YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);
+
+    isDistributesNodeLabelsConf =
+        YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE
+            .equals(nodeLabelConfigurationType);
+
     super.serviceInit(conf);
   }
 
@@ -336,11 +350,31 @@ public class ResourceTrackerService extends AbstractService implements
       }
     }
 
-    String message =
-        "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
-            + httpPort + ") " + "registered with capability: " + capability
-            + ", assigned nodeId " + nodeId;
-    LOG.info(message);
+    // Update node's labels to RM's NodeLabelManager.
+    Set<String> nodeLabels = request.getNodeLabels();
+    if (isDistributesNodeLabelsConf && nodeLabels != null) {
+      try {
+        updateNodeLabelsFromNMReport(nodeLabels, nodeId);
+        response.setAreNodeLabelsAcceptedByRM(true);
+      } catch (IOException ex) {
+        // Ensure the exception is captured in the response
+        response.setDiagnosticsMessage(ex.getMessage());
+        response.setAreNodeLabelsAcceptedByRM(false);
+      }
+    }
+
+    StringBuilder message = new StringBuilder();
+    message.append("NodeManager from node ").append(host).append("(cmPort: ")
+        .append(cmPort).append(" httpPort: ");
+    message.append(httpPort).append(") ")
+        .append("registered with capability: ").append(capability);
+    message.append(", assigned nodeId ").append(nodeId);
+    if (response.getAreNodeLabelsAcceptedByRM()) {
+      message.append(", node labels { ").append(
+          StringUtils.join(",", nodeLabels) + " } ");
+    }
+
+    LOG.info(message.toString());
     response.setNodeAction(NodeAction.NORMAL);
     response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
     response.setRMVersion(YarnVersionInfo.getVersion());
@@ -359,6 +393,7 @@ public class ResourceTrackerService extends AbstractService implements
      * 2. Check if it's a registered node
      * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
      * 4. Send healthStatus to RMNode
+     * 5. Update node's labels if distributed Node Labels configuration is enabled
      */
 
     NodeId nodeId = remoteNodeStatus.getNodeId();
@@ -428,9 +463,44 @@ public class ResourceTrackerService extends AbstractService implements
             remoteNodeStatus.getContainersStatuses(), 
             remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
 
+    // 5. Update node's labels to RM's NodeLabelManager.
+    if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {
+      try {
+        updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId);
+        nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
+      } catch (IOException ex) {
+        //ensure the error message is captured and sent across in response
+        nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage());
+        nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false);
+      }
+    }
+
     return nodeHeartBeatResponse;
   }
 
+  private void updateNodeLabelsFromNMReport(Set<String> nodeLabels,
+      NodeId nodeId) throws IOException {
+    try {
+      Map<NodeId, Set<String>> labelsUpdate =
+          new HashMap<NodeId, Set<String>>();
+      labelsUpdate.put(nodeId, nodeLabels);
+      this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Node Labels {" + StringUtils.join(",", nodeLabels)
+            + "} from Node " + nodeId + " were Accepted from RM");
+      }
+    } catch (IOException ex) {
+      StringBuilder errorMessage = new StringBuilder();
+      errorMessage.append("Node Labels {")
+          .append(StringUtils.join(",", nodeLabels))
+          .append("} reported from NM with ID ").append(nodeId)
+          .append(" was rejected from RM with exception message as : ")
+          .append(ex.getMessage());
+      LOG.error(errorMessage, ex);
+      throw new IOException(errorMessage.toString(), ex);
+    }
+  }
+
   private void populateKeys(NodeHeartbeatRequest request,
       NodeHeartbeatResponse nodeHeartBeatResponse) {
 


Mime
View raw message