hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [2/2] hadoop git commit: YARN-8002. Support NOT_SELF and ALL namespace types for allocation tag. (Weiwei Yang via wangda)
Date Mon, 19 Mar 2018 18:06:59 GMT
YARN-8002. Support NOT_SELF and ALL namespace types for allocation tag. (Weiwei Yang via wangda)

Change-Id: I63b4e4192a95bf7ded98c54e46a2871c72869700


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a08921ca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a08921ca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a08921ca

Branch: refs/heads/trunk
Commit: a08921ca6cb1dad98935808c8f474b654f861263
Parents: d67a5e2
Author: Wangda Tan <wangda@apache.org>
Authored: Mon Mar 19 11:04:27 2018 -0700
Committer: Wangda Tan <wangda@apache.org>
Committed: Mon Mar 19 11:04:27 2018 -0700

----------------------------------------------------------------------
 .../api/records/AllocationTagNamespace.java     | 336 -------------------
 .../api/records/AllocationTagNamespaceType.java |  29 --
 .../hadoop/yarn/api/records/AllocationTags.java |  50 ---
 .../hadoop/yarn/api/records/Evaluable.java      |  38 ---
 .../yarn/api/records/TargetApplications.java    |  53 ---
 .../yarn/api/resource/PlacementConstraints.java |  58 +++-
 .../InvalidAllocationTagException.java          |  34 --
 .../constraint/AllocationTagNamespace.java      | 312 +++++++++++++++++
 .../scheduler/constraint/AllocationTags.java    |  82 +++++
 .../constraint/AllocationTagsManager.java       | 146 ++++++--
 .../scheduler/constraint/Evaluable.java         |  38 +++
 .../constraint/PlacementConstraintsUtil.java    |  55 +--
 .../constraint/TargetApplications.java          |  51 +++
 .../algorithm/LocalAllocationTagsManager.java   |  27 +-
 .../SingleConstraintAppPlacementAllocator.java  |  17 +-
 .../yarn/server/resourcemanager/MockAM.java     |  11 +-
 .../rmcontainer/TestRMContainerImpl.java        |  47 ++-
 ...estSchedulingRequestContainerAllocation.java | 126 +++++++
 .../constraint/TestAllocationTagsManager.java   | 272 ++++++++++++---
 .../constraint/TestAllocationTagsNamespace.java |  36 +-
 .../TestPlacementConstraintsUtil.java           | 247 +++++++++++++-
 .../TestLocalAllocationTagsManager.java         |  33 +-
 ...stSingleConstraintAppPlacementAllocator.java |   9 +-
 23 files changed, 1383 insertions(+), 724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java
