hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sun...@apache.org
Subject [23/29] hadoop git commit: YARN-7863. Modify placement constraints to support node attributes. Contributed by Sunil Govindan.
Date Wed, 12 Sep 2018 11:15:01 GMT
YARN-7863. Modify placement constraints to support node attributes. Contributed by Sunil Govindan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67ae81f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67ae81f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67ae81f0

Branch: refs/heads/trunk
Commit: 67ae81f0e0ac7f107261ee15f2eb4d189e3b1983
Parents: 8c94739
Author: Naganarasimha <naganarasimha_gr@apache.org>
Authored: Mon Aug 27 10:27:33 2018 +0800
Committer: Sunil G <sunilg@apache.org>
Committed: Wed Sep 12 16:01:01 2018 +0530

----------------------------------------------------------------------
 .../yarn/api/records/NodeAttributeOpCode.java   |  43 +++++++
 .../yarn/api/resource/PlacementConstraint.java  |  40 +++++-
 .../yarn/api/resource/PlacementConstraints.java |  19 +++
 .../constraint/PlacementConstraintParser.java   | 112 +++++++++++++++--
 .../src/main/proto/yarn_protos.proto            |   7 ++
 .../resource/TestPlacementConstraintParser.java |  61 ++++++++-
 .../distributedshell/ApplicationMaster.java     |  35 ++++--
 .../applications/distributedshell/Client.java   |   5 +-
 .../distributedshell/PlacementSpec.java         |  19 ++-
 .../PlacementConstraintFromProtoConverter.java  |  10 +-
 .../pb/PlacementConstraintToProtoConverter.java |  11 ++
 .../server/resourcemanager/ResourceManager.java |   7 +-
 .../nodelabels/NodeAttributesManagerImpl.java   |  24 +++-
 .../scheduler/SchedulerNode.java                |  11 ++
 .../scheduler/capacity/CapacityScheduler.java   |  36 +++++-
 .../constraint/PlacementConstraintsUtil.java    | 126 ++++++++++++++++---
 .../NodeAttributesUpdateSchedulerEvent.java     |  41 ++++++
 .../scheduler/event/SchedulerEventType.java     |   1 +
 .../LocalityAppPlacementAllocator.java          |   4 +
 19 files changed, 567 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java
