hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [22/50] [abbrv] hadoop git commit: YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed Node Label Configuration Setup. (Naganarasimha G R)
Date Tue, 25 Aug 2015 22:22:16 GMT
YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed Node Label Configuration Setup. (Naganarasimha G R)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc07464d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc07464d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc07464d

Branch: refs/heads/HDFS-7240
Commit: fc07464d1a48b0413da5e921614430e41263fdb7
Parents: 0bc15cb
Author: Wangda Tan <wangda@apache.org>
Authored: Thu Aug 20 11:51:03 2015 -0700
Committer: Wangda Tan <wangda@apache.org>
Committed: Thu Aug 20 11:51:03 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  30 +++
 .../nodelabels/CommonNodeLabelsManager.java     |   2 +-
 .../src/main/resources/yarn-default.xml         |  47 ++++
 .../yarn/server/nodemanager/NodeManager.java    |  39 ++-
 .../nodemanager/NodeStatusUpdaterImpl.java      | 259 ++++++++++++++-----
 .../nodelabels/AbstractNodeLabelsProvider.java  | 146 +++++++++++
 .../ConfigurationNodeLabelsProvider.java        |  81 ++++++
 .../server/nodemanager/TestNodeManager.java     |  50 +++-
 .../TestNodeStatusUpdaterForLabels.java         |  76 +++++-
 .../TestConfigurationNodeLabelsProvider.java    | 146 +++++++++++
 11 files changed, 793 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b22777c..07b6339 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -175,6 +175,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4055. Report node resource utilization in heartbeat. 
     (Inigo Goiri via kasha)
 
+    YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed 
+    Node Label Configuration Setup. (Naganarasimha G R)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6c438f2..55eac85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1967,6 +1967,36 @@ public class YarnConfiguration extends Configuration {
         NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
   }
 
+  private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
+      + "node-labels.";
+
+  public static final String NM_NODE_LABELS_PROVIDER_CONFIG =
+      NM_NODE_LABELS_PREFIX + "provider";
+
+  // whitelist names for the yarn.nodemanager.node-labels.provider
+  public static final String CONFIG_NODE_LABELS_PROVIDER = "config";
+
+  private static final String NM_NODE_LABELS_PROVIDER_PREFIX =
+      NM_NODE_LABELS_PREFIX + "provider.";
+
+  // If -1 is configured then no timer task should be created
+  public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+      NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
+
+  public static final String NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS =
+      NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-timeout-ms";
+
+  // once in 10 mins
+  public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+      10 * 60 * 1000;
+
+  // Twice of default interval time
+  public static final long DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_TIMEOUT_MS =
+      DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS * 2;
+
+  public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
+      NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
+
   public YarnConfiguration() {
     super();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
index 34e6832..8cc3770 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -916,7 +916,7 @@ public class CommonNodeLabelsManager extends AbstractService {
     }
   }
 