deleted file mode 100644
index 25f8761..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.api.records;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
-import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
-import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
-import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
-import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
-import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString;
-
-/**
- * Class to describe the namespace of an allocation tag.
- * Each namespace can be evaluated against a set of applications.
- * After evaluation, the namespace should have an implicit set of
- * applications which defines its scope.
- */
-public abstract class AllocationTagNamespace implements
-    Evaluable<TargetApplications> {
-
-  public final static String NAMESPACE_DELIMITER = "/";
-
-  private AllocationTagNamespaceType nsType;
-  // Namespace scope value will be delay binding by eval method.
-  private Set<ApplicationId> nsScope;
-
-  public AllocationTagNamespace(AllocationTagNamespaceType
-      allocationTagNamespaceType) {
-    this.nsType = allocationTagNamespaceType;
-  }
-
-  protected void setScopeIfNotNull(Set<ApplicationId> appIds) {
-    if (appIds != null) {
-      this.nsScope = appIds;
-    }
-  }
-
-  /**
-   * Get the type of the namespace.
-   * @return namespace type.
-   */
-  public AllocationTagNamespaceType getNamespaceType() {
-    return nsType;
-  }
-
-  /**
-   * Get the scope of the namespace, in form of a set of applications.
-   * Before calling this method, {@link #evaluate(TargetApplications)}
-   * must be called in prior to ensure the scope is proper evaluated.
-   *
-   * @return a set of applications.
-   */
-  public Set<ApplicationId> getNamespaceScope() {
-    if (this.nsScope == null) {
-      throw new IllegalStateException("Invalid namespace scope,"
-          + " it is not initialized. Evaluate must be called before"
-          + " a namespace can be consumed.");
-    }
-    return this.nsScope;
-  }
-
-  @Override
-  public abstract void evaluate(TargetApplications target)
-      throws InvalidAllocationTagException;
-
-  /**
-   * @return true if the namespace is effective in all applications
-   * in this cluster. Specifically the namespace prefix should be
-   * "all".
-   */
-  public boolean isGlobal() {
-    return AllocationTagNamespaceType.ALL.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective within a single application
-   * by its application ID, the namespace prefix should be "app-id";
-   * false otherwise.
-   */
-  public boolean isSingleInterApp() {
-    return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective to the application itself,
-   * the namespace prefix should be "self"; false otherwise.
-   */
-  public boolean isIntraApp() {
-    return AllocationTagNamespaceType.SELF.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective to all applications except
-   * itself, the namespace prefix should be "not-self"; false otherwise.
-   */
-  public boolean isNotSelf() {
-    return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType());
-  }
-
-  /**
-   * @return true if the namespace is effective to a group of applications
-   * identified by a application label, the namespace prefix should be
-   * "app-label"; false otherwise.
-   */
-  public boolean isAppLabel() {
-    return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType());
-  }
-
-  @Override
-  public String toString() {
-    return this.nsType.toString();
-  }
-
-  /**
-   * Namespace within application itself.
-   */
-  public static class Self extends AllocationTagNamespace {
-
-    public Self() {
-      super(SELF);
-    }
-
-    @Override
-    public void evaluate(TargetApplications target)
-        throws InvalidAllocationTagException {
-      if (target == null || target.getCurrentApplicationId() == null) {
-        throw new InvalidAllocationTagException("Namespace Self must"
-            + " be evaluated against a single application ID.");
-      }
-      ApplicationId applicationId = target.getCurrentApplicationId();
-      setScopeIfNotNull(ImmutableSet.of(applicationId));
-    }
-  }
-
-  /**
-   * Namespace to all applications except itself.
-   */
-  public static class NotSelf extends AllocationTagNamespace {
-
-    private ApplicationId applicationId;
-
-    public NotSelf() {
-      super(NOT_SELF);
-    }
-
-    /**
-     * The scope of self namespace is to an application itself,
-     * the application ID can be delay binding to the namespace.
-     *
-     * @param appId application ID.
-     */
-    public void setApplicationId(ApplicationId appId) {
-      this.applicationId = appId;
-    }
-
-    public ApplicationId getApplicationId() {
-      return this.applicationId;
-    }
-
-    @Override
-    public void evaluate(TargetApplications target) {
-      Set<ApplicationId> otherAppIds = target.getOtherApplicationIds();
-      setScopeIfNotNull(otherAppIds);
-    }
-  }
-
-  /**
-   * Namespace to all applications in the cluster.
-   */
-  public static class All extends AllocationTagNamespace {
-
-    public All() {
-      super(ALL);
-    }
-
-    @Override
-    public void evaluate(TargetApplications target) {
-      Set<ApplicationId> allAppIds = target.getAllApplicationIds();
-      setScopeIfNotNull(allAppIds);
-    }
-  }
-
-  /**
-   * Namespace to all applications in the cluster.
-   */
-  public static class AppLabel extends AllocationTagNamespace {
-
-    public AppLabel() {
-      super(APP_LABEL);
-    }
-
-    @Override
-    public void evaluate(TargetApplications target) {
-      // TODO Implement app-label namespace evaluation
-    }
-  }
-
-  /**
-   * Namespace defined by a certain application ID.
-   */
-  public static class AppID extends AllocationTagNamespace {
-
-    private ApplicationId targetAppId;
-    // app-id namespace requires an extra value of an application id.
-    public AppID(ApplicationId applicationId) {
-      super(APP_ID);
-      this.targetAppId = applicationId;
-    }
-
-    @Override
-    public void evaluate(TargetApplications target) {
-      setScopeIfNotNull(ImmutableSet.of(targetAppId));
-    }
-
-    @Override
-    public String toString() {
-      return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId;
-    }
-  }
-
-  /**
-   * Parse namespace from a string. The string must be in legal format
-   * defined by each {@link AllocationTagNamespaceType}.
-   *
-   * @param namespaceStr namespace string.
-   * @return an instance of {@link AllocationTagNamespace}.
-   * @throws InvalidAllocationTagException
-   * if given string is not in valid format
-   */
-  public static AllocationTagNamespace parse(String namespaceStr)
-      throws InvalidAllocationTagException {
-    // Return the default namespace if no valid string is given.
-    if (Strings.isNullOrEmpty(namespaceStr)) {
-      return new Self();
-    }
-
-    // Normalize the input, escape additional chars.
-    List<String> nsValues = normalize(namespaceStr);
-    // The first string should be the prefix.
-    String nsPrefix = nsValues.get(0);
-    AllocationTagNamespaceType allocationTagNamespaceType =
-        fromString(nsPrefix);
-    switch (allocationTagNamespaceType) {
-    case SELF:
-      return new Self();
-    case NOT_SELF:
-      return new NotSelf();
-    case ALL:
-      return new All();
-    case APP_ID:
-      if (nsValues.size() != 2) {
-        throw new InvalidAllocationTagException(
-            "Missing the application ID in the namespace string: "
-                + namespaceStr);
-      }
-      String appIDStr = nsValues.get(1);
-      return parseAppID(appIDStr);
-    case APP_LABEL:
-      return new AppLabel();
-    default:
-      throw new InvalidAllocationTagException(
-          "Invalid namespace string " + namespaceStr);
-    }
-  }
-
-  private static AllocationTagNamespace parseAppID(String appIDStr)
-      throws InvalidAllocationTagException {
-    try {
-      ApplicationId applicationId = ApplicationId.fromString(appIDStr);
-      return new AppID(applicationId);
-    } catch (IllegalArgumentException e) {
-      throw new InvalidAllocationTagException(
-          "Invalid application ID for "
-              + APP_ID.getTypeKeyword() + ": " + appIDStr);
-    }
-  }
-
-  /**
-   * Valid given namespace string and parse it to a list of sub-strings
-   * that can be consumed by the parser according to the type of the
-   * namespace. Currently the size of return list should be either 1 or 2.
-   * Extra slash is escaped during the normalization.
-   *
-   * @param namespaceStr namespace string.
-   * @return a list of parsed strings.
-   * @throws InvalidAllocationTagException
-   * if namespace format is unexpected.
-   */
-  private static List<String> normalize(String namespaceStr)
-      throws InvalidAllocationTagException {
-    List<String> result = new ArrayList<>();
-    if (namespaceStr == null) {
-      return result;
-    }
-
-    String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER);
-    for (String str : nsValues) {
-      if (!Strings.isNullOrEmpty(str)) {
-        result.add(str);
-      }
-    }
-
-    // Currently we only allow 1 or 2 values for a namespace string
-    if (result.size() == 0 || result.size() > 2) {
-      throw new InvalidAllocationTagException("Invalid namespace string: "
-          + namespaceStr + ", the syntax is <namespace_prefix> or"
-          + " <namespace_prefix>/<namespace_value>");
-    }
-
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
index 5e46cd0..de5492e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java
@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.yarn.api.records;
 
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
-
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 /**
  * Class to describe all supported forms of namespaces for an allocation tag.
  */
@@ -44,29 +38,6 @@ public enum AllocationTagNamespaceType {
     return this.typeKeyword;
   }
 
-  /**
-   * Parses the namespace type from a given string.
-   * @param prefix namespace prefix.
-   * @return namespace type.
-   * @throws InvalidAllocationTagException
-   */
-  public static AllocationTagNamespaceType fromString(String prefix) throws
-      InvalidAllocationTagException {
-    for (AllocationTagNamespaceType type :
-        AllocationTagNamespaceType.values()) {
-      if(type.getTypeKeyword().equals(prefix)) {
-        return type;
-      }
-    }
-
-    Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
-        .map(AllocationTagNamespaceType::toString)
-        .collect(Collectors.toSet());
-    throw new InvalidAllocationTagException(
-        "Invalid namespace prefix: " + prefix
-            + ", valid values are: " + String.join(",", values));
-  }
-
   @Override
   public String toString() {
     return this.getTypeKeyword();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
deleted file mode 100644
index 50bffc3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.api.records;
-
-import java.util.Set;
-
-/**
- * Allocation tags under same namespace.
- */
-public class AllocationTags {
-
-  private AllocationTagNamespace ns;
-  private Set<String> tags;
-
-  public AllocationTags(AllocationTagNamespace namespace,
-      Set<String> allocationTags) {
-    this.ns = namespace;
-    this.tags = allocationTags;
-  }
-
-  /**
-   * @return the namespace of these tags.
-   */
-  public AllocationTagNamespace getNamespace() {
-    return this.ns;
-  }
-
-  /**
-   * @return the allocation tags.
-   */
-  public Set<String> getTags() {
-    return this.tags;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java
deleted file mode 100644
index 7a74002..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.api.records;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-/**
- * A class implements Evaluable interface represents the internal state
- * of the class can be changed against a given target.
- * @param <T> a target to evaluate against
- */
-public interface Evaluable<T> {
-
-  /**
-   * Evaluate against a given target, this process changes the internal state
-   * of current class.
-   *
-   * @param target a generic type target that impacts this evaluation.
-   * @throws YarnException
-   */
-  void evaluate(T target) throws YarnException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java
deleted file mode 100644
index de0ea26..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.api.records;
-
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * This class is used by
- * {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate
- * a namespace.
- */
-public class TargetApplications {
-
-  private ApplicationId currentAppId;
-  private Set<ApplicationId> allAppIds;
-
-  public TargetApplications(ApplicationId currentApplicationId,
-      Set<ApplicationId> allApplicationIds) {
-    this.currentAppId = currentApplicationId;
-    this.allAppIds = allApplicationIds;
-  }
-
-  public Set<ApplicationId> getAllApplicationIds() {
-    return this.allAppIds;
-  }
-
-  public ApplicationId getCurrentApplicationId() {
-    return this.currentAppId;
-  }
-
-  public Set<ApplicationId> getOtherApplicationIds() {
-    return allAppIds == null ? null : allAppIds.stream().filter(appId ->
-        !appId.equals(getCurrentApplicationId()))
-        .collect(Collectors.toSet());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
index af70e2a..02138bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@@ -108,6 +108,25 @@ public final class PlacementConstraints {
   }
 
   /**
+   * Similar to {@link #cardinality(String, int, int, String...)}, but lets you
+   * attach a namespace to the given allocation tags.
+   *
+   * @param scope the scope of the constraint
+   * @param namespace the namespace of the allocation tags
+   * @param minCardinality determines the minimum number of allocations within
+   *                       the scope
+   * @param maxCardinality determines the maximum number of allocations within
+   *                       the scope
+   * @param allocationTags allocation tags
+   * @return the resulting placement constraint
+   */
+  public static AbstractConstraint cardinality(String scope, String namespace,
+      int minCardinality, int maxCardinality, String... allocationTags) {
+    return new SingleConstraint(scope, minCardinality, maxCardinality,
+        PlacementTargets.allocationTagWithNamespace(namespace, allocationTags));
+  }
+
+  /**
    * Similar to {@link #cardinality(String, int, int, String...)}, but
    * determines only the minimum cardinality (the maximum cardinality is
    * unbound).
@@ -125,6 +144,23 @@ public final class PlacementConstraints {
   }
 
   /**
+   * Similar to {@link #minCardinality(String, int, String...)}, but lets you
+   * attach a namespace to the allocation tags.
+   *
+   * @param scope the scope of the constraint
+   * @param namespace the namespace of these tags
+   * @param minCardinality determines the minimum number of allocations within
+   *                       the scope
+   * @param allocationTags the constraint targets allocations with these tags
+   * @return the resulting placement constraint
+   */
+  public static AbstractConstraint minCardinality(String scope,
+      String namespace, int minCardinality, String... allocationTags) {
+    return cardinality(scope, namespace, minCardinality, Integer.MAX_VALUE,
+        allocationTags);
+  }
+
+  /**
    * Similar to {@link #cardinality(String, int, int, String...)}, but
    * determines only the maximum cardinality (the minimum cardinality is 0).
    *
@@ -140,6 +176,23 @@ public final class PlacementConstraints {
   }
 
   /**
+   * Similar to {@link #maxCardinality(String, int, String...)}, but lets you
+   * specify a namespace for the tags, see supported namespaces in
+   * {@link AllocationTagNamespaceType}.
+   *
+   * @param scope the scope of the constraint
+   * @param tagNamespace the namespace of these tags
+   * @param maxCardinality determines the maximum number of allocations within
+   *          the scope
+   * @param allocationTags allocation tags
+   * @return the resulting placement constraint
+   */
+  public static AbstractConstraint maxCardinality(String scope,
+      String tagNamespace, int maxCardinality, String... allocationTags) {
+    return cardinality(scope, tagNamespace, 0, maxCardinality, allocationTags);
+  }
+
+  /**
    * This constraint generalizes the cardinality and target constraints.
    *
    * Consider a set of nodes N that belongs to the scope specified in the
@@ -242,9 +295,8 @@ public final class PlacementConstraints {
      */
     public static TargetExpression allocationTagToIntraApp(
         String... allocationTags) {
-      AllocationTagNamespace selfNs = new AllocationTagNamespace.Self();
       return new TargetExpression(TargetType.ALLOCATION_TAG,
-          selfNs.toString(), allocationTags);
+          AllocationTagNamespaceType.SELF.toString(), allocationTags);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
deleted file mode 100644
index be8d881..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.exceptions;
-
-/**
- * This exception is thrown by
- * {@link
- * org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)}
- * when it fails to parse a namespace.
- */
-public class InvalidAllocationTagException extends YarnException {
-
-  private static final long serialVersionUID = 1L;
-
-  public InvalidAllocationTagException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java
new file mode 100644
index 0000000..7b9f3be
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagNamespace.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
+import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
+import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
+import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
+import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
+
+/**
+ * Class to describe the namespace of an allocation tag.
+ * Each namespace can be evaluated against a set of applications.
+ * After evaluation, the namespace should have an implicit set of
+ * applications which defines its scope.
+ */
+public abstract class AllocationTagNamespace implements
+    Evaluable<TargetApplications> {
+
+  public final static String NAMESPACE_DELIMITER = "/";
+
+  private AllocationTagNamespaceType nsType;
+  // The namespace scope is bound lazily by the evaluate method.
+  private Set<ApplicationId> nsScope;
+
+  public AllocationTagNamespace(AllocationTagNamespaceType
+      allocationTagNamespaceType) {
+    this.nsType = allocationTagNamespaceType;
+  }
+
+  protected void setScopeIfNotNull(Set<ApplicationId> appIds) {
+    if (appIds != null) {
+      this.nsScope = appIds;
+    }
+  }
+
+  /**
+   * Get the type of the namespace.
+   * @return namespace type.
+   */
+  public AllocationTagNamespaceType getNamespaceType() {
+    return nsType;
+  }
+
+  /**
+   * Get the scope of the namespace, in form of a set of applications.
+   *
+   * @return a set of applications.
+   */
+  public Set<ApplicationId> getNamespaceScope() {
+    if (this.nsScope == null) {
+      throw new IllegalStateException("Invalid namespace scope,"
+          + " it is not initialized. Evaluate must be called before"
+          + " a namespace can be consumed.");
+    }
+    return this.nsScope;
+  }
+
+  /**
+   * Evaluate the namespace against the given target applications
+   * if necessary. Only self/not-self/app-label namespace types
+   * require this evaluation step, because they are not bound to a
+   * specific scope when they are created. So we bind them lazily
+   * in this method.
+   *
+   * @param target a generic type target that impacts this evaluation.
+   * @throws InvalidAllocationTagsQueryException
+   */
+  @Override
+  public void evaluate(TargetApplications target)
+      throws InvalidAllocationTagsQueryException {
+    // Sub-class needs to override this when it requires the eval step.
+  }
+
+  @Override
+  public String toString() {
+    return this.nsType.toString();
+  }
+
+  /**
+   * Namespace within application itself.
+   */
+  public static class Self extends AllocationTagNamespace {
+
+    public Self() {
+      super(SELF);
+    }
+
+    @Override
+    public void evaluate(TargetApplications target)
+        throws InvalidAllocationTagsQueryException {
+      if (target == null || target.getCurrentApplicationId() == null) {
+        throw new InvalidAllocationTagsQueryException("Namespace Self must"
+            + " be evaluated against a single application ID.");
+      }
+      ApplicationId applicationId = target.getCurrentApplicationId();
+      setScopeIfNotNull(ImmutableSet.of(applicationId));
+    }
+  }
+
+  /**
+   * Namespace to all applications except itself.
+   */
+  public static class NotSelf extends AllocationTagNamespace {
+
+    private ApplicationId applicationId;
+
+    public NotSelf() {
+      super(NOT_SELF);
+    }
+
+    /**
+     * The scope of the not-self namespace excludes the application itself;
+     * the application ID can be bound to the namespace lazily.
+     *
+     * @param appId application ID.
+     */
+    public void setApplicationId(ApplicationId appId) {
+      this.applicationId = appId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return this.applicationId;
+    }
+
+    @Override
+    public void evaluate(TargetApplications target) {
+      Set<ApplicationId> otherAppIds = target.getOtherApplicationIds();
+      setScopeIfNotNull(otherAppIds);
+    }
+  }
+
+  /**
+   * Namespace to all applications in the cluster.
+   */
+  public static class All extends AllocationTagNamespace {
+
+    public All() {
+      super(ALL);
+    }
+  }
+
+  /**
+   * Namespace defined by an application label (evaluation is still a TODO).
+   */
+  public static class AppLabel extends AllocationTagNamespace {
+
+    public AppLabel() {
+      super(APP_LABEL);
+    }
+
+    @Override
+    public void evaluate(TargetApplications target) {
+      // TODO Implement app-label namespace evaluation
+    }
+  }
+
+  /**
+   * Namespace defined by a certain application ID.
+   */
+  public static class AppID extends AllocationTagNamespace {
+
+    private ApplicationId targetAppId;
+    // app-id namespace requires an extra value of an application id.
+    public AppID(ApplicationId applicationId) {
+      super(APP_ID);
+      this.targetAppId = applicationId;
+      setScopeIfNotNull(ImmutableSet.of(targetAppId));
+    }
+
+    @Override
+    public String toString() {
+      return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId;
+    }
+  }
+
+  /**
+   * Parse namespace from a string. The string must be in legal format
+   * defined by each {@link AllocationTagNamespaceType}.
+   *
+   * @param namespaceStr namespace string.
+   * @return an instance of {@link AllocationTagNamespace}.
+   * @throws InvalidAllocationTagsQueryException
+   * if given string is not in valid format
+   */
+  public static AllocationTagNamespace parse(String namespaceStr)
+      throws InvalidAllocationTagsQueryException {
+    // Return the default namespace if no valid string is given.
+    if (Strings.isNullOrEmpty(namespaceStr)) {
+      return new Self();
+    }
+
+    // Normalize the input, dropping empty segments left by extra slashes.
+    List<String> nsValues = normalize(namespaceStr);
+    // The first string should be the prefix.
+    String nsPrefix = nsValues.get(0);
+    AllocationTagNamespaceType allocationTagNamespaceType =
+        fromString(nsPrefix);
+    switch (allocationTagNamespaceType) {
+    case SELF:
+      return new Self();
+    case NOT_SELF:
+      return new NotSelf();
+    case ALL:
+      return new All();
+    case APP_ID:
+      if (nsValues.size() != 2) {
+        throw new InvalidAllocationTagsQueryException(
+            "Missing the application ID in the namespace string: "
+                + namespaceStr);
+      }
+      String appIDStr = nsValues.get(1);
+      return parseAppID(appIDStr);
+    case APP_LABEL:
+      return new AppLabel();
+    default:
+      throw new InvalidAllocationTagsQueryException(
+          "Invalid namespace string " + namespaceStr);
+    }
+  }
+
+  private static AllocationTagNamespaceType fromString(String prefix) throws
+      InvalidAllocationTagsQueryException {
+    for (AllocationTagNamespaceType type :
+        AllocationTagNamespaceType.values()) {
+      if(type.getTypeKeyword().equals(prefix)) {
+        return type;
+      }
+    }
+
+    Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
+        .map(AllocationTagNamespaceType::toString)
+        .collect(Collectors.toSet());
+    throw new InvalidAllocationTagsQueryException(
+        "Invalid namespace prefix: " + prefix
+            + ", valid values are: " + String.join(",", values));
+  }
+
+  private static AllocationTagNamespace parseAppID(String appIDStr)
+      throws InvalidAllocationTagsQueryException {
+    try {
+      ApplicationId applicationId = ApplicationId.fromString(appIDStr);
+      return new AppID(applicationId);
+    } catch (IllegalArgumentException e) {
+      throw new InvalidAllocationTagsQueryException(
+          "Invalid application ID for "
+              + APP_ID.getTypeKeyword() + ": " + appIDStr);
+    }
+  }
+
+  /**
+   * Validate the given namespace string and split it into a list of
+   * sub-strings that the parser consumes according to the namespace type.
+   * For non-null input, the returned list has either 1 or 2 elements.
+   * Empty segments caused by extra slashes are dropped during normalization.
+   *
+   * @param namespaceStr namespace string.
+   * @return a list of parsed strings.
+   * @throws InvalidAllocationTagsQueryException
+   * if namespace format is unexpected.
+   */
+  private static List<String> normalize(String namespaceStr)
+      throws InvalidAllocationTagsQueryException {
+    List<String> result = new ArrayList<>();
+    if (namespaceStr == null) {
+      return result;
+    }
+
+    String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER);
+    for (String str : nsValues) {
+      if (!Strings.isNullOrEmpty(str)) {
+        result.add(str);
+      }
+    }
+
+    // Currently we only allow 1 or 2 values for a namespace string
+    if (result.size() == 0 || result.size() > 2) {
+      throw new InvalidAllocationTagsQueryException("Invalid namespace string: "
+          + namespaceStr + ", the syntax is <namespace_prefix> or"
+          + " <namespace_prefix>/<namespace_value>");
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java
new file mode 100644
index 0000000..dc0237e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTags.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import java.util.Set;
+
+/**
+ * Allocation tags under same namespace.
+ */
+public final class AllocationTags {
+
+  private AllocationTagNamespace ns;
+  private Set<String> tags;
+
+  private AllocationTags(AllocationTagNamespace namespace,
+      Set<String> allocationTags) {
+    this.ns = namespace;
+    this.tags = allocationTags;
+  }
+
+  /**
+   * @return the namespace of these tags.
+   */
+  public AllocationTagNamespace getNamespace() {
+    return this.ns;
+  }
+
+  /**
+   * @return the allocation tags.
+   */
+  public Set<String> getTags() {
+    return this.tags;
+  }
+
+  @VisibleForTesting
+  public static AllocationTags createSingleAppAllocationTags(
+      ApplicationId appId, Set<String> tags) {
+    AllocationTagNamespace namespace = new AllocationTagNamespace.AppID(appId);
+    return new AllocationTags(namespace, tags);
+  }
+
+  @VisibleForTesting
+  public static AllocationTags createGlobalAllocationTags(Set<String> tags) {
+    AllocationTagNamespace namespace = new AllocationTagNamespace.All();
+    return new AllocationTags(namespace, tags);
+  }
+
+  @VisibleForTesting
+  public static AllocationTags createOtherAppAllocationTags(
+      ApplicationId currentApp, Set<ApplicationId> allIds, Set<String> tags)
+      throws InvalidAllocationTagsQueryException {
+    AllocationTagNamespace namespace = new AllocationTagNamespace.NotSelf();
+    TargetApplications ta = new TargetApplications(currentApp, allIds);
+    namespace.evaluate(ta);
+    return new AllocationTags(namespace, tags);
+  }
+
+  public static AllocationTags newAllocationTags(
+      AllocationTagNamespace namespace, Set<String> tags) {
+    return new AllocationTags(namespace, tags);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
index fb2619a..830566a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java
@@ -22,9 +22,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.log4j.Logger;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -75,6 +78,12 @@ public class AllocationTagsManager {
     // Map<Type, Map<Tag, Count>>
     private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
 
+    public TypeToCountedTags() {}
+
+    private TypeToCountedTags(Map<T, Map<String, Long>> tags) {
+      this.typeToTagsWithCount = tags;
+    }
+
     // protected by external locks
     private void addTags(T type, Set<String> tags) {
       Map<String, Long> innerMap =
@@ -206,6 +215,52 @@ public class AllocationTagsManager {
     public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
       return typeToTagsWithCount;
     }
+
+    /**
+     * Absorbs the given {@link TypeToCountedTags} to current mapping,
+     * this will aggregate the count of the tags with same name.
+     *
+     * @param target a {@link TypeToCountedTags} to merge with.
+     */
+    protected void absorb(final TypeToCountedTags<T> target) {
+      // No-op if the given target is null.
+      if (target == null || target.getTypeToTagsWithCount() == null) {
+        return;
+      }
+
+      // Merge the target.
+      Map<T, Map<String, Long>> targetMap = target.getTypeToTagsWithCount();
+      for (Map.Entry<T, Map<String, Long>> targetEntry :
+          targetMap.entrySet()) {
+        // Get a mutable copy, do not modify the target reference.
+        Map<String, Long> copy = Maps.newHashMap(targetEntry.getValue());
+
+        // If the target type doesn't exist in the current mapping,
+        // add as a new entry.
+        Map<String, Long> existingMapping =
+            this.typeToTagsWithCount.putIfAbsent(targetEntry.getKey(), copy);
+        // There was already a mapping for this target type,
+        // so sum the counts of tags that share the same name.
+        if (existingMapping != null) {
+          Map<String, Long> localMap =
+              this.typeToTagsWithCount.get(targetEntry.getKey());
+          // Merge the target map to the inner map.
+          Map<String, Long> targetValue = targetEntry.getValue();
+          for (Map.Entry<String, Long> entry : targetValue.entrySet()) {
+            localMap.merge(entry.getKey(), entry.getValue(),
+                (a, b) -> Long.sum(a, b));
+          }
+        }
+      }
+    }
+
+    /**
+     * @return a new instance wrapping an unmodifiable view of this mapping.
+     */
+    protected TypeToCountedTags immutableCopy() {
+      return new TypeToCountedTags(
+          Collections.unmodifiableMap(this.typeToTagsWithCount));
+    }
   }
 
   @VisibleForTesting
@@ -236,6 +291,34 @@ public class AllocationTagsManager {
   }
 
   /**
+   * Aggregates multiple {@link TypeToCountedTags} to a single one based on
+   * a given set of application IDs, the values are properly merged.
+   *
+   * @param appIds a set of application IDs.
+   * @return an aggregated {@link TypeToCountedTags}.
+   */
+  private TypeToCountedTags aggregateAllocationTags(Set<ApplicationId> appIds,
+      Map<ApplicationId, TypeToCountedTags> mapping) {
+    TypeToCountedTags result = new TypeToCountedTags();
+    if (appIds != null) {
+      if (appIds.size() == 1) {
+        // If there is only one app, we simply return the mapping
+        // without any extra computation.
+        return mapping.get(appIds.iterator().next());
+      }
+
+      for (ApplicationId applicationId : appIds) {
+        TypeToCountedTags appIdTags = mapping.get(applicationId);
+        if (appIdTags != null) {
+          // Make sure ATM state won't be changed.
+          result.absorb(appIdTags.immutableCopy());
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
    * Notify container allocated on a node.
    *
    * @param nodeId         allocated node.
@@ -458,9 +541,8 @@ public class AllocationTagsManager {
    * to implement customized logic.
    *
    * @param nodeId        nodeId, required.
-   * @param applicationId applicationId. When null is specified, return
-   *                      aggregated cardinality among all applications.
-   * @param tags          allocation tags, see
+   * @param tags          {@link AllocationTags}, allocation tags under a
+   *                      specific namespace. See
    *                      {@link SchedulingRequest#getAllocationTags()},
    *                      When multiple tags specified. Returns cardinality
    *                      depends on op. If a specified tag doesn't exist, 0
@@ -474,29 +556,28 @@ public class AllocationTagsManager {
    * @throws InvalidAllocationTagsQueryException when illegal query
    *                                            parameter specified
    */
-  public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
-      Set<String> tags, LongBinaryOperator op)
-      throws InvalidAllocationTagsQueryException {
+  public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
     readLock.lock();
-
     try {
-      if (nodeId == null || op == null) {
+      if (nodeId == null || op == null || tags == null) {
         throw new InvalidAllocationTagsQueryException(
             "Must specify nodeId/tags/op to query cardinality");
       }
 
       TypeToCountedTags mapping;
-      if (applicationId != null) {
-        mapping = perAppNodeMappings.get(applicationId);
-      } else {
+      if (AllocationTagNamespaceType.ALL.equals(
+          tags.getNamespace().getNamespaceType())) {
         mapping = globalNodeMapping;
+      } else {
+        // Aggregate app tags cardinality by applications.
+        mapping = aggregateAllocationTags(
+            tags.getNamespace().getNamespaceScope(),
+            perAppNodeMappings);
       }
 
-      if (mapping == null) {
-        return 0;
-      }
-
-      return mapping.getCardinality(nodeId, tags, op);
+      return mapping == null ? 0 :
+          mapping.getCardinality(nodeId, tags.getTags(), op);
     } finally {
       readLock.unlock();
     }
@@ -507,9 +588,8 @@ public class AllocationTagsManager {
    * to implement customized logic.
    *
    * @param rack          rack, required.
-   * @param applicationId applicationId. When null is specified, return
-   *                      aggregated cardinality among all applications.
-   * @param tags          allocation tags, see
+   * @param tags          {@link AllocationTags}, allocation tags under a
+   *                      specific namespace. See
    *                      {@link SchedulingRequest#getAllocationTags()},
    *                      When multiple tags specified. Returns cardinality
    *                      depends on op. If a specified tag doesn't exist, 0
@@ -523,30 +603,28 @@ public class AllocationTagsManager {
    * @throws InvalidAllocationTagsQueryException when illegal query
    *                                            parameter specified
    */
-  @SuppressWarnings("unchecked")
-  public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
-      Set<String> tags, LongBinaryOperator op)
-      throws InvalidAllocationTagsQueryException {
+  public long getRackCardinalityByOp(String rack, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
     readLock.lock();
-
     try {
-      if (rack == null || op == null) {
+      if (rack == null || op == null || tags == null) {
         throw new InvalidAllocationTagsQueryException(
             "Must specify rack/tags/op to query cardinality");
       }
 
       TypeToCountedTags mapping;
-      if (applicationId != null) {
-        mapping = perAppRackMappings.get(applicationId);
-      } else {
+      if (AllocationTagNamespaceType.ALL.equals(
+          tags.getNamespace().getNamespaceType())) {
         mapping = globalRackMapping;
+      } else {
+        // Aggregate per-app rack tags over the applications in scope.
+        mapping = aggregateAllocationTags(
+            tags.getNamespace().getNamespaceScope(),
+            perAppRackMappings);
       }
 
-      if (mapping == null) {
-        return 0;
-      }
-
-      return mapping.getCardinality(rack, tags, op);
+      return mapping == null ? 0 :
+          mapping.getCardinality(rack, tags.getTags(), op);
     } finally {
       readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java
new file mode 100644
index 0000000..6a7e54e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/Evaluable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * A class that implements the Evaluable interface has internal state
+ * that can be changed by evaluating it against a given target.
+ * @param <T> a target to evaluate against
+ */
+public interface Evaluable<T> {
+
+  /**
+   * Evaluate against a given target, this process changes the internal state
+   * of current class.
+   *
+   * @param target a generic type target that impacts this evaluation.
+   * @throws YarnException
+   */
+  void evaluate(T target) throws YarnException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 2d0e95a..389fc5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -24,11 +24,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
 import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
-import org.apache.hadoop.yarn.api.records.TargetApplications;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
@@ -38,7 +36,6 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
@@ -70,43 +67,25 @@ public final class PlacementConstraintsUtil {
    */
   private static AllocationTagNamespace getAllocationTagNamespace(
       ApplicationId currentAppId, String targetKey, AllocationTagsManager atm)
-      throws InvalidAllocationTagException{
+      throws InvalidAllocationTagsQueryException {
     // Parse to a valid namespace.
     AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey);
 
-    // TODO remove such check once we support all forms of namespaces
-    if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) {
-      throw new InvalidAllocationTagException(
-          "Only support " + AllocationTagNamespaceType.SELF.toString()
-              + " and "+ AllocationTagNamespaceType.APP_ID + " now,"
-              + namespace.toString() + " is not supported yet!");
+    // TODO Completely remove this check once we support app-label.
+    if (AllocationTagNamespaceType.APP_LABEL
+        .equals(namespace.getNamespaceType())) {
+      throw new InvalidAllocationTagsQueryException(
+          namespace.toString() + " is not supported yet!");
     }
 
     // Evaluate the namespace according to the given target
     // before it can be consumed.
-    TargetApplications ta = new TargetApplications(currentAppId,
-        atm.getAllApplicationIds());
+    TargetApplications ta =
+        new TargetApplications(currentAppId, atm.getAllApplicationIds());
     namespace.evaluate(ta);
     return namespace;
   }
 
-  // We return a single app Id now, because at present,
-  // only self and app-id namespace is supported. But moving on,
-  // this will return a set of application IDs.
-  // TODO support other forms of namespaces
-  private static ApplicationId getNamespaceScope(
-      AllocationTagNamespace namespace)
-      throws InvalidAllocationTagException {
-    if (namespace.getNamespaceScope() == null
-        || namespace.getNamespaceScope().size() != 1) {
-      throw new InvalidAllocationTagException(
-          "Invalid allocation tag namespace " + namespace.toString()
-              + ", expecting it is not null and only 1 application"
-              + " ID in the scope.");
-    }
-    return namespace.getNamespaceScope().iterator().next();
-  }
-
   /**
    * Returns true if <b>single</b> placement constraint with associated
    * allocationTags and scope is satisfied by a specific scheduler Node.
@@ -128,14 +107,10 @@ public final class PlacementConstraintsUtil {
     // Parse the allocation tag's namespace from the given target key,
     // then evaluate the namespace and get its scope,
     // which is represented by one or more application IDs.
-    ApplicationId effectiveAppID;
-    try {
-      AllocationTagNamespace namespace = getAllocationTagNamespace(
+    AllocationTagNamespace namespace = getAllocationTagNamespace(
           targetApplicationId, te.getTargetKey(), tm);
-      effectiveAppID = getNamespaceScope(namespace);
-    } catch (InvalidAllocationTagException e) {
-      throw new InvalidAllocationTagsQueryException(e);
-    }
+    AllocationTags allocationTags = AllocationTags
+        .newAllocationTags(namespace, te.getTargetValues());
 
     long minScopeCardinality = 0;
     long maxScopeCardinality = 0;
@@ -149,20 +124,20 @@ public final class PlacementConstraintsUtil {
     if (sc.getScope().equals(PlacementConstraints.NODE)) {
       if (checkMinCardinality) {
         minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
-            effectiveAppID, te.getTargetValues(), Long::max);
+            allocationTags, Long::max);
       }
       if (checkMaxCardinality) {
         maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
-            effectiveAppID, te.getTargetValues(), Long::min);
+            allocationTags, Long::min);
       }
     } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
       if (checkMinCardinality) {
         minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
-            effectiveAppID, te.getTargetValues(), Long::max);
+            allocationTags, Long::max);
       }
       if (checkMaxCardinality) {
         maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
-            effectiveAppID, te.getTargetValues(), Long::min);
+            allocationTags, Long::min);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java
new file mode 100644
index 0000000..0de7c9e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TargetApplications.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used by
+ * {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate
+ * a namespace.
+ */
+public class TargetApplications {
+
+  private ApplicationId currentAppId;
+  private Set<ApplicationId> allAppIds;
+
+  public TargetApplications(ApplicationId currentApplicationId,
+      Set<ApplicationId> allApplicationIds) {
+    this.currentAppId = currentApplicationId;
+    this.allAppIds = allApplicationIds;
+  }
+
+  public ApplicationId getCurrentApplicationId() {
+    return this.currentAppId;
+  }
+
+  public Set<ApplicationId> getOtherApplicationIds() {
+    return allAppIds == null ? null : allAppIds.stream().filter(appId ->
+        !appId.equals(getCurrentApplicationId()))
+        .collect(Collectors.toSet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
index 9472719..1fce466 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -139,29 +140,27 @@ class LocalAllocationTagsManager extends AllocationTagsManager {
   }
 
   @Override
-  public long getRackCardinality(String rack, ApplicationId applicationId,
-      String tag) throws InvalidAllocationTagsQueryException {
-    return tagsManager.getRackCardinality(rack, applicationId, tag);
+  public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getNodeCardinalityByOp(nodeId, tags, op);
   }
 
   @Override
-  public boolean allocationTagExistsOnNode(NodeId nodeId,
-      ApplicationId applicationId, String tag)
-      throws InvalidAllocationTagsQueryException {
-    return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
+  public long getRackCardinality(String rack, ApplicationId applicationId,
+      String tag) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getRackCardinality(rack, applicationId, tag);
   }
 
   @Override
-  public long getNodeCardinalityByOp(NodeId nodeId,
-      ApplicationId applicationId, Set<String> tags, LongBinaryOperator op)
-      throws InvalidAllocationTagsQueryException {
-    return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op);
+  public long getRackCardinalityByOp(String rack, AllocationTags tags,
+      LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
+    return tagsManager.getRackCardinalityByOp(rack, tags, op);
   }
 
   @Override
-  public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
-      Set<String> tags, LongBinaryOperator op)
+  public boolean allocationTagExistsOnNode(NodeId nodeId,
+      ApplicationId applicationId, String tag)
       throws InvalidAllocationTagsQueryException {
-    return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op);
+    return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
index 7e5506e..9004110 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
@@ -23,7 +23,7 @@ import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace;
 import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
-import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
 import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -339,18 +338,18 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
         try {
           AllocationTagNamespace tagNS =
               AllocationTagNamespace.parse(targetExpression.getTargetKey());
-          if (!AllocationTagNamespaceType.SELF
+          if (AllocationTagNamespaceType.APP_LABEL
               .equals(tagNS.getNamespaceType())) {
             throwExceptionWithMetaInfo(
-                "As of now, the only accepted target key for targetKey of "
-                    + "allocation_tag target expression is: ["
-                    + AllocationTagNamespaceType.SELF.toString()
-                    + "]. Please make changes to placement constraints "
-                    + "accordingly. If this is null, it will be set to "
+                "As of now, allocation tag namespace ["
+                    + AllocationTagNamespaceType.APP_LABEL.toString()
+                    + "] is not supported. Please make changes to placement "
+                    + "constraints accordingly. If this is null, it will be "
+                    + "set to "
                     + AllocationTagNamespaceType.SELF.toString()
                     + " by default.");
           }
-        } catch (InvalidAllocationTagException e) {
+        } catch (InvalidAllocationTagsQueryException e) {
           throwExceptionWithMetaInfo(
               "Invalid allocation tag namespace, message: " + e.getMessage());
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 2ed201c..5eb667e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -305,6 +305,14 @@ public class MockAM {
   public AllocateResponse allocateIntraAppAntiAffinity(
       ResourceSizing resourceSizing, Priority priority, long allocationId,
       Set<String> allocationTags, String... targetTags) throws Exception {
+    return allocateAppAntiAffinity(resourceSizing, priority, allocationId,
+        null, allocationTags, targetTags);
+  }
+
+  public AllocateResponse allocateAppAntiAffinity(
+      ResourceSizing resourceSizing, Priority priority, long allocationId,
+      String namespace, Set<String> allocationTags, String... targetTags)
+      throws Exception {
     return this.allocate(null,
         Arrays.asList(SchedulingRequest.newBuilder().executionType(
             ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
@@ -313,7 +321,8 @@ public class MockAM {
                 PlacementConstraints
                     .targetNotIn(PlacementConstraints.NODE,
                         PlacementConstraints.PlacementTargets
-                            .allocationTagToIntraApp(targetTags)).build())
+                            .allocationTagWithNamespace(namespace, targetTags))
+                    .build())
             .resourceSizing(resourceSizing).build()), null);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a08921ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index 27c5fbd..7a930cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -428,20 +430,27 @@ public class TestRMContainerImpl {
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(
+                TestUtils.getMockApplicationId(1), null),
+            Long::max));
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
         .newInstance(containerId, ContainerState.COMPLETE, "", 0),
         RMContainerEventType.KILL));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     /* Second container: ACQUIRED -> FINISHED */
     rmContainer = new RMContainerImpl(container,
@@ -449,14 +458,18 @@ public class TestRMContainerImpl {
         nodeId, "user", rmContext);
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(
         new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
@@ -466,7 +479,9 @@ public class TestRMContainerImpl {
         RMContainerEventType.FINISHED));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     /* Third container: RUNNING -> FINISHED */
     rmContainer = new RMContainerImpl(container,
@@ -475,13 +490,17 @@ public class TestRMContainerImpl {
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     rmContainer.handle(
         new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
@@ -494,7 +513,9 @@ public class TestRMContainerImpl {
         RMContainerEventType.FINISHED));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     /* Fourth container: NEW -> RECOVERED */
     rmContainer = new RMContainerImpl(container,
@@ -503,7 +524,9 @@ public class TestRMContainerImpl {
     rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
 
     NMContainerStatus containerStatus = NMContainerStatus
         .newInstance(containerId, 0, ContainerState.NEW,
@@ -514,6 +537,8 @@ public class TestRMContainerImpl {
         .handle(new RMContainerRecoverEvent(containerId, containerStatus));
 
     Assert.assertEquals(1,
-        tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
+        tagsManager.getNodeCardinalityByOp(nodeId,
+            AllocationTags.createSingleAppAllocationTags(appId, null),
+            Long::max));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message