new file mode 100644
index 0000000..76db063
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java
@@ -0,0 +1,43 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Enumeration of various node attribute op codes.
+ */
+@Public
+@Evolving
+public enum NodeAttributeOpCode {
+  /**
+   * Default as No OP.
+   */
+  NO_OP,
+  /**
+   * EQUALS op code for Attribute.
+   */
+  EQ,
+
+  /**
+   * NOT EQUALS op code for Attribute.
+   */
+  NE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
index 0fe8273..79196fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
 
 /**
  * {@code PlacementConstraint} represents a placement constraint for a resource
@@ -155,13 +156,22 @@ public class PlacementConstraint {
     private int minCardinality;
     private int maxCardinality;
     private Set<TargetExpression> targetExpressions;
+    private NodeAttributeOpCode attributeOpCode;
 
     public SingleConstraint(String scope, int minCardinality,
-        int maxCardinality, Set<TargetExpression> targetExpressions) {
+        int maxCardinality, NodeAttributeOpCode opCode,
+        Set<TargetExpression> targetExpressions) {
       this.scope = scope;
       this.minCardinality = minCardinality;
       this.maxCardinality = maxCardinality;
       this.targetExpressions = targetExpressions;
+      this.attributeOpCode = opCode;
+    }
+
+    public SingleConstraint(String scope, int minCardinality,
+        int maxCardinality, Set<TargetExpression> targetExpressions) {
+      this(scope, minCardinality, maxCardinality, NodeAttributeOpCode.NO_OP,
+          targetExpressions);
     }
 
     public SingleConstraint(String scope, int minC, int maxC,
@@ -169,6 +179,13 @@ public class PlacementConstraint {
       this(scope, minC, maxC, new HashSet<>(Arrays.asList(targetExpressions)));
     }
 
+    public SingleConstraint(String scope, int minC, int maxC,
+        NodeAttributeOpCode opCode,
+        TargetExpression... targetExpressions) {
+      this(scope, minC, maxC, opCode,
+          new HashSet<>(Arrays.asList(targetExpressions)));
+    }
+
     /**
      * Get the scope of the constraint.
      *
@@ -205,6 +222,15 @@ public class PlacementConstraint {
       return targetExpressions;
     }
 
+    /**
+     * Get the NodeAttributeOpCode of the constraint.
+     *
+     * @return nodeAttribute Op Code
+     */
+    public NodeAttributeOpCode getNodeAttributeOpCode() {
+      return attributeOpCode;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -225,6 +251,10 @@ public class PlacementConstraint {
       if (!getScope().equals(that.getScope())) {
         return false;
       }
+      if (getNodeAttributeOpCode() != null && !getNodeAttributeOpCode()
+          .equals(that.getNodeAttributeOpCode())) {
+        return false;
+      }
       return getTargetExpressions().equals(that.getTargetExpressions());
     }
 
@@ -233,6 +263,7 @@ public class PlacementConstraint {
       int result = getScope().hashCode();
       result = 31 * result + getMinCardinality();
       result = 31 * result + getMaxCardinality();
+      result = 31 * result + getNodeAttributeOpCode().hashCode();
       result = 31 * result + getTargetExpressions().hashCode();
       return result;
     }
@@ -259,6 +290,13 @@ public class PlacementConstraint {
               .append(getScope()).append(",")
               .append(targetExpr)
               .toString());
+        } else if (min == -1 && max == -1) {
+          // node attribute
+          targetConstraints.add(new StringBuilder()
+              .append(getScope()).append(",")
+              .append(getNodeAttributeOpCode()).append(",")
+              .append(targetExpr)
+              .toString());
         } else {
           // cardinality
           targetConstraints.add(new StringBuilder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
index d22a6bd..73fa328 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@@ -86,6 +87,24 @@ public final class PlacementConstraints {
   }
 
   /**
+   * Creates a constraint that requires allocations to be placed on nodes that
+   * belong to a scope (e.g., node or rack) that satisfy any of the
+   * target expressions based on node attribute op code.
+   *
+   * @param scope the scope within which the target expressions should not be
+   *          true
+   * @param opCode Node Attribute code which could be equals, not equals.
+   * @param targetExpressions the expressions that need to not be true within
+   *          the scope
+   * @return the resulting placement constraint
+   */
+  public static AbstractConstraint targetNodeAttribute(String scope,
+      NodeAttributeOpCode opCode,
+      TargetExpression... targetExpressions) {
+    return new SingleConstraint(scope, -1, -1, opCode, targetExpressions);
+  }
+
+  /**
    * Creates a constraint that restricts the number of allocations within a
    * given scope (e.g., node or rack).
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
index 2926c9d..93fd706 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.util.constraint;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
@@ -44,11 +45,12 @@ import java.util.regex.Pattern;
 @InterfaceStability.Unstable
 public final class PlacementConstraintParser {
 
+  public static final char EXPRESSION_VAL_DELIM = ',';
   private static final char EXPRESSION_DELIM = ':';
   private static final char KV_SPLIT_DELIM = '=';
-  private static final char EXPRESSION_VAL_DELIM = ',';
   private static final char BRACKET_START = '(';
   private static final char BRACKET_END = ')';
+  private static final String KV_NE_DELIM = "!=";
   private static final String IN = "in";
   private static final String NOT_IN = "notin";
   private static final String AND = "and";
@@ -350,6 +352,91 @@ public final class PlacementConstraintParser {
   }
 
   /**
+   * Constraint parser used to parse a given target expression.
+   */
+  public static class NodeConstraintParser extends ConstraintParser {
+
+    public NodeConstraintParser(String expression) {
+      super(new BaseStringTokenizer(expression,
+          String.valueOf(EXPRESSION_VAL_DELIM)));
+    }
+
+    @Override
+    public AbstractConstraint parse()
+        throws PlacementConstraintParseException {
+      PlacementConstraint.AbstractConstraint placementConstraints = null;
+      String attributeName = "";
+      NodeAttributeOpCode opCode = NodeAttributeOpCode.EQ;
+      String scope = SCOPE_NODE;
+
+      Set<String> constraintEntities = new TreeSet<>();
+      while (hasMoreTokens()) {
+        String currentTag = nextToken();
+        StringTokenizer attributeKV = getAttributeOpCodeTokenizer(currentTag);
+
+        // Usually there will be only one k=v pair. However in case when
+        // multiple values are present for same attribute, it will also be
+        // coming as next token. for example, java=1.8,1.9 or python!=2.
+        if (attributeKV.countTokens() > 1) {
+          opCode = getAttributeOpCode(currentTag);
+          attributeName = attributeKV.nextToken();
+          currentTag = attributeKV.nextToken();
+        }
+        constraintEntities.add(currentTag);
+      }
+
+      if(attributeName.isEmpty()) {
+        throw new PlacementConstraintParseException(
+            "expecting valid expression like k=v or k!=v, but get "
+                + constraintEntities);
+      }
+
+      PlacementConstraint.TargetExpression target = null;
+      if (!constraintEntities.isEmpty()) {
+        target = PlacementConstraints.PlacementTargets
+            .nodeAttribute(attributeName,
+                constraintEntities
+                    .toArray(new String[constraintEntities.size()]));
+      }
+
+      placementConstraints = PlacementConstraints
+          .targetNodeAttribute(scope, opCode, target);
+      return placementConstraints;
+    }
+
+    private StringTokenizer getAttributeOpCodeTokenizer(String currentTag) {
+      StringTokenizer attributeKV = new StringTokenizer(currentTag,
+          KV_NE_DELIM);
+
+      // Try with '!=' delim as well.
+      if (attributeKV.countTokens() < 2) {
+        attributeKV = new StringTokenizer(currentTag,
+            String.valueOf(KV_SPLIT_DELIM));
+      }
+      return attributeKV;
+    }
+
+    /**
+     * Below conditions are validated.
+     * java=8   : OpCode = EQUALS
+     * java!=8  : OpCode = NEQUALS
+     * @param currentTag tag
+     * @return Attribute op code.
+     */
+    private NodeAttributeOpCode getAttributeOpCode(String currentTag)
+        throws PlacementConstraintParseException {
+      if (currentTag.contains(KV_NE_DELIM)) {
+        return NodeAttributeOpCode.NE;
+      } else if (currentTag.contains(String.valueOf(KV_SPLIT_DELIM))) {
+        return NodeAttributeOpCode.EQ;
+      }
+      throw new PlacementConstraintParseException(
+          "expecting valid expression like k=v or k!=v, but get "
+              + currentTag);
+    }
+  }
+
+  /**
    * Constraint parser used to parse a given target expression, such as
    * "NOTIN, NODE, foo, bar".
    */
@@ -363,20 +450,23 @@ public final class PlacementConstraintParser {
     @Override
     public AbstractConstraint parse()
         throws PlacementConstraintParseException {
-      PlacementConstraint.AbstractConstraint placementConstraints;
+      PlacementConstraint.AbstractConstraint placementConstraints = null;
       String op = nextToken();
       if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) {
         String scope = nextToken();
         scope = parseScope(scope);
 
-        Set<String> allocationTags = new TreeSet<>();
+        Set<String> constraintEntities = new TreeSet<>();
         while(hasMoreTokens()) {
           String tag = nextToken();
-          allocationTags.add(tag);
+          constraintEntities.add(tag);
+        }
+        PlacementConstraint.TargetExpression target = null;
+        if(!constraintEntities.isEmpty()) {
+          target = PlacementConstraints.PlacementTargets.allocationTag(
+              constraintEntities
+                  .toArray(new String[constraintEntities.size()]));
         }
-        PlacementConstraint.TargetExpression target =
-            PlacementConstraints.PlacementTargets.allocationTag(
-                allocationTags.toArray(new String[allocationTags.size()]));
         if (op.equalsIgnoreCase(IN)) {
           placementConstraints = PlacementConstraints
               .targetIn(scope, target);
@@ -551,6 +641,11 @@ public final class PlacementConstraintParser {
         constraintOptional = Optional.ofNullable(jp.tryParse());
       }
       if (!constraintOptional.isPresent()) {
+        NodeConstraintParser np =
+            new NodeConstraintParser(constraintStr);
+        constraintOptional = Optional.ofNullable(np.tryParse());
+      }
+      if (!constraintOptional.isPresent()) {
         throw new PlacementConstraintParseException(
             "Invalid constraint expression " + constraintStr);
       }
@@ -584,12 +679,13 @@ public final class PlacementConstraintParser {
    */
   public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
       String expression) throws PlacementConstraintParseException {
+    // Continue handling for application tag based constraint otherwise.
     // Respect insertion order.
     Map<SourceTags, PlacementConstraint> result = new LinkedHashMap<>();
     PlacementConstraintParser.ConstraintTokenizer tokenizer =
         new PlacementConstraintParser.MultipleConstraintsTokenizer(expression);
     tokenizer.validate();
-    while(tokenizer.hasMoreElements()) {
+    while (tokenizer.hasMoreElements()) {
       String specStr = tokenizer.nextElement();
       // each spec starts with sourceAllocationTag=numOfContainers and
       // followed by a constraint expression.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 10b36c7..5fe2cc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -646,11 +646,18 @@ message PlacementConstraintProto {
   optional CompositePlacementConstraintProto compositeConstraint = 2;
 }
 
+enum NodeAttributeOpCodeProto {
+  NO_OP = 1;
+  EQ = 2;
+  NE = 3;
+}
+
 message SimplePlacementConstraintProto {
   required string scope = 1;
   repeated PlacementConstraintTargetProto targetExpressions = 2;
   optional int32 minCardinality = 3;
   optional int32 maxCardinality = 4;
+  optional NodeAttributeOpCodeProto attributeOpCode = 5;
 }
 
 message PlacementConstraintTargetProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
index a69571c..9806ba4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java
@@ -22,6 +22,8 @@ import com.google.common.collect.Sets;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or;
@@ -38,8 +40,14 @@ import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.Multiple
 import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTagsTokenizer;
 import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintTokenizer;
 
-import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.*;
 import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNodeAttribute;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -443,4 +451,55 @@ public class TestPlacementConstraintParser {
           + constrainExpr + ", caused by: " + e.getMessage());
     }
   }
+
+  @Test
+  public void testParseNodeAttributeSpec()
+      throws PlacementConstraintParseException {
+    Map<SourceTags, PlacementConstraint> result;
+    PlacementConstraint.AbstractConstraint expectedPc1, expectedPc2;
+    PlacementConstraint actualPc1, actualPc2;
+
+    // A single node attribute constraint
+    result = PlacementConstraintParser
+        .parsePlacementSpec("xyz=4,rm.yarn.io/foo=true");
+    Assert.assertEquals(1, result.size());
+    TargetExpression target = PlacementTargets
+        .nodeAttribute("rm.yarn.io/foo", "true");
+    expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
+
+    actualPc1 = result.values().iterator().next();
+    Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+    // A single node attribute constraint
+    result = PlacementConstraintParser
+        .parsePlacementSpec("xyz=3,rm.yarn.io/foo!=abc");
+    Assert.assertEquals(1, result.size());
+    target = PlacementTargets
+        .nodeAttribute("rm.yarn.io/foo", "abc");
+    expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target);
+
+    actualPc1 = result.values().iterator().next();
+    Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+    actualPc1 = result.values().iterator().next();
+    Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+
+    // A single node attribute constraint
+    result = PlacementConstraintParser
+        .parsePlacementSpec(
+            "xyz=1,rm.yarn.io/foo!=abc:zxy=1,rm.yarn.io/bar=true");
+    Assert.assertEquals(2, result.size());
+    target = PlacementTargets
+        .nodeAttribute("rm.yarn.io/foo", "abc");
+    expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target);
+    target = PlacementTargets
+        .nodeAttribute("rm.yarn.io/bar", "true");
+    expectedPc2 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
+
+    Iterator<PlacementConstraint> valueIt = result.values().iterator();
+    actualPc1 = valueIt.next();
+    actualPc2 = valueIt.next();
+    Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
+    Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index ecf07b1..09a796e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -523,9 +523,13 @@ public class ApplicationMaster {
 
     if (cliParser.hasOption("placement_spec")) {
       String placementSpec = cliParser.getOptionValue("placement_spec");
-      LOG.info("Placement Spec received [{}]", placementSpec);
-      parsePlacementSpecs(placementSpec);
+      String decodedSpec = getDecodedPlacementSpec(placementSpec);
+      LOG.info("Placement Spec received [{}]", decodedSpec);
+
+      this.numTotalContainers = 0;
+      parsePlacementSpecs(decodedSpec);
       LOG.info("Total num containers requested [{}]", numTotalContainers);
+
       if (numTotalContainers == 0) {
         throw new IllegalArgumentException(
             "Cannot run distributed shell with no containers");
@@ -694,23 +698,25 @@ public class ApplicationMaster {
     return true;
   }
 
-  private void parsePlacementSpecs(String placementSpecifications) {
-    // Client sends placement spec in encoded format
-    Base64.Decoder decoder = Base64.getDecoder();
-    byte[] decodedBytes = decoder.decode(
-        placementSpecifications.getBytes(StandardCharsets.UTF_8));
-    String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
-    LOG.info("Decode placement spec: " + decodedSpec);
+  private void parsePlacementSpecs(String decodedSpec) {
     Map<String, PlacementSpec> pSpecs =
         PlacementSpec.parse(decodedSpec);
     this.placementSpecs = new HashMap<>();
-    this.numTotalContainers = 0;
     for (PlacementSpec pSpec : pSpecs.values()) {
-      this.numTotalContainers += pSpec.numContainers;
+      this.numTotalContainers += pSpec.getNumContainers();
       this.placementSpecs.put(pSpec.sourceTag, pSpec);
     }
   }
 
+  private String getDecodedPlacementSpec(String placementSpecifications) {
+    Base64.Decoder decoder = Base64.getDecoder();
+    byte[] decodedBytes = decoder.decode(
+        placementSpecifications.getBytes(StandardCharsets.UTF_8));
+    String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8);
+    LOG.info("Decode placement spec: " + decodedSpec);
+    return decodedSpec;
+  }
+
   /**
    * Helper function to print usage
    *
@@ -798,6 +804,7 @@ public class ApplicationMaster {
         }
       }
     }
+
     RegisterApplicationMasterResponse response = amRMClient
         .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
             appMasterTrackingUrl, placementConstraintMap);
@@ -845,14 +852,18 @@ public class ApplicationMaster {
     // Keep looping until all the containers are launched and shell script
     // executed on them ( regardless of success/failure).
     if (this.placementSpecs == null) {
+      LOG.info("placementSpecs null");
       for (int i = 0; i < numTotalContainersToRequest; ++i) {
         ContainerRequest containerAsk = setupContainerAskForRM();
         amRMClient.addContainerRequest(containerAsk);
       }
     } else {
+      LOG.info("placementSpecs to create req:" + placementSpecs);
       List<SchedulingRequest> schedReqs = new ArrayList<>();
       for (PlacementSpec pSpec : this.placementSpecs.values()) {
-        for (int i = 0; i < pSpec.numContainers; i++) {
+        LOG.info("placementSpec :" + pSpec + ", container:" + pSpec
+            .getNumContainers());
+        for (int i = 0; i < pSpec.getNumContainers(); i++) {
           SchedulingRequest sr = setupSchedulingRequest(pSpec);
           schedReqs.add(sr);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 9da9288..e8b69fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory;
  * the provided shell command on a set of containers. </p>
  * 
  * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
- * 
+ *
  * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
  * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
  * provides a way for the client to get access to cluster information and to request for a
@@ -192,6 +192,8 @@ public class Client {
 
   // Placement specification
   private String placementSpec = "";
+  // Node Attribute specification
+  private String nodeAttributeSpec = "";
   // log4j.properties file 
   // if available, add to local resources and set into classpath 
   private String log4jPropFile = "";	
@@ -448,6 +450,7 @@ public class Client {
       // Check if it is parsable
       PlacementSpec.parse(this.placementSpec);
     }
+
     appName = cliParser.getOptionValue("appname", "DistributedShell");
     amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
     amQueue = cliParser.getOptionValue("queue", "default");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
index 2909259..ceaa37d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java
@@ -37,8 +37,8 @@ public class PlacementSpec {
       LoggerFactory.getLogger(PlacementSpec.class);
 
   public final String sourceTag;
-  public final int numContainers;
   public final PlacementConstraint constraint;
+  private int numContainers;
 
   public PlacementSpec(String sourceTag, int numContainers,
       PlacementConstraint constraint) {
@@ -47,6 +47,22 @@ public class PlacementSpec {
     this.constraint = constraint;
   }
 
+  /**
+   * Get the number of container for this spec.
+   * @return container count
+   */
+  public int getNumContainers() {
+    return numContainers;
+  }
+
+  /**
+   * Set number of containers for this spec.
+   * @param numContainers number of containers.
+   */
+  public void setNumContainers(int numContainers) {
+    this.numContainers = numContainers;
+  }
+
   // Placement specification should be of the form:
   // PlacementSpec => ""|KeyVal;PlacementSpec
   // KeyVal => SourceTag=Constraint
@@ -71,6 +87,7 @@ public class PlacementSpec {
   public static Map<String, PlacementSpec> parse(String specs)
       throws IllegalArgumentException {
     LOG.info("Parsing Placement Specs: [{}]", specs);
+
     Map<String, PlacementSpec> pSpecs = new HashMap<>();
     Map<SourceTags, PlacementConstraint> parsed;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java
index 926b6fa..447905e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementConstraint;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto;
@@ -73,7 +75,8 @@ public class PlacementConstraintFromProtoConverter {
     }
 
     return new SingleConstraint(proto.getScope(), proto.getMinCardinality(),
-        proto.getMaxCardinality(), targets);
+        proto.getMaxCardinality(),
+        convertFromProtoFormat(proto.getAttributeOpCode()), targets);
   }
 
   private TargetExpression convert(PlacementConstraintTargetProto proto) {
@@ -113,4 +116,9 @@ public class PlacementConstraintFromProtoConverter {
     return new TimedPlacementConstraint(pConstraint, proto.getSchedulingDelay(),
         ProtoUtils.convertFromProtoFormat(proto.getDelayUnit()));
   }
+
+  private static NodeAttributeOpCode convertFromProtoFormat(
+      NodeAttributeOpCodeProto p) {
+    return NodeAttributeOpCode.valueOf(p.name());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java
index 7816e18..30f7741 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.api.pb;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementCon
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto.CompositeType;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto;
@@ -72,6 +74,10 @@ public class PlacementConstraintToProtoConverter
     }
     sb.setMinCardinality(constraint.getMinCardinality());
     sb.setMaxCardinality(constraint.getMaxCardinality());
+    if (constraint.getNodeAttributeOpCode() != null) {
+      sb.setAttributeOpCode(
+          convertToProtoFormat(constraint.getNodeAttributeOpCode()));
+    }
     if (constraint.getTargetExpressions() != null) {
       for (TargetExpression target : constraint.getTargetExpressions()) {
         sb.addTargetExpressions(
@@ -171,4 +177,9 @@ public class PlacementConstraintToProtoConverter
 
     return tb.build();
   }
+
+  private static NodeAttributeOpCodeProto convertToProtoFormat(
+      NodeAttributeOpCode p) {
+    return NodeAttributeOpCodeProto.valueOf(p.name());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 81ef337..16f019f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -519,9 +519,10 @@ public class ResourceManager extends CompositeService
     return new RMNodeLabelsManager();
   }
 
-  protected NodeAttributesManager createNodeAttributesManager()
-      throws InstantiationException, IllegalAccessException {
-    return new NodeAttributesManagerImpl();
+  protected NodeAttributesManager createNodeAttributesManager() {
+    NodeAttributesManagerImpl namImpl = new NodeAttributesManagerImpl();
+    namImpl.setRMContext(rmContext);
+    return namImpl;
   }
 
   protected AllocationTagsManager createAllocationTagsManager() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
index fac2dfd..9111d0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.nodelabels.RMNodeAttribute;
 import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
 import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
 
 import com.google.common.base.Strings;
 
@@ -92,6 +94,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
 
   private final ReadLock readLock;
   private final WriteLock writeLock;
+  private RMContext rmContext = null;
 
   public NodeAttributesManagerImpl() {
     super("NodeAttributesManagerImpl");
@@ -131,7 +134,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
   }
 
   protected void initNodeAttributeStore(Configuration conf) throws Exception {
-    this.store =getAttributeStoreClass(conf);
+    this.store = getAttributeStoreClass(conf);
     this.store.init(conf, this);
     this.store.recover();
   }
@@ -206,6 +209,21 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
             .handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op));
       }
 
+      // Map used to notify RM
+      Map<String, Set<NodeAttribute>> newNodeToAttributesMap =
+          new HashMap<String, Set<NodeAttribute>>();
+      nodeAttributeMapping.forEach((k, v) -> {
+        Host node = nodeCollections.get(k);
+        newNodeToAttributesMap.put(k, node.attributes.keySet());
+      });
+
+      // Notify RM
+      if (rmContext != null && rmContext.getDispatcher() != null) {
+        LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap
+            .values());
+        rmContext.getDispatcher().getEventHandler().handle(
+            new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
+      }
     } finally {
       writeLock.unlock();
     }
@@ -703,4 +721,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
       store.close();
     }
   }
+
+  public void setRMContext(RMContext context) {
+    this.rmContext  = context;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 59771fd..b35aeba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
@@ -79,6 +80,8 @@ public abstract class SchedulerNode {
 
   private volatile Set<String> labels = null;
 
+  private volatile Set<NodeAttribute> nodeAttributes = null;
+
   // Last updated time
   private volatile long lastHeartbeatMonotonicTime;
 
@@ -503,6 +506,14 @@ public abstract class SchedulerNode {
     return getNodeID().hashCode();
   }
 
+  public Set<NodeAttribute> getNodeAttributes() {
+    return nodeAttributes;
+  }
+
+  public void updateNodeAttributes(Set<NodeAttribute> attributes) {
+    this.nodeAttributes = attributes;
+  }
+
   private static class ContainerInfo {
     private final RMContainer container;
     private boolean launchedOnNode;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 81dcf86..a1d3f60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -137,6 +138,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1767,6 +1769,14 @@ public class CapacityScheduler extends
       updateNodeLabelsAndQueueResource(labelUpdateEvent);
     }
     break;
+    case NODE_ATTRIBUTES_UPDATE:
+    {
+      NodeAttributesUpdateSchedulerEvent attributeUpdateEvent =
+          (NodeAttributesUpdateSchedulerEvent) event;
+
+      updateNodeAttributes(attributeUpdateEvent);
+    }
+    break;
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@@ -1900,6 +1910,30 @@ public class CapacityScheduler extends
     }
   }
 
+  private void updateNodeAttributes(
+      NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) {
+    try {
+      writeLock.lock();
+      for (Entry<String, Set<NodeAttribute>> entry : attributeUpdateEvent
+          .getUpdatedNodeToAttributes().entrySet()) {
+        String hostname = entry.getKey();
+        Set<NodeAttribute> attributes = entry.getValue();
+        List<NodeId> nodeIds = nodeTracker.getNodeIdsByResourceName(hostname);
+        updateAttributesOnNode(nodeIds, attributes);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void updateAttributesOnNode(List<NodeId> nodeIds,
+      Set<NodeAttribute> attributes) {
+    nodeIds.forEach((k) -> {
+      SchedulerNode node = nodeTracker.getNode(k);
+      node.updateNodeAttributes(attributes);
+    });
+  }
+
   /**
    * Process node labels update.
    */
@@ -2768,7 +2802,7 @@ public class CapacityScheduler extends
               schedulingRequest, schedulerNode,
               rmContext.getPlacementConstraintManager(),
               rmContext.getAllocationTagsManager())) {
-            LOG.debug("Failed to allocate container for application "
+            LOG.info("Failed to allocate container for application "
                 + appAttempt.getApplicationId() + " on node "
                 + schedulerNode.getNodeName()
                 + " because this allocation violates the"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index f47e1d4..ccd334c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -24,8 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
@@ -114,22 +113,92 @@ public final class PlacementConstraintsUtil {
             || maxScopeCardinality <= desiredMaxCardinality);
   }
 
-  private static boolean canSatisfyNodePartitionConstraintExpresssion(
-      TargetExpression targetExpression, SchedulerNode schedulerNode) {
+  private static boolean canSatisfyNodeConstraintExpresssion(
+      SingleConstraint sc, TargetExpression targetExpression,
+      SchedulerNode schedulerNode) {
     Set<String> values = targetExpression.getTargetValues();
-    if (values == null || values.isEmpty()) {
-      return schedulerNode.getPartition().equals(
-          RMNodeLabelsManager.NO_LABEL);
-    } else{
-      String nodePartition = values.iterator().next();
-      if (!nodePartition.equals(schedulerNode.getPartition())) {
+
+    if (targetExpression.getTargetKey().equals(NODE_PARTITION)) {
+      if (values == null || values.isEmpty()) {
+        return schedulerNode.getPartition()
+            .equals(RMNodeLabelsManager.NO_LABEL);
+      } else {
+        String nodePartition = values.iterator().next();
+        if (!nodePartition.equals(schedulerNode.getPartition())) {
+          return false;
+        }
+      }
+    } else {
+      NodeAttributeOpCode opCode = sc.getNodeAttributeOpCode();
+      // compare attributes.
+      String inputAttribute = values.iterator().next();
+      NodeAttribute requestAttribute = getNodeConstraintFromRequest(
+          targetExpression.getTargetKey(), inputAttribute);
+      if (requestAttribute == null) {
+        return true;
+      }
+
+      if (schedulerNode.getNodeAttributes() == null ||
+          !schedulerNode.getNodeAttributes().contains(requestAttribute)) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Incoming requestAttribute:" + requestAttribute
+              + "is not present in " + schedulerNode.getNodeID());
+        }
+        return false;
+      }
+      boolean found = false;
+      for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
+          .iterator(); it.hasNext();) {
+        NodeAttribute nodeAttribute = it.next();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Starting to compare Incoming requestAttribute :"
+              + requestAttribute
+              + " with requestAttribute value= " + requestAttribute
+              .getAttributeValue()
+              + ", stored nodeAttribute value=" + nodeAttribute
+              .getAttributeValue());
+        }
+        if (requestAttribute.equals(nodeAttribute)) {
+          if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Incoming requestAttribute:" + requestAttribute
+                      + " matches with node:" + schedulerNode.getNodeID());
+            }
+            found = true;
+            return found;
+          }
+        }
+      }
+      if (!found) {
+        if(LOG.isDebugEnabled()) {
+          LOG.info("skip this node:" + schedulerNode.getNodeID()
+              + " for requestAttribute:" + requestAttribute);
+        }
         return false;
       }
     }
-
     return true;
   }
 
+  private static boolean isOpCodeMatches(NodeAttribute requestAttribute,
+      NodeAttribute nodeAttribute, NodeAttributeOpCode opCode) {
+    boolean retCode = false;
+    switch (opCode) {
+    case EQ:
+      retCode = requestAttribute.getAttributeValue()
+          .equals(nodeAttribute.getAttributeValue());
+      break;
+    case NE:
+      retCode = !(requestAttribute.getAttributeValue()
+          .equals(nodeAttribute.getAttributeValue()));
+      break;
+    default:
+      break;
+    }
+    return retCode;
+  }
+
   private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
       SingleConstraint singleConstraint, SchedulerNode schedulerNode,
       AllocationTagsManager tagsManager)
@@ -146,10 +215,12 @@ public final class PlacementConstraintsUtil {
             singleConstraint, currentExp, schedulerNode, tagsManager)) {
           return false;
         }
-      } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)
-          && currentExp.getTargetKey().equals(NODE_PARTITION)) {
-        // This is a node partition expression, check it.
-        canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode);
+      } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
+        // This is a node attribute expression, check it.
+        if (!canSatisfyNodeConstraintExpresssion(singleConstraint, currentExp,
+            schedulerNode)) {
+          return false;
+        }
       }
     }
     // return true if all targetExpressions are satisfied
