hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [26/37] hadoop git commit: YARN-6599. Support anti-affinity constraint via AppPlacementAllocator. (Wangda Tan via asuresh)
Date Wed, 31 Jan 2018 15:55:56 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 73b4f9e..24c5a5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -30,9 +32,12 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
 
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
+
 /**
  * This class contains various static methods used by the Placement Algorithms
  * to simplify constrained placement.
@@ -41,16 +46,20 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algori
 @Public
 @Unstable
 public final class PlacementConstraintsUtil {
+  private static final Log LOG =
+      LogFactory.getLog(PlacementConstraintsUtil.class);
 
   // Suppresses default constructor, ensuring non-instantiability.
   private PlacementConstraintsUtil() {
   }
 
   /**
-   * Returns true if **single** application constraint with associated
+   * Returns true if **single** placement constraint with associated
    * allocationTags and scope is satisfied by a specific scheduler Node.
    *
-   * @param appId the application id
+   * @param targetApplicationId the application id, which could be override by
+   *                           target application id specified inside allocation
+   *                           tags.
    * @param sc the placement constraint
    * @param te the target expression
    * @param node the scheduler node
@@ -59,32 +68,123 @@ public final class PlacementConstraintsUtil {
    * @throws InvalidAllocationTagsQueryException
    */
   private static boolean canSatisfySingleConstraintExpression(
-      ApplicationId appId, SingleConstraint sc, TargetExpression te,
-      SchedulerNode node, AllocationTagsManager tm)
+      ApplicationId targetApplicationId, SingleConstraint sc,
+      TargetExpression te, SchedulerNode node, AllocationTagsManager tm)
       throws InvalidAllocationTagsQueryException {
     long minScopeCardinality = 0;
     long maxScopeCardinality = 0;
+    
+    // Optimizations to only check cardinality if necessary.
+    int desiredMinCardinality = sc.getMinCardinality();
+    int desiredMaxCardinality = sc.getMaxCardinality();
+    boolean checkMinCardinality = desiredMinCardinality > 0;
+    boolean checkMaxCardinality = desiredMaxCardinality < Integer.MAX_VALUE;
+
     if (sc.getScope().equals(PlacementConstraints.NODE)) {
-      minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
-          te.getTargetValues(), Long::max);
-      maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
-          te.getTargetValues(), Long::min);
+      if (checkMinCardinality) {
+        minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
+            targetApplicationId, te.getTargetValues(), Long::max);
+      }
+      if (checkMaxCardinality) {
+        maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
+            targetApplicationId, te.getTargetValues(), Long::min);
+      }
     } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
-      minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
-          te.getTargetValues(), Long::max);
-      maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
-          te.getTargetValues(), Long::min);
+      if (checkMinCardinality) {
+        minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
+            targetApplicationId, te.getTargetValues(), Long::max);
+      }
+      if (checkMaxCardinality) {
+        maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
+            targetApplicationId, te.getTargetValues(), Long::min);
+      }
     }
     // Make sure Anti-affinity satisfies hard upper limit
-    maxScopeCardinality = sc.getMaxCardinality() == 0 ? maxScopeCardinality - 1
+    maxScopeCardinality = desiredMaxCardinality == 0 ? maxScopeCardinality - 1
         : maxScopeCardinality;
 
-    return (minScopeCardinality >= sc.getMinCardinality()
-        && maxScopeCardinality < sc.getMaxCardinality());
+    return (desiredMinCardinality <= 0
+        || minScopeCardinality >= desiredMinCardinality) && (
+        desiredMaxCardinality == Integer.MAX_VALUE
+            || maxScopeCardinality < desiredMaxCardinality);
+  }
+
+  private static boolean canSatisfyNodePartitionConstraintExpresssion(
+      TargetExpression targetExpression, SchedulerNode schedulerNode) {
+    Set<String> values = targetExpression.getTargetValues();
+    if (values == null || values.isEmpty()) {
+      return schedulerNode.getPartition().equals(
+          RMNodeLabelsManager.NO_LABEL);
+    } else{
+      String nodePartition = values.iterator().next();
+      if (!nodePartition.equals(schedulerNode.getPartition())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
+      SingleConstraint singleConstraint, SchedulerNode schedulerNode,
+      AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    // Iterate through TargetExpressions
+    Iterator<TargetExpression> expIt =
+        singleConstraint.getTargetExpressions().iterator();
+    while (expIt.hasNext()) {
+      TargetExpression currentExp = expIt.next();
+      // Supporting AllocationTag Expressions for now
+      if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
+        // Check if conditions are met
+        if (!canSatisfySingleConstraintExpression(applicationId,
+            singleConstraint, currentExp, schedulerNode, tagsManager)) {
+          return false;
+        }
+      } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)
+          && currentExp.getTargetKey().equals(NODE_PARTITION)) {
+        // This is a node partition expression, check it.
+        canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode);
+      }
+    }
+    // return true if all targetExpressions are satisfied
+    return true;
+  }
+
+  /**
+   * Returns true if all placement constraints are **currently** satisfied by a
+   * specific scheduler Node..
+   *
+   * To do so the method retrieves and goes through all application constraint
+   * expressions and checks if the specific allocation is between the allowed
+   * min-max cardinality values under the constraint scope (Node/Rack/etc).
+   *
+   * @param applicationId applicationId,
+   * @param placementConstraint placement constraint.
+   * @param node the scheduler node
+   * @param tagsManager the allocation tags store
+   * @return true if all application constraints are satisfied by node
+   * @throws InvalidAllocationTagsQueryException
+   */
+  public static boolean canSatisfySingleConstraint(ApplicationId applicationId,
+      PlacementConstraint placementConstraint, SchedulerNode node,
+      AllocationTagsManager tagsManager)
+      throws InvalidAllocationTagsQueryException {
+    if (placementConstraint == null) {
+      return true;
+    }
+    // Transform to SimpleConstraint
+    SingleConstraintTransformer singleTransformer =
+        new SingleConstraintTransformer(placementConstraint);
+    placementConstraint = singleTransformer.transform();
+    AbstractConstraint sConstraintExpr = placementConstraint.getConstraintExpr();
+    SingleConstraint single = (SingleConstraint) sConstraintExpr;
+
+    return canSatisfySingleConstraint(applicationId, single, node, tagsManager);
   }
 
   /**
-   * Returns true if all application constraints with associated allocationTags
+   * Returns true if all placement constraints with associated allocationTags
    * are **currently** satisfied by a specific scheduler Node.
    * To do so the method retrieves and goes through all application constraint
    * expressions and checks if the specific allocation is between the allowed
@@ -98,41 +198,12 @@ public final class PlacementConstraintsUtil {
    * @return true if all application constraints are satisfied by node
    * @throws InvalidAllocationTagsQueryException
    */