-  private void checkAndThrowLabelName(String label) throws IOException {
+  public static void checkAndThrowLabelName(String label) throws IOException {
     if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
       throw new IOException("label added is empty or exceeds "
           + MAX_LABEL_LENGTH + " character(s)");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 53face0..00a9fba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2092,6 +2092,53 @@
     <value>centralized</value>
   </property>
 
+  <!-- Distributed Node Labels Configuration -->
+  <property>
+    <description>
+    When node labels "yarn.node-labels.configuration-type" is
+    of type "distributed" Administrators can configure the source of the
+    node labels provider by configuring this parameter. Administrators can
+    specify either "config" or the class name of the provider. Configured
+    class needs to extend
+    org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider.
+    If "config" is specified then, "ConfigurationNodeLabelsProvider" will
+    be used.
+    </description>
+    <name>yarn.nodemanager.node-labels.provider</name>
+  </property>
+
+  <property>
+    <description>
+    When node labels "yarn.nodemanager.node-labels.provider" is of type
+    "config" or the configured class extends AbstractNodeLabelsProvider then
+    periodically node labels are retrieved from the node labels provider.
+    This configuration is to define the interval. If -1 is configured then
+    node labels are retrieved from. provider only during initialization.
+    Defaults to 10 mins.
+    </description>
+    <name>yarn.nodemanager.node-labels.provider.fetch-interval-ms</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <description>
+    When node labels "yarn.nodemanager.node-labels.provider"
+    is of type "config" then ConfigurationNodeLabelsProvider fetches the
+    labels this parameter.
+    </description>
+    <name>yarn.nodemanager.node-labels.provider.configured-node-labels</name>
+  </property>
+
+  <property>
+    <description>
+    When node labels "yarn.nodemanager.node-labels.provider" is a class
+    which extends AbstractNodeLabelsProvider then this configuration provides
+    the timeout period after which it will stop querying the Node labels
+    provider. Defaults to 20 mins.
+    </description>
+    <name>yarn.nodemanager.node-labels.provider.fetch-timeout-ms</name>
+    <value>1200000</value>
+  </property>
   <!-- Other Configuration -->
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 327171b..68820a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -122,12 +123,38 @@ public class NodeManager extends CompositeService
         metrics, nodeLabelsProvider);
   }
 
-  @VisibleForTesting
-  protected NodeLabelsProvider createNodeLabelsProvider(
-      Configuration conf) throws IOException {
-    // TODO as part of YARN-2729
-    // Need to get the implementation of provider service and return
-    return null;
+  protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
+      throws IOException {
+    NodeLabelsProvider provider = null;
+    String providerString =
+        conf.get(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null);
+    if (providerString == null || providerString.trim().length() == 0) {
+      // Seems like Distributed Node Labels configuration is not enabled
+      return provider;
+    }
+    switch (providerString.trim().toLowerCase()) {
+    case YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER:
+      provider = new ConfigurationNodeLabelsProvider();
+      break;
+    default:
+      try {
+        Class<? extends NodeLabelsProvider> labelsProviderClass =
+            conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null,
+                NodeLabelsProvider.class);
+        provider = labelsProviderClass.newInstance();
+      } catch (InstantiationException | IllegalAccessException
+          | RuntimeException e) {
+        LOG.error("Failed to create NodeLabelsProvider based on Configuration",
+            e);
+        throw new IOException("Failed to create NodeLabelsProvider : "
+            + e.getMessage(), e);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Distributed Node Labels is enabled"
+          + " with provider class as : " + provider.getClass().toString());
+    }
+    return provider;
   }
 
   protected NodeResourceMonitor createNodeResourceMonitor() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 0680ea3..05efc69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -137,8 +137,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private boolean registeredWithRM = false;
   Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
 