@@ -203,6 +274,11 @@ public final class PlacementConstraintsUtil {
       AllocationTagsManager atm)
       throws InvalidAllocationTagsQueryException {
     if (constraint == null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Constraint is found empty during constraint validation for app:"
+                + appId);
+      }
       return true;
     }
 
@@ -263,4 +339,24 @@ public final class PlacementConstraintsUtil {
         pcm.getMultilevelConstraint(applicationId, sourceTags, pc),
         schedulerNode, atm);
   }
+
+  private static NodeAttribute getNodeConstraintFromRequest(String attrKey,
+      String attrString) {
+    NodeAttribute nodeAttribute = null;
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Incoming node attribute: " + attrKey + "=" + attrString);
+    }
+
+    // Input node attribute could be like 1.8
+    String[] name = attrKey.split("/");
+    if (name == null || name.length == 1) {
+      nodeAttribute = NodeAttribute
+          .newInstance(attrKey, NodeAttributeType.STRING, attrString);
+    } else {
+      nodeAttribute = NodeAttribute
+          .newInstance(name[0], name[1], NodeAttributeType.STRING, attrString);
+    }
+
+    return nodeAttribute;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
new file mode 100644
index 0000000..cdc0b69
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+
+/**
+ * Event handler class for Node Attributes which sends events to Scheduler.
+ */
+public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent {
+  private Map<String, Set<NodeAttribute>> nodeToAttributes;
+
+  public NodeAttributesUpdateSchedulerEvent(
+      Map<String, Set<NodeAttribute>> newNodeToAttributesMap) {
+    super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE);
+    this.nodeToAttributes = newNodeToAttributesMap;
+  }
+
+  public Map<String, Set<NodeAttribute>> getUpdatedNodeToAttributes() {
+    return nodeToAttributes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index b107cf4..869bf0ed9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -26,6 +26,7 @@ public enum SchedulerEventType {
   NODE_UPDATE,
   NODE_RESOURCE_UPDATE,
   NODE_LABELS_UPDATE,
+  NODE_ATTRIBUTES_UPDATE,
 
   // Source: RMApp
   APP_ADDED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67ae81f0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index 9d30e90..4557350 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -396,6 +396,10 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
       SchedulingMode schedulingMode) {
     // We will only look at node label = nodeLabelToLookAt according to
     // schedulingMode and partition of node.
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("precheckNode is invoked for " + schedulerNode.getNodeID() + ","
+          + schedulingMode);
+    }
     String nodePartitionToLookAt;
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
       nodePartitionToLookAt = schedulerNode.getPartition();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message