-  public static boolean canSatisfyConstraints(ApplicationId appId,
+  public static boolean canSatisfySingleConstraint(ApplicationId appId,
       Set<String> allocationTags, SchedulerNode node,
       PlacementConstraintManager pcm, AllocationTagsManager tagsManager)
       throws InvalidAllocationTagsQueryException {
     PlacementConstraint constraint = pcm.getConstraint(appId, allocationTags);
-    if (constraint == null) {
-      return true;
-    }
-    // Transform to SimpleConstraint
-    SingleConstraintTransformer singleTransformer =
-        new SingleConstraintTransformer(constraint);
-    constraint = singleTransformer.transform();
-    AbstractConstraint sConstraintExpr = constraint.getConstraintExpr();
-    SingleConstraint single = (SingleConstraint) sConstraintExpr;
-    // Iterate through TargetExpressions
-    Iterator<TargetExpression> expIt = single.getTargetExpressions().iterator();
-    while (expIt.hasNext()) {
-      TargetExpression currentExp = expIt.next();
-      // Supporting AllocationTag Expressions for now
-      if (currentExp.getTargetType().equals(TargetType.ALLOCATION_TAG)) {
-        // If source and tag allocation tags are the same, we do not enforce
-        // constraints with minimum cardinality.
-        if (currentExp.getTargetValues().equals(allocationTags)
-            && single.getMinCardinality() > 0) {
-          return true;
-        }
-        // Check if conditions are met
-        if (!canSatisfySingleConstraintExpression(appId, single, currentExp,
-            node, tagsManager)) {
-          return false;
-        }
-      }
-    }
-    // return true if all targetExpressions are satisfied
-    return true;
+    return canSatisfySingleConstraint(appId, constraint, node, tagsManager);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 9ed9ab1..eb3fe88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -67,7 +67,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
       throws InvalidAllocationTagsQueryException {
     int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
     if (numAllocs > 0) {
-      if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+      if (PlacementConstraintsUtil.canSatisfySingleConstraint(appId,
           schedulingRequest.getAllocationTags(), schedulerNode,
           constraintManager, tagsManager)) {
         return true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
index 8e9c79c..2a6b889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java
@@ -188,12 +188,18 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
   @Override
   public void allocate(ApplicationAttemptId appAttemptId,
       AllocateRequest request, AllocateResponse response) throws YarnException {
+    // Copy the scheduling request since we will clear it later after sending
+    // to dispatcher
     List<SchedulingRequest> schedulingRequests =
-        request.getSchedulingRequests();
+        new ArrayList<>(request.getSchedulingRequests());
     dispatchRequestsForPlacement(appAttemptId, schedulingRequests);
     reDispatchRetryableRequests(appAttemptId);
     schedulePlacedRequests(appAttemptId);
 
+    // Remove SchedulingRequest from AllocateRequest to avoid SchedulingRequest
+    // added to scheduler.
+    request.setSchedulingRequests(Collections.emptyList());
+
     nextAMSProcessor.allocate(appAttemptId, request, response);
 
     handleRejectedRequests(appAttemptId, response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index e2a62ec..1f85814 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -830,9 +831,9 @@ public class FairScheduler extends
 
   @Override
   public Allocation allocate(ApplicationAttemptId appAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release,
-      List<String> blacklistAdditions, List<String> blacklistRemovals,
-      ContainerUpdates updateRequests) {
+      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+      List<ContainerId> release, List<String> blacklistAdditions,
+      List<String> blacklistRemovals, ContainerUpdates updateRequests) {
 
     // Make sure this application exists
     FSAppAttempt application = getSchedulerApp(appAttemptId);
@@ -857,7 +858,9 @@ public class FairScheduler extends
     handleContainerUpdates(application, updateRequests);
 
     // Sanity check
-    normalizeRequests(ask);
+    normalizeResourceRequests(ask);
+
+    // TODO, normalize SchedulingRequest
 
     // Record container allocation start time
     application.recordContainerRequestTime(getClock().getTime());
@@ -879,6 +882,7 @@ public class FairScheduler extends
         // Update application requests
         application.updateResourceRequests(ask);
 
+        // TODO, handle SchedulingRequest
         application.showRequests();
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 59b9608..7ac9027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -320,8 +321,8 @@ public class FifoScheduler extends
 
   @Override
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release,
-      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      List<ResourceRequest> ask, List<SchedulingRequest> schedulingRequests,
+      List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals,
       ContainerUpdates updateRequests) {
     FifoAppAttempt application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
@@ -342,7 +343,7 @@ public class FifoScheduler extends
     }
 
     // Sanity check
-    normalizeRequests(ask);
+    normalizeResourceRequests(ask);
 
     // Release containers
     releaseContainers(release, application);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
index 5c49450..72a6c4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -29,7 +31,6 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -50,13 +51,18 @@ import java.util.Map;
  * requests.
  * </p>
  */
-public interface AppPlacementAllocator<N extends SchedulerNode> {
+public abstract class AppPlacementAllocator<N extends SchedulerNode> {
+  protected AppSchedulingInfo appSchedulingInfo;
+  protected SchedulerRequestKey schedulerRequestKey;
+  protected RMContext rmContext;
+
   /**
    * Get iterator of preferred node depends on requirement and/or availability
    * @param candidateNodeSet input CandidateNodeSet
    * @return iterator of preferred node
    */
-  Iterator<N> getPreferredNodeIterator(CandidateNodeSet<N> candidateNodeSet);
+  public abstract Iterator<N> getPreferredNodeIterator(
+      CandidateNodeSet<N> candidateNodeSet);
 
   /**
    * Replace existing pending asks by the new requests
@@ -66,15 +72,29 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    * requests for preempted container
    * @return true if total pending resource changed
    */
-  PendingAskUpdateResult updatePendingAsk(
+  public abstract PendingAskUpdateResult updatePendingAsk(
       Collection<ResourceRequest> requests,
       boolean recoverPreemptedRequestForAContainer);
 
   /**
+   * Replace existing pending asks by the new SchedulingRequest
+   *
+   * @param schedulerRequestKey                  scheduler request key
+   * @param schedulingRequest                    new asks
+   * @param recoverPreemptedRequestForAContainer if we're recovering resource
+   *                                             requests for preempted container
+   * @return true if total pending resource changed
+   */
+  public abstract PendingAskUpdateResult updatePendingAsk(
+      SchedulerRequestKey schedulerRequestKey,
+      SchedulingRequest schedulingRequest,
+      boolean recoverPreemptedRequestForAContainer);
+
+  /**
    * Get pending ResourceRequests by given schedulerRequestKey
    * @return Map of resourceName to ResourceRequest
    */
-  Map<String, ResourceRequest> getResourceRequests();
+  public abstract Map<String, ResourceRequest> getResourceRequests();
 
   /**
    * Get pending ask for given resourceName. If there's no such pendingAsk,
@@ -83,7 +103,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    * @param resourceName resourceName
    * @return PendingAsk
    */
-  PendingAsk getPendingAsk(String resourceName);
+  public abstract PendingAsk getPendingAsk(String resourceName);
 
   /**
    * Get #pending-allocations for given resourceName. If there's no such
@@ -92,7 +112,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    * @param resourceName resourceName
    * @return #pending-allocations
    */
-  int getOutstandingAsksCount(String resourceName);
+  public abstract int getOutstandingAsksCount(String resourceName);
 
   /**
    * Notify container allocated.
@@ -103,7 +123,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    *         the container. This will be used by scheduler to recover requests.
    *         Please refer to {@link ContainerRequest} for more details.
    */
-  ContainerRequest allocate(SchedulerRequestKey schedulerKey,
+  public abstract ContainerRequest allocate(SchedulerRequestKey schedulerKey,
       NodeType type, SchedulerNode node);
 
   /**
@@ -112,7 +132,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    * @param node which node we will allocate on
    * @return true if we has pending requirement
    */
-  boolean canAllocate(NodeType type, SchedulerNode node);
+  public abstract boolean canAllocate(NodeType type, SchedulerNode node);
 
   /**
    * Can delay to give locality?
@@ -123,16 +143,16 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    * @param resourceName resourceName
    * @return can/cannot
    */
-  boolean canDelayTo(String resourceName);
+  public abstract boolean canDelayTo(String resourceName);
 
   /**
-   * Does this {@link AppPlacementAllocator} accept resources on nodePartition?
+   * Does this {@link AppPlacementAllocator} accept resources on given node?
    *
-   * @param nodePartition nodePartition
+   * @param schedulerNode schedulerNode
    * @param schedulingMode schedulingMode
    * @return accepted/not
    */
-  boolean acceptNodePartition(String nodePartition,
+  public abstract boolean precheckNode(SchedulerNode schedulerNode,
       SchedulingMode schedulingMode);
 
   /**
@@ -142,7 +162,7 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    *
    * @return primary requested node partition
    */
-  String getPrimaryRequestedNodePartition();
+  public abstract String getPrimaryRequestedNodePartition();
 
   /**
    * @return number of unique location asks with #pending greater than 0,
@@ -152,18 +172,24 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    * and should belong to specific delay scheduling policy impl.
    * See YARN-7457 for more details.
    */
-  int getUniqueLocationAsks();
+  public abstract int getUniqueLocationAsks();
 
   /**
    * Print human-readable requests to LOG debug.
    */
-  void showRequests();
+  public abstract void showRequests();
 
   /**
-   * Set app scheduling info.
+   * Initialize this allocator, this will be called by Factory automatically
    *
-   * @param appSchedulingInfo
-   *          app info object.
+   * @param appSchedulingInfo appSchedulingInfo
+   * @param schedulerRequestKey schedulerRequestKey
+   * @param rmContext rmContext
    */
-  void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo);
+  public void initialize(AppSchedulingInfo appSchedulingInfo,
+      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+    this.appSchedulingInfo = appSchedulingInfo;
+    this.rmContext = rmContext;
+    this.schedulerRequestKey = schedulerRequestKey;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index be1c1cc..a0358b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -22,8 +22,9 @@ import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -46,26 +47,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * containers.
  */
 public class LocalityAppPlacementAllocator <N extends SchedulerNode>
-    implements AppPlacementAllocator<N> {
+    extends AppPlacementAllocator<N> {
   private static final Log LOG =
       LogFactory.getLog(LocalityAppPlacementAllocator.class);
 
   private final Map<String, ResourceRequest> resourceRequestMap =
       new ConcurrentHashMap<>();
-  private AppSchedulingInfo appSchedulingInfo;
   private volatile String primaryRequestedPartition =
       RMNodeLabelsManager.NO_LABEL;
 
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
-  public LocalityAppPlacementAllocator(AppSchedulingInfo info) {
-    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    readLock = lock.readLock();
-    writeLock = lock.writeLock();
-    this.appSchedulingInfo = info;
-  }
-
   public LocalityAppPlacementAllocator() {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
@@ -182,6 +175,19 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
   }
 
   @Override
+  public PendingAskUpdateResult updatePendingAsk(
+      SchedulerRequestKey schedulerRequestKey,
+      SchedulingRequest schedulingRequest,
+      boolean recoverPreemptedRequestForAContainer)
+      throws SchedulerInvalidResoureRequestException {
+    throw new SchedulerInvalidResoureRequestException(this.getClass().getName()
+        + " not be able to handle SchedulingRequest, there exists a "
+        + "ResourceRequest with the same scheduler key=" + schedulerRequestKey
+        + ", please send SchedulingRequest with a different allocationId and "
+        + "priority");
+  }
+
+  @Override
   public Map<String, ResourceRequest> getResourceRequests() {
     return resourceRequestMap;
   }
@@ -362,13 +368,13 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
   }
 
   @Override
-  public boolean acceptNodePartition(String nodePartition,
+  public boolean precheckNode(SchedulerNode schedulerNode,
       SchedulingMode schedulingMode) {
     // We will only look at node label = nodeLabelToLookAt according to
     // schedulingMode and partition of node.
     String nodePartitionToLookAt;
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
-      nodePartitionToLookAt = nodePartition;
+      nodePartitionToLookAt = schedulerNode.getPartition();
     } else {
       nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
     }
@@ -425,9 +431,4 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
       writeLock.unlock();
     }
   }
-
-  @Override
-  public void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo) {
-    this.appSchedulingInfo = appSchedulingInfo;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
new file mode 100644
index 0000000..f8f758c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.APPLICATION_LABEL_INTRA_APPLICATION;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
+
+/**
+ * This is a simple implementation to do affinity or anti-affinity for
+ * inter/intra apps.
+ */
+public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
+    extends AppPlacementAllocator<N> {
+  private static final Log LOG =
+      LogFactory.getLog(SingleConstraintAppPlacementAllocator.class);
+
+  private ReentrantReadWriteLock.ReadLock readLock;
+  private ReentrantReadWriteLock.WriteLock writeLock;
+
+  private SchedulingRequest schedulingRequest = null;
+  private String targetNodePartition;
+  private Set<String> targetAllocationTags;
+  private AllocationTagsManager allocationTagsManager;
+
+  public SingleConstraintAppPlacementAllocator() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Iterator<N> getPreferredNodeIterator(
+      CandidateNodeSet<N> candidateNodeSet) {
+    // Now only handle the case that single node in the candidateNodeSet
+    // TODO, Add support to multi-hosts inside candidateNodeSet which is passed
+    // in.
+
+    N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet);
+    if (null != singleNode) {
+      return IteratorUtils.singletonIterator(singleNode);
+    }
+
+    return IteratorUtils.emptyIterator();
+  }
+
+  @Override
+  public PendingAskUpdateResult updatePendingAsk(
+      Collection<ResourceRequest> requests,
+      boolean recoverPreemptedRequestForAContainer) {
+    if (requests != null && !requests.isEmpty()) {
+      throw new SchedulerInvalidResoureRequestException(
+          this.getClass().getName()
+              + " not be able to handle ResourceRequest, there exists a "
+              + "SchedulingRequest with the same scheduler key="
+              + SchedulerRequestKey.create(requests.iterator().next())
+              + ", please send ResourceRequest with a different allocationId and "
+              + "priority");
+    }
+
+    // Do nothing
+    return null;
+  }
+
+  private PendingAskUpdateResult internalUpdatePendingAsk(
+      SchedulingRequest newSchedulingRequest, boolean recoverContainer) {
+    // When it is a recover container, there must exists an schedulingRequest.
+    if (recoverContainer && schedulingRequest == null) {
+      throw new SchedulerInvalidResoureRequestException("Trying to recover a "
+          + "container request=" + newSchedulingRequest.toString() + ", however"
+          + "there's no existing scheduling request, this should not happen.");
+    }
+
+    if (schedulingRequest != null) {
+      // If we have an old scheduling request, we will make sure that no changes
+      // made except sizing.
+      // To avoid unnecessary copy of the data structure, we do this by
+      // replacing numAllocations with old numAllocations in the
+      // newSchedulingRequest#getResourceSizing, and compare the two objects.
+      ResourceSizing sizing = newSchedulingRequest.getResourceSizing();
+      int existingNumAllocations =
+          schedulingRequest.getResourceSizing().getNumAllocations();
+
+      // When it is a recovered container request, just set
+      // #newAllocations = #existingAllocations + 1;
+      int newNumAllocations;
+      if (recoverContainer) {
+        newNumAllocations = existingNumAllocations + 1;
+      } else {
+        newNumAllocations = sizing.getNumAllocations();
+      }
+      sizing.setNumAllocations(existingNumAllocations);
+
+      // Compare two objects
+      if (!schedulingRequest.equals(newSchedulingRequest)) {
+        // Rollback #numAllocations
+        sizing.setNumAllocations(newNumAllocations);
+        throw new SchedulerInvalidResoureRequestException(
+            "Invalid updated SchedulingRequest added to scheduler, "
+                + " we only allows changing numAllocations for the updated "
+                + "SchedulingRequest. Old=" + schedulingRequest.toString()
+                + " new=" + newSchedulingRequest.toString()
+                + ", if any fields need to be updated, please cancel the "
+                + "old request (by setting numAllocations to 0) and send a "
+                + "SchedulingRequest with different combination of "
+                + "priority/allocationId");
+      } else {
+        if (newNumAllocations == existingNumAllocations) {
+          // No update on pending asks, return null.
+          return null;
+        }
+      }
+
+      // Rollback #numAllocations
+      sizing.setNumAllocations(newNumAllocations);
+
+      // Basic sanity check
+      if (newNumAllocations < 0) {
+        throw new SchedulerInvalidResoureRequestException(
+            "numAllocation in ResourceSizing field must be >= 0, "
+                + "updating schedulingRequest failed.");
+      }
+
+      PendingAskUpdateResult updateResult = new PendingAskUpdateResult(
+          new PendingAsk(schedulingRequest.getResourceSizing()),
+          new PendingAsk(newSchedulingRequest.getResourceSizing()),
+          targetNodePartition, targetNodePartition);
+
+      // Ok, now everything is same except numAllocation, update numAllocation.
+      this.schedulingRequest.getResourceSizing().setNumAllocations(
+          newNumAllocations);
+      LOG.info(
+          "Update numAllocation from old=" + existingNumAllocations + " to new="
+              + newNumAllocations);
+
+      return updateResult;
+    }
+
+    // For a new schedulingRequest, we need to validate if we support its asks.
+    // This will update internal partitions, etc. after the SchedulingRequest is
+    // valid.
+    validateAndSetSchedulingRequest(newSchedulingRequest);
+
+    return new PendingAskUpdateResult(null,
+        new PendingAsk(newSchedulingRequest.getResourceSizing()), null,
+        targetNodePartition);
+  }
+
+  @Override
+  public PendingAskUpdateResult updatePendingAsk(
+      SchedulerRequestKey schedulerRequestKey,
+      SchedulingRequest newSchedulingRequest,
+      boolean recoverPreemptedRequestForAContainer) {
+    writeLock.lock();
+    try {
+      return internalUpdatePendingAsk(newSchedulingRequest,
+          recoverPreemptedRequestForAContainer);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private String throwExceptionWithMetaInfo(String message) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("AppId=").append(appSchedulingInfo.getApplicationId()).append(
+        " Key=").append(this.schedulerRequestKey).append(". Exception message:")
+        .append(message);
+    throw new SchedulerInvalidResoureRequestException(sb.toString());
+  }
+
+  private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest)
+      throws SchedulerInvalidResoureRequestException {
+    // Check sizing exists
+    if (newSchedulingRequest.getResourceSizing() == null
+        || newSchedulingRequest.getResourceSizing().getResources() == null) {
+      throwExceptionWithMetaInfo(
+          "No ResourceSizing found in the scheduling request, please double "
+              + "check");
+    }
+
+    // Check execution type == GUARANTEED
+    if (newSchedulingRequest.getExecutionType() != null
+        && newSchedulingRequest.getExecutionType().getExecutionType()
+        != ExecutionType.GUARANTEED) {
+      throwExceptionWithMetaInfo(
+          "Only GUARANTEED execution type is supported.");
+    }
+
+    PlacementConstraint constraint =
+        newSchedulingRequest.getPlacementConstraint();
+
+    // We only accept SingleConstraint
+    PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr();
+    if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
+      throwExceptionWithMetaInfo(
+          "Only accepts " + PlacementConstraint.SingleConstraint.class.getName()
+              + " as constraint-expression. Rejecting the new added "
+              + "constraint-expression.class=" + ac.getClass().getName());
+    }
+
+    PlacementConstraint.SingleConstraint singleConstraint =
+        (PlacementConstraint.SingleConstraint) ac;
+
+    // Make sure it is an anti-affinity request (actually this implementation
+    // should be able to support both affinity / anti-affinity without much
+    // effort. Considering potential test effort required. Limit to
+    // anti-affinity to intra-app and scope is node.
+    if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
+      throwExceptionWithMetaInfo(
+          "Only support scope=" + PlacementConstraints.NODE
+              + "now. PlacementConstraint=" + singleConstraint);
+    }
+
+    if (singleConstraint.getMinCardinality() != 0
+        || singleConstraint.getMaxCardinality() != 1) {
+      throwExceptionWithMetaInfo(
+          "Only support anti-affinity, which is: minCardinality=0, "
+              + "maxCardinality=1");
+    }
+
+    Set<PlacementConstraint.TargetExpression> targetExpressionSet =
+        singleConstraint.getTargetExpressions();
+    if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
+      throwExceptionWithMetaInfo(
+          "TargetExpression should not be null or empty");
+    }
+
+    // Set node partition
+    String nodePartition = null;
+
+    // Target allocation tags
+    Set<String> targetAllocationTags = null;
+
+    for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) {
+      // Handle node partition
+      if (targetExpression.getTargetType().equals(
+          PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
+        // For node attribute target, we only support Partition now. And once
+        // YARN-3409 is merged, we will support node attribute.
+        if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
+          throwExceptionWithMetaInfo("When TargetType="
+              + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
+              + " only " + NODE_PARTITION + " is accepted as TargetKey.");
+        }
+
+        if (nodePartition != null) {
+          // This means we have duplicated node partition entry inside placement
+          // constraint, which might be set by mistake.
+          throwExceptionWithMetaInfo(
+              "Only one node partition targetExpression is allowed");
+        }
+
+        Set<String> values = targetExpression.getTargetValues();
+        if (values == null || values.isEmpty()) {
+          nodePartition = RMNodeLabelsManager.NO_LABEL;
+          continue;
+        }
+
+        if (values.size() > 1) {
+          throwExceptionWithMetaInfo("Inside one targetExpression, we only "
+              + "support affinity to at most one node partition now");
+        }
+
+        nodePartition = values.iterator().next();
+      } else if (targetExpression.getTargetType().equals(
+          PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
+        // Handle allocation tags
+        if (targetAllocationTags != null) {
+          // This means we have duplicated AllocationTag expressions entries
+          // inside placement constraint, which might be set by mistake.
+          throwExceptionWithMetaInfo(
+              "Only one AllocationTag targetExpression is allowed");
+        }
+
+        if (targetExpression.getTargetValues() == null || targetExpression
+            .getTargetValues().isEmpty()) {
+          throwExceptionWithMetaInfo("Failed to find allocation tags from "
+              + "TargetExpressions or couldn't find self-app target.");
+        }
+
+        targetAllocationTags = new HashSet<>(
+            targetExpression.getTargetValues());
+
+        if (targetExpression.getTargetKey() == null || !targetExpression
+            .getTargetKey().equals(APPLICATION_LABEL_INTRA_APPLICATION)) {
+          throwExceptionWithMetaInfo(
+              "As of now, the only accepted target key for targetKey of "
+                  + "allocation_tag target expression is: ["
+                  + APPLICATION_LABEL_INTRA_APPLICATION
+                  + "]. Please make changes to placement constraints "
+                  + "accordingly.");
+        }
+      }
+    }
+
+    if (targetAllocationTags == null) {
+      // That means we don't have ALLOCATION_TAG specified
+      throwExceptionWithMetaInfo(
+          "Couldn't find target expression with type == ALLOCATION_TAG, it is "
+              + "required to include one and only one target expression with "
+              + "type == ALLOCATION_TAG");
+
+    }
+
+    if (nodePartition == null) {
+      nodePartition = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    // Validation is done. set local results:
+    this.targetNodePartition = nodePartition;
+    this.targetAllocationTags = targetAllocationTags;
+
+    this.schedulingRequest = new SchedulingRequestPBImpl(
+        ((SchedulingRequestPBImpl) newSchedulingRequest).getProto());
+
+    LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo
+        .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils
+        .join(",", targetAllocationTags) + "]. nodePartition="
+        + targetNodePartition);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Map<String, ResourceRequest> getResourceRequests() {
+    return Collections.EMPTY_MAP;
+  }
+
+  @Override
+  public PendingAsk getPendingAsk(String resourceName) {
+    readLock.lock();
+    try {
+      if (resourceName.equals("*") && schedulingRequest != null) {
+        return new PendingAsk(schedulingRequest.getResourceSizing());
+      }
+      return PendingAsk.ZERO;
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  @Override
+  public int getOutstandingAsksCount(String resourceName) {
+    readLock.lock();
+    try {
+      if (resourceName.equals("*") && schedulingRequest != null) {
+        return schedulingRequest.getResourceSizing().getNumAllocations();
+      }
+      return 0;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private void decreasePendingNumAllocation() {
+    // Deduct pending #allocations by 1
+    ResourceSizing sizing = schedulingRequest.getResourceSizing();
+    sizing.setNumAllocations(sizing.getNumAllocations() - 1);
+  }
+
+  @Override
+  public ContainerRequest allocate(SchedulerRequestKey schedulerKey,
+      NodeType type, SchedulerNode node) {
+    writeLock.lock();
+    try {
+      // Per container scheduling request, it is just a copy of existing
+      // scheduling request with #allocations=1
+      SchedulingRequest containerSchedulingRequest = new SchedulingRequestPBImpl(
+          ((SchedulingRequestPBImpl) schedulingRequest).getProto());
+      containerSchedulingRequest.getResourceSizing().setNumAllocations(1);
+
+      // Deduct sizing
+      decreasePendingNumAllocation();
+
+      return new ContainerRequest(containerSchedulingRequest);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private boolean checkCardinalityAndPending(SchedulerNode node) {
+    // Do we still have pending resource?
+    if (schedulingRequest.getResourceSizing().getNumAllocations() <= 0) {
+      return false;
+    }
+
+    // node type will be ignored.
+    try {
+      return PlacementConstraintsUtil.canSatisfySingleConstraint(
+          appSchedulingInfo.getApplicationId(),
+          this.schedulingRequest.getPlacementConstraint(), node,
+          allocationTagsManager);
+    } catch (InvalidAllocationTagsQueryException e) {
+      LOG.warn("Failed to query node cardinality:", e);
+      return false;
+    }
+  }
+
+  @Override
+  public boolean canAllocate(NodeType type, SchedulerNode node) {
+    try {
+      readLock.lock();
+      return checkCardinalityAndPending(node);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public boolean canDelayTo(String resourceName) {
+    return true;
+  }
+
+  @Override
+  public boolean precheckNode(SchedulerNode schedulerNode,
+      SchedulingMode schedulingMode) {
+    // We will only look at node label = nodeLabelToLookAt according to
+    // schedulingMode and partition of node.
+    String nodePartitionToLookAt;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      nodePartitionToLookAt = schedulerNode.getPartition();
+    } else{
+      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    readLock.lock();
+    try {
+      // Check node partition as well as cardinality/pending resources.
+      return this.targetNodePartition.equals(nodePartitionToLookAt)
+          && checkCardinalityAndPending(schedulerNode);
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  @Override
+  public String getPrimaryRequestedNodePartition() {
+    return targetNodePartition;
+  }
+
+  @Override
+  public int getUniqueLocationAsks() {
+    return 1;
+  }
+
+  @Override
+  public void showRequests() {
+    try {
+      readLock.lock();
+      if (schedulingRequest != null) {
+        LOG.info(schedulingRequest.toString());
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  SchedulingRequest getSchedulingRequest() {
+    return schedulingRequest;
+  }
+
+  @VisibleForTesting
+  String getTargetNodePartition() {
+    return targetNodePartition;
+  }
+
+  @VisibleForTesting
+  Set<String> getTargetAllocationTags() {
+    return targetAllocationTags;
+  }
+
+  @Override
+  public void initialize(AppSchedulingInfo appSchedulingInfo,
+      SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {
+    super.initialize(appSchedulingInfo, schedulerRequestKey, rmContext);
+    this.allocationTagsManager = rmContext.getAllocationTagsManager();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index fbde681..7d1140d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -331,8 +331,7 @@ public class Application {
     
     // Get resources from the ResourceManager
     Allocation allocation = resourceManager.getResourceScheduler().allocate(
-        applicationAttemptId, new ArrayList<ResourceRequest>(ask),
-        new ArrayList<ContainerId>(), null, null,
+        applicationAttemptId, new ArrayList<ResourceRequest>(ask), null, new ArrayList<ContainerId>(), null, null,
         new ContainerUpdates());
 
     if (LOG.isInfoEnabled()) {
@@ -431,7 +430,7 @@ public class Application {
     if (type == NodeType.NODE_LOCAL) {
       for (String host : task.getHosts()) {
         if(LOG.isDebugEnabled()) {
-          LOG.debug("updatePendingAsk:" + " application=" + applicationId
+          LOG.debug("updateResourceDemands:" + " application=" + applicationId
             + " type=" + type + " host=" + host
             + " request=" + ((requests == null) ? "null" : requests.get(host)));
         }
@@ -442,7 +441,7 @@ public class Application {
     if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) {
       for (String rack : task.getRacks()) {
         if(LOG.isDebugEnabled()) {
-          LOG.debug("updatePendingAsk:" + " application=" + applicationId
+          LOG.debug("updateResourceDemands:" + " application=" + applicationId
             + " type=" + type + " rack=" + rack
             + " request=" + ((requests == null) ? "null" : requests.get(rack)));
         }
@@ -453,7 +452,7 @@ public class Application {
     updateResourceRequest(requests.get(ResourceRequest.ANY));
     
     if(LOG.isDebugEnabled()) {
-      LOG.debug("updatePendingAsk:" + " application=" + applicationId
+      LOG.debug("updateResourceDemands:" + " application=" + applicationId
         + " #asks=" + ask.size());
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 975abe6..9fa2c40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,14 +38,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -281,6 +285,53 @@ public class MockAM {
     }
     return allocate(req);
   }
+
+  public AllocateResponse allocate(List<ResourceRequest> resourceRequest,
+      List<SchedulingRequest> newSchedulingRequests, List<ContainerId> releases)
+      throws Exception {
+    final AllocateRequest req =
+        AllocateRequest.newInstance(0, 0F, resourceRequest,
+            releases, null);
+    if (newSchedulingRequests != null) {
+      addSchedulingRequest(newSchedulingRequests);
+    }
+    if (!schedulingRequests.isEmpty()) {
+      req.setSchedulingRequests(schedulingRequests);
+      schedulingRequests.clear();
+    }
+    return allocate(req);
+  }
+
+  public AllocateResponse allocateIntraAppAntiAffinity(
+      ResourceSizing resourceSizing, Priority priority, long allocationId,
+      Set<String> allocationTags, String... targetTags) throws Exception {
+    return this.allocate(null,
+        Arrays.asList(SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(allocationId).priority(priority)
+            .allocationTags(allocationTags).placementConstraintExpression(
+                PlacementConstraints
+                    .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                        PlacementConstraints.PlacementTargets
+                            .allocationTagToIntraApp(targetTags)).build())
+            .resourceSizing(resourceSizing).build()), null);
+  }
+
+  public AllocateResponse allocateIntraAppAntiAffinity(
+      String nodePartition, ResourceSizing resourceSizing, Priority priority,
+      long allocationId, String... tags) throws Exception {
+    return this.allocate(null,
+        Arrays.asList(SchedulingRequest.newBuilder().executionType(
+            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
+            .allocationRequestId(allocationId).priority(priority)
+            .placementConstraintExpression(PlacementConstraints
+                .targetCardinality(PlacementConstraints.NODE, 0, 1,
+                    PlacementConstraints.PlacementTargets
+                        .allocationTagToIntraApp(tags),
+                    PlacementConstraints.PlacementTargets
+                        .nodePartition(nodePartition)).build())
+            .resourceSizing(resourceSizing).build()), null);
+  }
   
   public AllocateResponse sendContainerResizingRequest(
       List<UpdateContainerRequest> updateRequests) throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 0e4f308..4a5c671 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -474,7 +474,7 @@ public class TestRMAppAttemptTransitions {
 
     assertEquals(expectedState, applicationAttempt.getAppAttemptState());
     verify(scheduler, times(expectedAllocateCount)).allocate(
-        any(ApplicationAttemptId.class), any(List.class), any(List.class),
+        any(ApplicationAttemptId.class), any(List.class), eq(null), any(List.class),
         any(List.class), any(List.class), any(ContainerUpdates.class));
 
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
@@ -495,7 +495,7 @@ public class TestRMAppAttemptTransitions {
     // Check events
     verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
     verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class),
-        any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class));
     verify(nmTokenManager).clearNodeSetForAttempt(
       applicationAttempt.getAppAttemptId());
@@ -643,7 +643,7 @@ public class TestRMAppAttemptTransitions {
     when(allocation.getContainers()).
         thenReturn(Collections.singletonList(container));
     when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
-        any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class))).
     thenReturn(allocation);
     RMContainer rmContainer = mock(RMContainerImpl.class);
@@ -1161,7 +1161,7 @@ public class TestRMAppAttemptTransitions {
     when(allocation.getContainers()).
         thenReturn(Collections.singletonList(amContainer));
     when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
-        any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class)))
         .thenReturn(allocation);
     RMContainer rmContainer = mock(RMContainerImpl.class);
@@ -1636,7 +1636,7 @@ public class TestRMAppAttemptTransitions {
   public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() {
     YarnScheduler mockScheduler = mock(YarnScheduler.class);
     when(mockScheduler.allocate(any(ApplicationAttemptId.class),
-        any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class), any(List.class),
         any(ContainerUpdates.class)))
         .thenAnswer(new Answer<Allocation>() {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index b927870..2bf6a21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -420,9 +421,10 @@ public class TestRMContainerImpl {
     when(rmContext.getYarnConfiguration()).thenReturn(conf);
 
     /* First container: ALLOCATED -> KILLED */
-    RMContainer rmContainer = new RMContainerImpl(container,
+    RMContainerImpl rmContainer = new RMContainerImpl(container,
         SchedulerRequestKey.extractFrom(container), appAttemptId,
         nodeId, "user", rmContext);
+    rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
         tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
@@ -448,6 +450,7 @@ public class TestRMContainerImpl {
     Assert.assertEquals(0,
         tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
 
+    rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
 
@@ -468,6 +471,7 @@ public class TestRMContainerImpl {
     rmContainer = new RMContainerImpl(container,
         SchedulerRequestKey.extractFrom(container), appAttemptId,
         nodeId, "user", rmContext);
+    rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
 
     Assert.assertEquals(0,
         tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index 3692b29..b7b0eb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -46,7 +46,7 @@ public class TestAppSchedulingInfo {
     doReturn("test").when(queue).getQueueName();
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId,
         "test", queue, null, 0, new ResourceUsage(),
-        new HashMap<String, String>());
+        new HashMap<String, String>(), null);
 
     appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
         new ArrayList<String>());
@@ -118,7 +118,7 @@ public class TestAppSchedulingInfo {
     doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
     AppSchedulingInfo  info = new AppSchedulingInfo(
         appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
-        new ResourceUsage(), new HashMap<String, String>());
+        new ResourceUsage(), new HashMap<>(), null);
     Assert.assertEquals(0, info.getSchedulerKeys().size());
 
     Priority pri1 = Priority.newInstance(1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
new file mode 100644
index 0000000..5cea3a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.junit.Assert;
+
+import java.util.Set;
+
+public class CapacitySchedulerTestBase {
+  protected final int GB = 1024;
+
+  protected static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+  protected static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+  protected static final String A1 = A + ".a1";
+  protected static final String A2 = A + ".a2";
+  protected static final String B1 = B + ".b1";
+  protected static final String B2 = B + ".b2";
+  protected static final String B3 = B + ".b3";
+  protected static float A_CAPACITY = 10.5f;
+  protected static float B_CAPACITY = 89.5f;
+  protected static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
+  protected static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
+  protected static final String X1 = P1 + ".x1";
+  protected static final String X2 = P1 + ".x2";
+  protected static final String Y1 = P2 + ".y1";
+  protected static final String Y2 = P2 + ".y2";
+  protected static float A1_CAPACITY = 30;
+  protected static float A2_CAPACITY = 70;
+  protected static float B1_CAPACITY = 79.2f;
+  protected static float B2_CAPACITY = 0.8f;
+  protected static float B3_CAPACITY = 20;
+
+
+  @SuppressWarnings("unchecked")
+  protected <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+
+  protected void checkPendingResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(
+        memory,
+        queue.getQueueResourceUsage()
+            .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemorySize());
+  }
+
+
+  protected void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertTrue(queue.getQueueResourceUsage()
+        .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+        .getMemorySize() > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 7628312..79898bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -103,7 +103,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -167,33 +166,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
-public class TestCapacityScheduler {
+public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
-  private final int GB = 1024;
   private final static ContainerUpdates NULL_UPDATE_REQUESTS =
       new ContainerUpdates();
-
-  private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-  private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-  private static final String A1 = A + ".a1";
-  private static final String A2 = A + ".a2";
-  private static final String B1 = B + ".b1";
-  private static final String B2 = B + ".b2";
-  private static final String B3 = B + ".b3";
-  private static float A_CAPACITY = 10.5f;
-  private static float B_CAPACITY = 89.5f;
-  private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
-  private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
-  private static final String X1 = P1 + ".x1";
-  private static final String X2 = P1 + ".x2";
-  private static final String Y1 = P2 + ".y1";
-  private static final String Y2 = P2 + ".y2";
-  private static float A1_CAPACITY = 30;
-  private static float A2_CAPACITY = 70;
-  private static float B1_CAPACITY = 79.2f;
-  private static float B2_CAPACITY = 0.8f;
-  private static float B3_CAPACITY = 20;
-
   private ResourceManager resourceManager = null;
   private RMContext mockContext;
 
@@ -1116,12 +1092,12 @@ public class TestCapacityScheduler {
     cs.handle(addAttemptEvent);
 
     // Verify the blacklist can be updated independent of requesting containers
-    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
+    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
         Collections.<ContainerId>emptyList(),
         Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
     Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
         .isPlaceBlacklisted(host));
-    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
+    cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
         Collections.<ContainerId>emptyList(), null,
         Collections.singletonList(host), NULL_UPDATE_REQUESTS);
     Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
@@ -1217,8 +1193,7 @@ public class TestCapacityScheduler {
 
     //This will allocate for app1
     cs.allocate(appAttemptId1,
-        Collections.<ResourceRequest>singletonList(r1),
-        Collections.<ContainerId>emptyList(),
+        Collections.<ResourceRequest>singletonList(r1), null, Collections.<ContainerId>emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
 
     //And this will result in container assignment for app1
@@ -1234,8 +1209,7 @@ public class TestCapacityScheduler {
     //Now, allocate for app2 (this would be the first/AM allocation)
     ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId2,
-        Collections.<ResourceRequest>singletonList(r2),
-        Collections.<ContainerId>emptyList(),
+        Collections.<ResourceRequest>singletonList(r2), null, Collections.<ContainerId>emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
 
     //In this case we do not perform container assignment because we want to
@@ -3481,12 +3455,6 @@ public class TestCapacityScheduler {
         + "queue-a's max capacity will be violated if container allocated");
   }
 
-  @SuppressWarnings("unchecked")
-  private <E> Set<E> toSet(E... elements) {
-    Set<E> set = Sets.newHashSet(elements);
-    return set;
-  }
-
   @Test
   public void testQueueHierarchyPendingResourceUpdate() throws Exception {
     Configuration conf =
@@ -3618,26 +3586,6 @@ public class TestCapacityScheduler {
     checkPendingResource(rm, "root", 0 * GB, "x");
   }
 
-  private void checkPendingResource(MockRM rm, String queueName, int memory,
-      String label) {
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-    CSQueue queue = cs.getQueue(queueName);
-    Assert.assertEquals(
-        memory,
-        queue.getQueueResourceUsage()
-            .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
-            .getMemorySize());
-  }
-
-  private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
-      String label) {
-    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
-    CSQueue queue = cs.getQueue(queueName);
-    Assert.assertTrue(queue.getQueueResourceUsage()
-        .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
-        .getMemorySize() > 0);
-  }
-
   // Test verifies AM Used resource for LeafQueue when AM ResourceRequest is
   // lesser than minimumAllocation
   @Test(timeout = 30000)
@@ -3707,7 +3655,7 @@ public class TestCapacityScheduler {
 
     Allocation allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null,
+            null, Collections.<ContainerId> emptyList(), null, null,
             NULL_UPDATE_REQUESTS);
 
     Assert.assertNotNull(attempt);
@@ -3724,7 +3672,7 @@ public class TestCapacityScheduler {
 
     allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null,
+            null, Collections.<ContainerId> emptyList(), null, null,
             NULL_UPDATE_REQUESTS);
 
     // All resources should be sent as headroom
@@ -4250,8 +4198,7 @@ public class TestCapacityScheduler {
       y1Req = TestUtils.createResourceRequest(
           ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
       cs.allocate(appAttemptId3,
-          Collections.<ResourceRequest>singletonList(y1Req),
-          Collections.<ContainerId>emptyList(),
+          Collections.<ResourceRequest>singletonList(y1Req), null, Collections.<ContainerId>emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4264,8 +4211,7 @@ public class TestCapacityScheduler {
       x1Req = TestUtils.createResourceRequest(
           ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
       cs.allocate(appAttemptId1,
-          Collections.<ResourceRequest>singletonList(x1Req),
-          Collections.<ContainerId>emptyList(),
+          Collections.<ResourceRequest>singletonList(x1Req), null, Collections.<ContainerId>emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4277,8 +4223,7 @@ public class TestCapacityScheduler {
     x2Req = TestUtils.createResourceRequest(
         ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId2,
-        Collections.<ResourceRequest>singletonList(x2Req),
-        Collections.<ContainerId>emptyList(),
+        Collections.<ResourceRequest>singletonList(x2Req), null, Collections.<ContainerId>emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
     assertEquals("X2 Used Resource should be 0", 0,
@@ -4289,8 +4234,7 @@ public class TestCapacityScheduler {
     x1Req = TestUtils.createResourceRequest(
         ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId1,
-        Collections.<ResourceRequest>singletonList(x1Req),
-        Collections.<ContainerId>emptyList(),
+        Collections.<ResourceRequest>singletonList(x1Req), null, Collections.<ContainerId>emptyList(),
         null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
     assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
@@ -4303,8 +4247,7 @@ public class TestCapacityScheduler {
       y1Req = TestUtils.createResourceRequest(
           ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
       cs.allocate(appAttemptId3,
-          Collections.<ResourceRequest>singletonList(y1Req),
-          Collections.<ContainerId>emptyList(),
+          Collections.<ResourceRequest>singletonList(y1Req), null, Collections.<ContainerId>emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4363,7 +4306,7 @@ public class TestCapacityScheduler {
         ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
     //This will allocate for app1
     cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
-        Collections.<ContainerId>emptyList(),
+        null, Collections.<ContainerId>emptyList(),
         null, null, NULL_UPDATE_REQUESTS).getContainers().size();
     CapacityScheduler.schedule(cs);
     ResourceRequest r2 = null;
@@ -4371,8 +4314,7 @@ public class TestCapacityScheduler {
       r2 = TestUtils.createResourceRequest(
           ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
       cs.allocate(appAttemptId2,
-          Collections.<ResourceRequest>singletonList(r2),
-          Collections.<ContainerId>emptyList(),
+          Collections.<ResourceRequest>singletonList(r2), null, Collections.<ContainerId>emptyList(),
           null, null, NULL_UPDATE_REQUESTS);
       CapacityScheduler.schedule(cs);
     }
@@ -4385,12 +4327,12 @@ public class TestCapacityScheduler {
     r2 = TestUtils.createResourceRequest(
         ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
     cs.allocate(appAttemptId1, Collections.<ResourceRequest>singletonList(r1),
-        Collections.<ContainerId>emptyList(),
+        null, Collections.<ContainerId>emptyList(),
         null, null, NULL_UPDATE_REQUESTS).getContainers().size();
     CapacityScheduler.schedule(cs);
 
     cs.allocate(appAttemptId2, Collections.<ResourceRequest>singletonList(r2),
-        Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
+        null, Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS);
     CapacityScheduler.schedule(cs);
     //Check blocked Resource
     assertEquals("A Used Resource should be 2 GB", 2 * GB,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/38af2379/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index eddf8c8..18cd942 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -106,7 +106,7 @@ public class TestCapacitySchedulerAsyncScheduling {
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
         numThreads);
     conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
-        + ".scheduling-interval-ms", 100);
+        + ".scheduling-interval-ms", 0);
 
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
     mgr.init(conf);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message