-  private final NodeLabelsProvider nodeLabelsProvider;
-  private final boolean hasNodeLabelsProvider;
+  private NMNodeLabelsHandler nodeLabelsHandler;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -150,8 +149,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       NodeLabelsProvider nodeLabelsProvider) {
     super(NodeStatusUpdaterImpl.class.getName());
     this.healthChecker = healthChecker;
-    this.nodeLabelsProvider = nodeLabelsProvider;
-    this.hasNodeLabelsProvider = (nodeLabelsProvider != null);
+    nodeLabelsHandler = createNMNodeLabelsHandler(nodeLabelsProvider);
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
@@ -313,13 +311,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   protected void registerWithRM()
       throws YarnException, IOException {
     List<NMContainerStatus> containerReports = getNMContainerStatuses();
-    Set<NodeLabel> nodeLabels = null;
-    if (hasNodeLabelsProvider) {
-      nodeLabels = nodeLabelsProvider.getNodeLabels();
-      nodeLabels =
-          (null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
-              : nodeLabels;
-    }
+    Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
     RegisterNodeManagerRequest request =
         RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
             nodeManagerVersionId, containerReports, getRunningApplications(),
@@ -380,14 +372,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         .append(this.nodeId).append(" with total resource of ")
         .append(this.totalResource);
 
-    if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
-      successfullRegistrationMsg
-          .append(" and with following Node label(s) : {")
-          .append(StringUtils.join(",", nodeLabels)).append("}");
-    } else if (hasNodeLabelsProvider) {
-      //case where provider is set but RM did not accept the Node Labels
-      LOG.error(regNMResponse.getDiagnosticsMessage());
-    }
+    successfullRegistrationMsg.append(nodeLabelsHandler
+        .verifyRMRegistrationResponseForNodeLabels(regNMResponse));
 
     LOG.info(successfullRegistrationMsg);
     LOG.info("Notifying ContainerManager to unblock new container-requests");
@@ -688,33 +674,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       @SuppressWarnings("unchecked")
       public void run() {
         int lastHeartbeatID = 0;
-        Set<NodeLabel> lastUpdatedNodeLabelsToRM = null;
-        if (hasNodeLabelsProvider) {
-          lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
-          lastUpdatedNodeLabelsToRM =
-              (null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
-                  : lastUpdatedNodeLabelsToRM;
-        }
         while (!isStopped) {
           // Send heartbeat
           try {
             NodeHeartbeatResponse response = null;
-            Set<NodeLabel> nodeLabelsForHeartbeat = null;
+            Set<NodeLabel> nodeLabelsForHeartbeat =
+                nodeLabelsHandler.getNodeLabelsForHeartbeat();
             NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
 
-            if (hasNodeLabelsProvider) {
-              nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
-              // if the provider returns null then consider empty labels are set
-              nodeLabelsForHeartbeat =
-                  (nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
-                      : nodeLabelsForHeartbeat;
-              if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
-                  lastUpdatedNodeLabelsToRM)) {
-                // if nodelabels have not changed then no need to send
-                nodeLabelsForHeartbeat = null;
-              }
-            }
-
             NodeHeartbeatRequest request =
                 NodeHeartbeatRequest.newInstance(nodeStatus,
                     NodeStatusUpdaterImpl.this.context
@@ -740,9 +707,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             updateMasterKeys(response);
 
             if (response.getNodeAction() == NodeAction.SHUTDOWN) {
-              LOG
-                .warn("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat,"
-                    + " hence shutting down.");
+              LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
+                  + " heartbeat, hence shutting down.");
               LOG.warn("Message from ResourceManager: "
                   + response.getDiagnosticsMessage());
               context.setDecommissioned(true);
@@ -764,16 +730,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               break;
             }
 
-            if (response.getAreNodeLabelsAcceptedByRM()) {
-              lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat;
-              LOG.info("Node Labels {"
-                  + StringUtils.join(",", nodeLabelsForHeartbeat)
-                  + "} were Accepted by RM ");
-            } else if (nodeLabelsForHeartbeat != null) {
-              // case where NodeLabelsProvider is set and updated labels were
-              // sent to RM and RM rejected the labels
-              LOG.error(response.getDiagnosticsMessage());
-            }
+            nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response);
 
             // Explicitly put this method after checking the resync response. We
             // don't want to remove the completed containers before resync
@@ -833,23 +790,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         }
       }
 
-      /**
-       * Caller should take care of sending non null nodelabels for both
-       * arguments
-       * 
-       * @param nodeLabelsNew
-       * @param nodeLabelsOld
-       * @return if the New node labels are diff from the older one.
-       */
-      private boolean areNodeLabelsUpdated(Set<NodeLabel> nodeLabelsNew,
-          Set<NodeLabel> nodeLabelsOld) {
-        if (nodeLabelsNew.size() != nodeLabelsOld.size()
-            || !nodeLabelsOld.containsAll(nodeLabelsNew)) {
-          return true;
-        }
-        return false;
-      }
-
       private void updateMasterKeys(NodeHeartbeatResponse response) {
         // See if the master-key has rolled over
         MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
@@ -879,4 +819,183 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     reports.addAll(logAggregationReportForAppsTempList);
     return reports;
   }
+
+  private NMNodeLabelsHandler createNMNodeLabelsHandler(
+      NodeLabelsProvider nodeLabelsProvider) {
+    if (nodeLabelsProvider == null) {
+      return new NMCentralizedNodeLabelsHandler();
+    } else {
+      return new NMDistributedNodeLabelsHandler(nodeLabelsProvider);
+    }
+  }
+
+  private static interface NMNodeLabelsHandler {
+    /**
+     * validates nodeLabels From Provider and returns it to the caller. Also
+     * ensures that if provider returns null then empty label set is considered
+     */
+    Set<NodeLabel> getNodeLabelsForRegistration();
+
+    /**
+     * @return RMRegistration Success message and on failure will log
+     *         independently and returns empty string
+     */
+    String verifyRMRegistrationResponseForNodeLabels(
+        RegisterNodeManagerResponse regNMResponse);
+
+    /**
+     * If nodeLabels From Provider is different previous node labels then it
+     * will check the syntax correctness and throws exception if invalid. If
+     * valid, returns nodeLabels From Provider. Also ensures that if provider
+     * returns null then empty label set is considered and If labels are not
+     * modified it returns null.
+     */
+    Set<NodeLabel> getNodeLabelsForHeartbeat();
+
+    /**
+     * check whether if updated labels sent to RM was accepted or not
+     * @param response
+     */
+    void verifyRMHeartbeatResponseForNodeLabels(NodeHeartbeatResponse response);
+  }
+
+  /**
+   * In centralized configuration, NM need not send Node labels or process the
+   * response
+   */
+  private static class NMCentralizedNodeLabelsHandler
+      implements NMNodeLabelsHandler {
+    @Override
+    public Set<NodeLabel> getNodeLabelsForHeartbeat() {
+      return null;
+    }
+
+    @Override
+    public Set<NodeLabel> getNodeLabelsForRegistration() {
+      return null;
+    }
+
+    @Override
+    public void verifyRMHeartbeatResponseForNodeLabels(
+        NodeHeartbeatResponse response) {
+    }
+
+    @Override
+    public String verifyRMRegistrationResponseForNodeLabels(
+        RegisterNodeManagerResponse regNMResponse) {
+      return "";
+    }
+  }
+
+  private static class NMDistributedNodeLabelsHandler
+      implements NMNodeLabelsHandler {
+    private NMDistributedNodeLabelsHandler(
+        NodeLabelsProvider nodeLabelsProvider) {
+      this.nodeLabelsProvider = nodeLabelsProvider;
+    }
+
+    private final NodeLabelsProvider nodeLabelsProvider;
+    private Set<NodeLabel> previousNodeLabels;
+    private boolean updatedLabelsSentToRM;
+
+    @Override
+    public Set<NodeLabel> getNodeLabelsForRegistration() {
+      Set<NodeLabel> nodeLabels = nodeLabelsProvider.getNodeLabels();
+      nodeLabels = (null == nodeLabels)
+          ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels;
+      previousNodeLabels = nodeLabels;
+      try {
+        validateNodeLabels(nodeLabels);
+      } catch (IOException e) {
+        nodeLabels = null;
+      }
+      return nodeLabels;
+    }
+
+    @Override
+    public String verifyRMRegistrationResponseForNodeLabels(
+        RegisterNodeManagerResponse regNMResponse) {
+      StringBuilder successfulNodeLabelsRegistrationMsg = new StringBuilder("");
+      if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
+        successfulNodeLabelsRegistrationMsg
+            .append(" and with following Node label(s) : {")
+            .append(StringUtils.join(",", previousNodeLabels)).append("}");
+      } else {
+        // case where provider is set but RM did not accept the Node Labels
+        LOG.error(regNMResponse.getDiagnosticsMessage());
+      }
+      return successfulNodeLabelsRegistrationMsg.toString();
+    }
+
+    @Override
+    public Set<NodeLabel> getNodeLabelsForHeartbeat() {
+      Set<NodeLabel> nodeLabelsForHeartbeat =
+          nodeLabelsProvider.getNodeLabels();
+      // if the provider returns null then consider empty labels are set
+      nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null)
+          ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
+          : nodeLabelsForHeartbeat;
+      // take some action only on modification of labels
+      boolean areNodeLabelsUpdated =
+          nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
+              || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
+
+      updatedLabelsSentToRM = false;
+      if (areNodeLabelsUpdated) {
+        previousNodeLabels = nodeLabelsForHeartbeat;
+        try {
+          validateNodeLabels(nodeLabelsForHeartbeat);
+          updatedLabelsSentToRM = true;
+        } catch (IOException e) {
+          // set previous node labels to invalid set, so that invalid
+          // labels are not verified for every HB, and send empty set
+          // to RM to have same nodeLabels which was earlier set.
+          nodeLabelsForHeartbeat = null;
+        }
+      } else {
+        // if nodelabels have not changed then no need to send
+        nodeLabelsForHeartbeat = null;
+      }
+      return nodeLabelsForHeartbeat;
+    }
+
+    private void validateNodeLabels(Set<NodeLabel> nodeLabelsForHeartbeat)
+        throws IOException {
+      Iterator<NodeLabel> iterator = nodeLabelsForHeartbeat.iterator();
+      boolean hasInvalidLabel = false;
+      StringBuilder errorMsg = new StringBuilder("");
+      while (iterator.hasNext()) {
+        try {
+          CommonNodeLabelsManager
+              .checkAndThrowLabelName(iterator.next().getName());
+        } catch (IOException e) {
+          errorMsg.append(e.getMessage());
+          errorMsg.append(" , ");
+          hasInvalidLabel = true;
+        }
+      }
+      if (hasInvalidLabel) {
+        LOG.error("Invalid Node Label(s) from Provider : " + errorMsg);
+        throw new IOException(errorMsg.toString());
+      }
+    }
+
+    @Override
+    public void verifyRMHeartbeatResponseForNodeLabels(
+        NodeHeartbeatResponse response) {
+      if (updatedLabelsSentToRM) {
+        if (response.getAreNodeLabelsAcceptedByRM()) {
+          LOG.info("Node Labels {" + StringUtils.join(",", previousNodeLabels)
+              + "} were Accepted by RM ");
+        } else {
+          // case where updated labels from NodeLabelsProvider is sent to RM and
+          // RM rejected the labels
+          LOG.error(
+              "NM node labels {" + StringUtils.join(",", previousNodeLabels)
+                  + "} were not accepted by RM and message from RM : "
+                  + response.getDiagnosticsMessage());
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java
new file mode 100644
index 0000000..bbc6710
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/AbstractNodeLabelsProvider.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Provides base implementation of NodeLabelsProvider with Timer and expects
+ * subclass to provide TimerTask which can fetch NodeLabels
+ */
+public abstract class AbstractNodeLabelsProvider extends NodeLabelsProvider {
+  public static final long DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER = -1;
+
+  // Delay after which timer task are triggered to fetch NodeLabels
+  protected long intervalTime;
+
+  // Timer used to schedule node labels fetching
+  protected Timer nodeLabelsScheduler;
+
+  public static final String NODE_LABELS_SEPRATOR = ",";
+
+  protected Lock readLock = null;
+  protected Lock writeLock = null;
+
+  protected TimerTask timerTask;
+
+  protected Set<NodeLabel> nodeLabels =
+      CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
+
+  @VisibleForTesting
+  long startTime = 0;
+
+  public AbstractNodeLabelsProvider(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.intervalTime =
+        conf.getLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    readLock = readWriteLock.readLock();
+    writeLock = readWriteLock.writeLock();
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    timerTask = createTimerTask();
+    if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
+      nodeLabelsScheduler =
+          new Timer("DistributedNodeLabelsRunner-Timer", true);
+      // Start the timer task and then periodically at the configured interval
+      // time. Illegal values for intervalTime is handled by timer api
+      nodeLabelsScheduler.scheduleAtFixedRate(timerTask, startTime,
+          intervalTime);
+    }
+    super.serviceStart();
+  }
+
+  /**
+   * terminate the timer
+   * @throws Exception
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (nodeLabelsScheduler != null) {
+      nodeLabelsScheduler.cancel();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * @return Returns output from provider.
+   */
+  @Override
+  public Set<NodeLabel> getNodeLabels() {
+    readLock.lock();
+    try {
+      return nodeLabels;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void setNodeLabels(Set<NodeLabel> nodeLabelsSet) {
+    writeLock.lock();
+    try {
+      nodeLabels = nodeLabelsSet;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Used only by tests to access the timer task directly
+   *
+   * @return the timer task
+   */
+  TimerTask getTimerTask() {
+    return timerTask;
+  }
+
+  static Set<NodeLabel> convertToNodeLabelSet(Set<String> nodeLabels) {
+    if (null == nodeLabels) {
+      return null;
+    }
+    Set<NodeLabel> labels = new HashSet<NodeLabel>();
+    for (String label : nodeLabels) {
+      labels.add(NodeLabel.newInstance(label));
+    }
+    return labels;
+  }
+
+  public abstract TimerTask createTimerTask();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java
new file mode 100644
index 0000000..f549d1a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/ConfigurationNodeLabelsProvider.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Provides Node's Labels by constantly monitoring the configuration.
+ */
+public class ConfigurationNodeLabelsProvider extends AbstractNodeLabelsProvider {
+
+  private static final Log LOG = LogFactory
+      .getLog(ConfigurationNodeLabelsProvider.class);
+
+  public ConfigurationNodeLabelsProvider() {
+    super("Configuration Based NodeLabels Provider");
+  }
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    // In case timer is not configured avoid calling timertask.run thus avoiding
+    // unnecessary creation of YarnConfiguration Object
+    updateNodeLabelsFromConfig(conf);
+    if (intervalTime != DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER) {
+      startTime = new Date().getTime() + intervalTime;
+    }
+  }
+
+  private void updateNodeLabelsFromConfig(Configuration conf)
+      throws IOException {
+    String confLabelString =
+        conf.get(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, null);
+    String[] nodeLabelsFromConfiguration =
+        (confLabelString == null || confLabelString.isEmpty()) ? new String[] {}
+            : StringUtils.getStrings(confLabelString);
+    setNodeLabels(convertToNodeLabelSet(new HashSet<String>(
+        Arrays.asList(nodeLabelsFromConfiguration))));
+  }
+
+  private class ConfigurationMonitorTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      try {
+        updateNodeLabelsFromConfig(new YarnConfiguration());
+      } catch (Exception e) {
+        LOG.error("Failed to update node Labels from configuration.xml ", e);
+      }
+    }
+  }
+
+  @Override
+  public TimerTask createTimerTask() {
+    return new ConfigurationMonitorTimerTask();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
index 20ef712..2d390ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
@@ -22,8 +22,11 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestNodeManager {
@@ -53,5 +56,50 @@ public class TestNodeManager {
       nm.stop();
     }
   }
-  
+
+  @Test
+  public void testCreationOfNodeLabelsProviderService()
+      throws InterruptedException {
+    try {
+      NodeManager nodeManager = new NodeManager();
+      Configuration conf = new Configuration();
+      NodeLabelsProvider labelsProviderService =
+          nodeManager.createNodeLabelsProvider(conf);
+      Assert
+          .assertNull(
+              "LabelsProviderService should not be initialized in default configuration",
+              labelsProviderService);
+
+      // With valid className
+      conf.set(
+          YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
+          "org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider");
+      labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
+      Assert.assertNotNull("LabelsProviderService should be initialized When "
+          + "node labels provider class is configured", labelsProviderService);
+
+      // With invalid className
+      conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
+          "org.apache.hadoop.yarn.server.nodemanager.NodeManager");
+      try {
+        labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
+        Assert.fail("Expected to throw IOException on Invalid configuration");
+      } catch (IOException e) {
+        // exception expected on invalid configuration
+      }
+      Assert.assertNotNull("LabelsProviderService should be initialized When "
+          + "node labels provider class is configured", labelsProviderService);
+
+      // With valid whitelisted configurations
+      conf.set(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
+          YarnConfiguration.CONFIG_NODE_LABELS_PROVIDER);
+      labelsProviderService = nodeManager.createNodeLabelsProvider(conf);
+      Assert.assertNotNull("LabelsProviderService should be initialized When "
+          + "node labels provider class is configured", labelsProviderService);
+
+    } catch (Exception e) {
+      Assert.fail("Exception caught");
+      e.printStackTrace();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
index 7e1bbd8..099e4b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -108,7 +109,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
       if (receivedNMHeartbeat) {
         return;
       }
-      int i = 500;
+      int i = 10;
       while (!receivedNMHeartbeat && i > 0) {
         synchronized (ResourceTrackerForLabels.class) {
           if (!receivedNMHeartbeat) {
@@ -193,7 +194,6 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
 
   public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
 
-    @SuppressWarnings("unchecked")
     private Set<NodeLabel> nodeLabels = CommonNodeLabelsManager.EMPTY_NODELABEL_SET;
 
     public DummyNodeLabelsProvider() {
@@ -224,8 +224,8 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
         new ResourceTrackerForLabels();
     nm = new NodeManager() {
       @Override
-      protected NodeLabelsProvider createNodeLabelsProvider(
-          Configuration conf) throws IOException {
+      protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
+          throws IOException {
         return dummyLabelsProviderRef;
       }
 
@@ -255,8 +255,7 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
     nm.start();
     resourceTracker.waitTillRegister();
     assertNLCollectionEquals(resourceTracker.labels,
-        dummyLabelsProviderRef
-            .getNodeLabels());
+        dummyLabelsProviderRef.getNodeLabels());
 
     resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
     resourceTracker.resetNMHeartbeatReceiveFlag();
@@ -278,15 +277,76 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
     assertNull(
         "If no change in labels then null should be sent as part of request",
         resourceTracker.labels);
-    
+
     // provider return with null labels
-    dummyLabelsProviderRef.setNodeLabels(null);    
+    dummyLabelsProviderRef.setNodeLabels(null);
     nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
     resourceTracker.waitTillHeartbeat();
+    assertNotNull(
+        "If provider sends null then empty label set should be sent and not null",
+        resourceTracker.labels);
     assertTrue("If provider sends null then empty labels should be sent",
         resourceTracker.labels.isEmpty());
     resourceTracker.resetNMHeartbeatReceiveFlag();
 
     nm.stop();
   }
+
+  @Test
+  public void testInvalidNodeLabelsFromProvider() throws InterruptedException,
+      IOException {
+    final ResourceTrackerForLabels resourceTracker =
+        new ResourceTrackerForLabels();
+    nm = new NodeManager() {
+      @Override
+      protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
+          throws IOException {
+        return dummyLabelsProviderRef;
+      }
+
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+          NodeLabelsProvider labelsProvider) {
+
+        return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
+            metrics, labelsProvider) {
+          @Override
+          protected ResourceTracker getRMClient() {
+            return resourceTracker;
+          }
+
+          @Override
+          protected void stopRMProxy() {
+            return;
+          }
+        };
+      }
+    };
+    dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("P"));
+    YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
+    nm.init(conf);
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    nm.start();
+    resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // heartbeat with invalid labels
+    dummyLabelsProviderRef.setNodeLabels(toNodeLabelSet("_.P"));
+
+    nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertNull("On Invalid Labels we need to retain earlier labels, HB "
+        + "needs to send null", resourceTracker.labels);
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // on next heartbeat same invalid labels will be given by the provider, but
+    // again label validation check and reset RM with empty labels set should
+    // not happen
+    nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    assertNull("NodeStatusUpdater need not send repeatedly empty labels on "
+        + "invalid labels from provider ", resourceTracker.labels);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc07464d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java
new file mode 100644
index 0000000..27fd4cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/TestConfigurationNodeLabelsProvider.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.TimerTask;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestConfigurationNodeLabelsProvider extends NodeLabelTestBase {
+
+  protected static File testRootDir = new File("target",
+      TestConfigurationNodeLabelsProvider.class.getName() + "-localDir")
+      .getAbsoluteFile();
+
+  final static File nodeLabelsConfigFile = new File(testRootDir,
+      "yarn-site.xml");
+
+  private static XMLPathClassLoader loader;
+
+  private ConfigurationNodeLabelsProvider nodeLabelsProvider;
+
+  @Before
+  public void setup() {
+    loader =
+        new XMLPathClassLoader(
+            TestConfigurationNodeLabelsProvider.class.getClassLoader());
+    testRootDir.mkdirs();
+
+    nodeLabelsProvider = new ConfigurationNodeLabelsProvider();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (nodeLabelsProvider != null) {
+      nodeLabelsProvider.close();
+    }
+    if (testRootDir.exists()) {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+  }
+
+  private Configuration getConfForNodeLabels() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, "A,B,CX");
+    return conf;
+  }
+
+  @Test
+  public void testNodeLabelsFromConfig() throws IOException,
+      InterruptedException {
+    Configuration conf = getConfForNodeLabels();
+    nodeLabelsProvider.init(conf);
+    // test for ensuring labels are set during initialization of the class
+    nodeLabelsProvider.start();
+    Thread.sleep(1000l); // sleep so that timer has run once during
+                         // initialization
+    assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
+        nodeLabelsProvider.getNodeLabels());
+
+    // test for valid Modification
+    TimerTask timerTask = nodeLabelsProvider.getTimerTask();
+    modifyConfAndCallTimer(timerTask, "X,y,Z");
+    assertNLCollectionEquals(toNodeLabelSet("X", "y", "Z"),
+        nodeLabelsProvider.getNodeLabels());
+  }
+
+  @Test
+  public void testConfigForNoTimer() throws Exception {
+    Configuration conf = getConfForNodeLabels();
+    conf.setLong(YarnConfiguration.NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+        AbstractNodeLabelsProvider.DISABLE_NODE_LABELS_PROVIDER_FETCH_TIMER);
+
+    nodeLabelsProvider.init(conf);
+    nodeLabelsProvider.start();
+    Assert
+        .assertNull(
+            "Timer is not expected to be created when interval is configured as -1",
+            nodeLabelsProvider.nodeLabelsScheduler);
+    // Ensure that even though timer is not run, node labels are fetched at least once so
+    // that NM registers/updates Labels with RM
+    assertNLCollectionEquals(toNodeLabelSet("A", "B", "CX"),
+        nodeLabelsProvider.getNodeLabels());
+  }
+
+  private static void modifyConfAndCallTimer(TimerTask timerTask,
+      String nodeLabels) throws FileNotFoundException, IOException {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_PROVIDER_CONFIGURED_NODE_LABELS, nodeLabels);
+    conf.writeXml(new FileOutputStream(nodeLabelsConfigFile));
+    ClassLoader actualLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(loader);
+      timerTask.run();
+    } finally {
+      Thread.currentThread().setContextClassLoader(actualLoader);
+    }
+  }
+
+  private static class XMLPathClassLoader extends ClassLoader {
+    public XMLPathClassLoader(ClassLoader wrapper) {
+      super(wrapper);
+    }
+
+    public URL getResource(String name) {
+      if (name.equals(YarnConfiguration.YARN_SITE_CONFIGURATION_FILE)) {
+        try {
+          return nodeLabelsConfigFile.toURI().toURL();
+        } catch (MalformedURLException e) {
+          e.printStackTrace();
+          Assert.fail();
+        }
+      }
+      return super.getResource(name);
+    }
+  }
+}


Mime
View raw message