hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asur...@apache.org
Subject [08/37] hadoop git commit: YARN-6619. AMRMClient Changes to use the PlacementConstraint and SchedulingRequest objects. (Arun Suresh via wangda)
Date Wed, 31 Jan 2018 15:55:38 GMT
YARN-6619. AMRMClient Changes to use the PlacementConstraint and SchedulingRequest objects. (Arun Suresh via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29d9e4d5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29d9e4d5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29d9e4d5

Branch: refs/heads/YARN-6592
Commit: 29d9e4d5814900d5c59d77fe05d32186d4ad9385
Parents: a5c1fc8
Author: Wangda Tan <wangda@apache.org>
Authored: Wed Jan 17 11:36:26 2018 -0800
Committer: Arun Suresh <asuresh@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/client/api/AMRMClient.java      |  38 +++-
 .../yarn/client/api/async/AMRMClientAsync.java  |  48 +++++
 .../api/async/impl/AMRMClientAsyncImpl.java     |  49 ++++-
 .../yarn/client/api/impl/AMRMClientImpl.java    | 142 ++++++++++++-
 .../client/api/impl/BaseAMRMClientTest.java     | 212 +++++++++++++++++++
 .../yarn/client/api/impl/TestAMRMClient.java    | 156 +-------------
 .../TestAMRMClientPlacementConstraints.java     | 204 ++++++++++++++++++
 .../rmcontainer/RMContainerImpl.java            |   3 +
 .../scheduler/AbstractYarnScheduler.java        |   1 +
 .../scheduler/SchedulerApplicationAttempt.java  |   1 +
 .../constraint/PlacementConstraintsUtil.java    |   4 +-
 11 files changed, 700 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index d3d1974..914a146 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.client.api;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 import java.util.List;
 
@@ -39,7 +41,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -554,6 +558,18 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
   }
 
   /**
+   * Add a Collection of SchedulingRequests. The AMRMClient will ensure that
+   * all requests in the same batch are sent in the same allocate call.
+   * @param schedulingRequests Collection of Scheduling Requests.
+   */
+  @Public
+  @InterfaceStability.Unstable
+  public void addSchedulingRequests(
+      Collection<SchedulingRequest> schedulingRequests) {
+
+  }
+
+  /**
    * Register the application master. This must be called before any 
    * other interaction
    * @param appHostName Name of the host on which master is running
@@ -568,7 +584,27 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
                                          int appHostPort,
                                          String appTrackingUrl) 
                throws YarnException, IOException;
-  
+
+  /**
+   * Register the application master. This must be called before any
+   * other interaction
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @param placementConstraints Placement Constraints mappings.
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Public
+  @InterfaceStability.Unstable
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl,
+      Map<Set<String>, PlacementConstraint> placementConstraints)
+      throws YarnException, IOException {
+    throw new YarnException("Not supported");
+  }
+
   /**
    * Request additional containers and receive new container allocations.
    * Requests made via <code>addContainerRequest</code> are sent to the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 2b82ad6..0af687b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.client.api.async;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
@@ -38,9 +40,12 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -206,6 +211,19 @@ extends AbstractService {
                                                    Resource capability);
 
   /**
+   * Add a Collection of SchedulingRequests. The AMRMClient will ensure that
+   * all requests in the same batch are sent in the same allocate call.
+   * @param schedulingRequests Collection of Scheduling Requests.
+   */
+  @Public
+  @Unstable
+  public void addSchedulingRequests(
+      Collection<SchedulingRequest> schedulingRequests) {
+
+  }
+
+
+  /**
    * Returns all matching ContainerRequests that match the given Priority,
    * ResourceName, ExecutionType and Capability.
    *
@@ -250,6 +268,26 @@ extends AbstractService {
       throws YarnException, IOException;
 
   /**
+   * Register the application master. This must be called before any
+   * other interaction
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @param placementConstraints Placement Constraints mappings.
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws YarnException
+   * @throws IOException
+   */
+  @Public
+  @Unstable
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl,
+      Map<Set<String>, PlacementConstraint> placementConstraints)
+      throws YarnException, IOException {
+    throw new YarnException("Not supported");
+  }
+
+  /**
    * Unregister the application master. This must be called in the end.
    * @param appStatus Success/Failure status of the master
    * @param appMessage Diagnostics message on failure
@@ -494,6 +532,16 @@ extends AbstractService {
     public void onContainersReceivedFromPreviousAttempts(
         List<Container> containers) {
     }
+
+    /**
+     * Called when the RM has rejected Scheduling Requests.
+     * @param rejectedSchedulingRequests Rejected Scheduling Requests.
+     */
+    @Public
+    @Unstable
+    public void onRequestsRejected(
+        List<RejectedSchedulingRequest> rejectedSchedulingRequests) {
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 33b0aba..4f04b66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -36,9 +38,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.TimelineV2Client;
@@ -150,18 +155,50 @@ extends AMRMClientAsync<T> {
                                                    Resource capability) {
     return client.getMatchingRequests(priority, resourceName, capability);
   }
-  
+
+  @Override
+  public void addSchedulingRequests(
+      Collection<SchedulingRequest> schedulingRequests) {
+    client.addSchedulingRequests(schedulingRequests);
+  }
+
   /**
    * Registers this application master with the resource manager. On successful
    * registration, starts the heartbeating thread.
+   *
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @return Register AM Response.
    * @throws YarnException
    * @throws IOException
    */
   public RegisterApplicationMasterResponse registerApplicationMaster(
       String appHostName, int appHostPort, String appTrackingUrl)
       throws YarnException, IOException {
+    return registerApplicationMaster(
+        appHostName, appHostPort, appTrackingUrl, null);
+  }
+
+  /**
+   * Registers this application master with the resource manager. On successful
+   * registration, starts the heartbeating thread.
+   *
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @param placementConstraintsMap Placement Constraints Mapping.
+   * @return Register AM Response.
+   * @throws YarnException
+   * @throws IOException
+   */
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl,
+      Map<Set<String>, PlacementConstraint> placementConstraintsMap)
+      throws YarnException, IOException {
     RegisterApplicationMasterResponse response = client
-        .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+        .registerApplicationMaster(appHostName, appHostPort,
+            appTrackingUrl, placementConstraintsMap);
     heartbeatThread.start();
     return response;
   }
@@ -366,6 +403,14 @@ extends AMRMClientAsync<T> {
                       response.getContainersFromPreviousAttempts());
             }
           }
+          List<RejectedSchedulingRequest> rejectedSchedulingRequests =
+              response.getRejectedSchedulingRequests();
+          if (!rejectedSchedulingRequests.isEmpty()) {
+            if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
+              ((AMRMClientAsync.AbstractCallbackHandler) handler)
+                  .onRequestsRejected(rejectedSchedulingRequests);
+            }
+          }
           progress = handler.getProgress();
         } catch (Throwable ex) {
           handler.onError(ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 5507c07..8e2336f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -30,9 +30,11 @@ import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -60,9 +62,11 @@ import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -106,6 +110,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<String> blacklistedNodes = new HashSet<String>();
   protected final Set<String> blacklistAdditions = new HashSet<String>();
   protected final Set<String> blacklistRemovals = new HashSet<String>();
+  private Map<Set<String>, PlacementConstraint> placementConstraints =
+      new HashMap<>();
+  private Queue<Collection<SchedulingRequest>> batchedSchedulingRequests =
+      new LinkedList<>();
+  private Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests =
+      new ConcurrentHashMap<>();
 
   protected Map<String, Resource> resourceProfilesMap;
   
@@ -218,14 +228,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     }
     super.serviceStop();
   }
-  
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       String appHostName, int appHostPort, String appTrackingUrl)
       throws YarnException, IOException {
+    return registerApplicationMaster(appHostName, appHostPort, appTrackingUrl,
+        null);
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl,
+      Map<Set<String>, PlacementConstraint> placementConstraintsMap)
+      throws YarnException, IOException {
     this.appHostName = appHostName;
     this.appHostPort = appHostPort;
     this.appTrackingUrl = appTrackingUrl;
+    if (placementConstraintsMap != null && !placementConstraintsMap.isEmpty()) {
+      this.placementConstraints.putAll(placementConstraintsMap);
+    }
     Preconditions.checkArgument(appHostName != null,
         "The host name should not be null");
     Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
@@ -240,6 +262,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     RegisterApplicationMasterRequest request =
         RegisterApplicationMasterRequest.newInstance(this.appHostName,
             this.appHostPort, this.appTrackingUrl);
+    if (!this.placementConstraints.isEmpty()) {
+      request.setPlacementConstraints(this.placementConstraints);
+    }
     RegisterApplicationMasterResponse response =
         rmClient.registerApplicationMaster(request);
     synchronized (this) {
@@ -248,11 +273,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         populateNMTokens(response.getNMTokensFromPreviousAttempts());
       }
       this.resourceProfilesMap = response.getResourceProfiles();
+      List<Container> prevContainers =
+          response.getContainersFromPreviousAttempts();
+      removeFromOutstandingSchedulingRequests(prevContainers);
+      recreateSchedulingRequestBatch();
     }
     return response;
   }
 
   @Override
+  public void addSchedulingRequests(
+      Collection<SchedulingRequest> schedulingRequests) {
+    synchronized (this.batchedSchedulingRequests) {
+      this.batchedSchedulingRequests.add(schedulingRequests);
+    }
+  }
+
+  @Override
   public AllocateResponse allocate(float progressIndicator) 
       throws YarnException, IOException {
     Preconditions.checkArgument(progressIndicator >= 0,
@@ -288,6 +325,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
             .responseId(lastResponseId).progress(progressIndicator)
             .askList(askList).resourceBlacklistRequest(blacklistRequest)
             .releaseList(releaseList).updateRequests(updateList).build();
+        populateSchedulingRequests(allocateRequest);
         // clear blacklistAdditions and blacklistRemovals before
         // unsynchronized part
         blacklistAdditions.clear();
@@ -296,6 +334,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
 
       try {
         allocateResponse = rmClient.allocate(allocateRequest);
+        removeFromOutstandingSchedulingRequests(
+            allocateResponse.getAllocatedContainers());
+        removeFromOutstandingSchedulingRequests(
+            allocateResponse.getContainersFromPreviousAttempts());
       } catch (ApplicationMasterNotRegisteredException e) {
         LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
             + " hence resyncing.");
@@ -397,6 +439,104 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     return allocateResponse;
   }
 
+  private void populateSchedulingRequests(AllocateRequest allocateRequest) {
+    synchronized (this.batchedSchedulingRequests) {
+      if (!this.batchedSchedulingRequests.isEmpty()) {
+        List<SchedulingRequest> newReqs = new LinkedList<>();
+        Iterator<Collection<SchedulingRequest>> iter =
+            this.batchedSchedulingRequests.iterator();
+        while (iter.hasNext()) {
+          Collection<SchedulingRequest> requests = iter.next();
+          newReqs.addAll(requests);
+          addToOutstandingSchedulingRequests(requests);
+          iter.remove();
+        }
+        allocateRequest.setSchedulingRequests(newReqs);
+      }
+    }
+  }
+
+  private void recreateSchedulingRequestBatch() {
+    List<SchedulingRequest> batched = new ArrayList<>();
+    synchronized (this.outstandingSchedRequests) {
+      for (List<SchedulingRequest> schedReqs :
+          this.outstandingSchedRequests.values()) {
+        batched.addAll(schedReqs);
+      }
+    }
+    synchronized (this.batchedSchedulingRequests) {
+      this.batchedSchedulingRequests.add(batched);
+    }
+  }
+
+  private void addToOutstandingSchedulingRequests(
+      Collection<SchedulingRequest> requests) {
+    for (SchedulingRequest req : requests) {
+      List<SchedulingRequest> schedulingRequests =
+          this.outstandingSchedRequests.computeIfAbsent(
+              req.getAllocationTags(), x -> new LinkedList<>());
+      SchedulingRequest matchingReq = null;
+      synchronized (schedulingRequests) {
+        for (SchedulingRequest schedReq : schedulingRequests) {
+          if (isMatching(req, schedReq)) {
+            matchingReq = schedReq;
+            break;
+          }
+        }
+        if (matchingReq != null) {
+          matchingReq.getResourceSizing().setNumAllocations(
+              req.getResourceSizing().getNumAllocations());
+        } else {
+          schedulingRequests.add(req);
+        }
+      }
+    }
+  }
+
+  private boolean isMatching(SchedulingRequest schedReq1,
+      SchedulingRequest schedReq2) { // same priority, exec type, request id
+    return schedReq1.getPriority().equals(schedReq2.getPriority()) &&
+        schedReq1.getExecutionType().getExecutionType().equals(
+            schedReq2.getExecutionType().getExecutionType()) &&
+        schedReq1.getAllocationRequestId() ==
+            schedReq2.getAllocationRequestId();
+  }
+
+  private void removeFromOutstandingSchedulingRequests(
+      Collection<Container> containers) {
+    if (containers == null || containers.isEmpty()) {
+      return;
+    }
+    for (Container container : containers) {
+      if (container.getAllocationTags() != null &&
+          !container.getAllocationTags().isEmpty()) {
+        List<SchedulingRequest> schedReqs =
+            this.outstandingSchedRequests.get(container.getAllocationTags());
+        if (schedReqs != null && !schedReqs.isEmpty()) {
+          synchronized (schedReqs) {
+            Iterator<SchedulingRequest> iter = schedReqs.iterator();
+            while (iter.hasNext()) {
+              SchedulingRequest schedReq = iter.next();
+              if (schedReq.getPriority().equals(container.getPriority()) &&
+                  schedReq.getAllocationRequestId() ==
+                      container.getAllocationRequestId()) {
+                int numAllocations =
+                    schedReq.getResourceSizing().getNumAllocations();
+                numAllocations--;
+                if (numAllocations == 0) {
+                  iter.remove();
+                } else {
+                  schedReq.getResourceSizing()
+                      .setNumAllocations(numAllocations);
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   private List<UpdateContainerRequest> createUpdateList() {
     List<UpdateContainerRequest> updateList = new ArrayList<>();
     for (Map.Entry<ContainerId, SimpleEntry<Container,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java
new file mode 100644
index 0000000..d18652f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing AMRMClient.
+ */
+public class BaseAMRMClientTest {
+
+  protected Configuration conf = null;
+  protected MiniYARNCluster yarnCluster = null;
+  protected YarnClient yarnClient = null;
+  protected List<NodeReport> nodeReports = null;
+  protected ApplicationAttemptId attemptId = null;
+
+  protected String schedulerName = CapacityScheduler.class.getName();
+  protected boolean autoUpdate = false;
+
+  protected int nodeCount = 3;
+  protected long amExpireMs = 4000;
+  protected int rollingIntervalSec = 13;
+
+
+  protected Resource capability;
+  protected Priority priority;
+  protected Priority priority2;
+  protected String node;
+  protected String rack;
+  protected String[] nodes;
+  protected String[] racks;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+    createClusterAndStartApplication(conf);
+  }
+
+  protected void createClusterAndStartApplication(Configuration conf)
+      throws Exception {
+    // start minicluster
+    this.conf = conf;
+    if (autoUpdate) {
+      conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
+    }
+    conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
+    conf.setLong(
+        YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+        rollingIntervalSec);
+    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, amExpireMs);
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    // set the minimum allocation so that resource decrease can go under 1024
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
+    yarnCluster = new MiniYARNCluster(
+        TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    assertTrue("All node managers did not connect to the RM within the "
+            + "allotted 5-second timeout",
+        yarnCluster.waitForNodeManagersToConnect(5000L));
+    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+    assertEquals("Not all node managers were reported running",
+        nodeCount, nodeReports.size());
+
+    priority = Priority.newInstance(1);
+    priority2 = Priority.newInstance(2);
+    capability = Resource.newInstance(1024, 1);
+
+    node = nodeReports.get(0).getNodeId().getHost();
+    rack = nodeReports.get(0).getRackName();
+    nodes = new String[]{ node };
+    racks = new String[]{ rack };
+
+    // submit new app
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer =
+        BuilderUtils.newContainerLaunchContext(
+            Collections.<String, LocalResource> emptyMap(),
+            new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+            new HashMap<String, ByteBuffer>(), null,
+            new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    RMAppAttempt appAttempt = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        appAttempt =
+            yarnCluster.getResourceManager().getRMContext().getRMApps()
+                .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (true) {
+          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+            break;
+          }
+        }
+        break;
+      }
+    }
+    // Just dig into the ResourceManager and get the AMRMToken just for the sake
+    // of testing.
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+    // emulate RM setup of AMRM token in credentials by adding the token
+    // *before* setting the token service
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    appAttempt.getAMRMToken().setService(
+        ClientRMProxy.getAMRMTokenService(conf));
+  }
+
+  @After
+  public void teardown() throws YarnException, IOException {
+    yarnClient.killApplication(attemptId.getApplicationId());
+    attemptId = null;
+
+    if (yarnClient != null &&
+        yarnClient.getServiceState() == Service.STATE.STARTED) {
+      yarnClient.stop();
+    }
+    if (yarnCluster != null &&
+        yarnCluster.getServiceState() == Service.STATE.STARTED) {
+      yarnCluster.stop();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 3ecc5cd..b059118 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -43,7 +43,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -56,24 +55,18 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -81,10 +74,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -97,26 +88,8 @@ import org.eclipse.jetty.util.log.Log;
  * Test application master client class to resource manager.
  */
 @RunWith(value = Parameterized.class)
-public class TestAMRMClient {
-  private String schedulerName = null;
-  private boolean autoUpdate = false;
-  private Configuration conf = null;
-  private MiniYARNCluster yarnCluster = null;
-  private YarnClient yarnClient = null;
-  private List<NodeReport> nodeReports = null;
-  private ApplicationAttemptId attemptId = null;
-  private int nodeCount = 3;
-  
-  static final int rolling_interval_sec = 13;
-  static final long am_expire_ms = 4000;
-
-  private Resource capability;
-  private Priority priority;
-  private Priority priority2;
-  private String node;
-  private String rack;
-  private String[] nodes;
-  private String[] racks;
+public class TestAMRMClient extends BaseAMRMClientTest{
+
   private final static int DEFAULT_ITERATION = 3;
 
   public TestAMRMClient(String schedulerName, boolean autoUpdate) {
@@ -134,127 +107,6 @@ public class TestAMRMClient {
     });
   }
 
-  @Before
-  public void setup() throws Exception {
-    conf = new YarnConfiguration();
-    createClusterAndStartApplication(conf);
-  }
-
-  private void createClusterAndStartApplication(Configuration conf)
-      throws Exception {
-    // start minicluster
-    this.conf = conf;
-    if (autoUpdate) {
-      conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
-    }
-    conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
-    conf.setLong(
-      YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
-      rolling_interval_sec);
-    conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
-    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
-    // set the minimum allocation so that resource decrease can go under 1024
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
-    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
-    conf.setBoolean(
-        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
-    conf.setInt(
-        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
-    yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
-    yarnCluster.init(conf);
-    yarnCluster.start();
-
-    // start rm client
-    yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(conf);
-    yarnClient.start();
-
-    // get node info
-    assertTrue("All node managers did not connect to the RM within the "
-        + "allotted 5-second timeout",
-        yarnCluster.waitForNodeManagersToConnect(5000L));
-    nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
-    assertEquals("Not all node managers were reported running",
-        nodeCount, nodeReports.size());
-
-    priority = Priority.newInstance(1);
-    priority2 = Priority.newInstance(2);
-    capability = Resource.newInstance(1024, 1);
-
-    node = nodeReports.get(0).getNodeId().getHost();
-    rack = nodeReports.get(0).getRackName();
-    nodes = new String[]{ node };
-    racks = new String[]{ rack };
-
-    // submit new app
-    ApplicationSubmissionContext appContext = 
-        yarnClient.createApplication().getApplicationSubmissionContext();
-    ApplicationId appId = appContext.getApplicationId();
-    // set the application name
-    appContext.setApplicationName("Test");
-    // Set the priority for the application master
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(0);
-    appContext.setPriority(pri);
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue("default");
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer =
-        BuilderUtils.newContainerLaunchContext(
-          Collections.<String, LocalResource> emptyMap(),
-          new HashMap<String, String>(), Arrays.asList("sleep", "100"),
-          new HashMap<String, ByteBuffer>(), null,
-          new HashMap<ApplicationAccessType, String>());
-    appContext.setAMContainerSpec(amContainer);
-    appContext.setResource(Resource.newInstance(1024, 1));
-    // Create the request to send to the applications manager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-    // Submit the application to the applications manager
-    yarnClient.submitApplication(appContext);
-
-    // wait for app to start
-    RMAppAttempt appAttempt = null;
-    while (true) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
-        attemptId = appReport.getCurrentApplicationAttemptId();
-        appAttempt =
-            yarnCluster.getResourceManager().getRMContext().getRMApps()
-              .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-    }
-    // Just dig into the ResourceManager and get the AMRMToken just for the sake
-    // of testing.
-    UserGroupInformation.setLoginUser(UserGroupInformation
-      .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-
-    // emulate RM setup of AMRM token in credentials by adding the token
-    // *before* setting the token service
-    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
-    appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf));
-  }
-  
-  @After
-  public void teardown() throws YarnException, IOException {
-    yarnClient.killApplication(attemptId.getApplicationId());
-    attemptId = null;
-
-    if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
-      yarnClient.stop();
-    }
-    if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
-      yarnCluster.stop();
-    }
-  }
-
   @Test (timeout = 60000)
   public void testAMRMClientNoMatchingRequests()
       throws IOException, YarnException {
@@ -905,7 +757,7 @@ public class TestAMRMClient {
     initAMRMClientAndTest(false);
   }
 
-  private void initAMRMClientAndTest(boolean useAllocReqId)
+  protected void initAMRMClientAndTest(boolean useAllocReqId)
       throws YarnException, IOException {
     AMRMClient<ContainerRequest> amClient = null;
     try {
@@ -1946,7 +1798,7 @@ public class TestAMRMClient {
       // Wait for enough time and make sure the roll_over happens
       // At mean time, the old AMRMToken should continue to work
       while (System.currentTimeMillis() - startTime <
-          rolling_interval_sec * 1000) {
+          rollingIntervalSec * 1000) {
         amClient.allocate(0.1f);
         sleep(1000);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
new file mode 100644
index 0000000..fdc8d58
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+
+/**
+ * Test Placement Constraints and Scheduling Requests.
+ */
+public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest {
+
+  @Test(timeout=60000)
+  public void testAMRMClientWithPlacementConstraints()
+      throws Exception {
+    // we have to create a new instance of MiniYARNCluster to avoid SASL qop
+    // mismatches between client and server
+    teardown();
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
+    createClusterAndStartApplication(conf);
+
+    AMRMClient<AMRMClient.ContainerRequest> amClient =
+        AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
+    amClient.setNMTokenCache(new NMTokenCache());
+    //asserting we are not using the singleton instance cache
+    Assert.assertNotSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+
+    final List<Container> allocatedContainers = new ArrayList<>();
+    final List<RejectedSchedulingRequest> rejectedSchedulingRequests =
+        new ArrayList<>();
+    AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000,
+        new AMRMClientAsync.AbstractCallbackHandler() {
+          @Override
+          public void onContainersAllocated(List<Container> containers) {
+            allocatedContainers.addAll(containers);
+          }
+
+          @Override
+          public void onRequestsRejected(
+              List<RejectedSchedulingRequest> rejReqs) {
+            rejectedSchedulingRequests.addAll(rejReqs);
+          }
+
+          @Override
+          public void onContainersCompleted(List<ContainerStatus> statuses) {}
+          @Override
+          public void onContainersUpdated(List<UpdatedContainer> containers) {}
+          @Override
+          public void onShutdownRequest() {}
+          @Override
+          public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+          @Override
+          public void onError(Throwable e) {}
+
+          @Override
+          public float getProgress() {
+            return 0.1f;
+          }
+        });
+
+    asyncClient.init(conf);
+    asyncClient.start();
+    Map<Set<String>, PlacementConstraint> pcMapping = new HashMap<>();
+    pcMapping.put(Collections.singleton("foo"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+    pcMapping.put(Collections.singleton("bar"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("bar"))));
+    asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping);
+
+    // Send two types of requests - 4 with source tag "foo", each with
+    // numAlloc = 1, and 1 with source tag "bar" with numAlloc = 4. Both
+    // should be handled similarly, i.e. since there are only 3 nodes,
+    // 2 schedulingRequests - 1 with source tag "foo" and 1 with source
+    // tag "bar" - should get rejected.
+    asyncClient.addSchedulingRequests(
+        Arrays.asList(
+            // 4 reqs with numAlloc = 1
+            schedulingRequest(1, 1, 1, 1, 512, "foo"),
+            schedulingRequest(1, 1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 1, 4, 1, 512, "foo"),
+            // 1 req with numAlloc = 4
+            schedulingRequest(4, 1, 5, 1, 512, "bar")));
+
+    // kick the scheduler
+    waitForContainerAllocation(allocatedContainers,
+        rejectedSchedulingRequests, 6, 2);
+
+    Assert.assertEquals(6, allocatedContainers.size());
+    Map<NodeId, List<Container>> containersPerNode =
+        allocatedContainers.stream().collect(
+            Collectors.groupingBy(Container::getNodeId));
+
+    // Ensure 2 containers allocated per node.
+    // Each node should have a "foo" and a "bar" container.
+    Assert.assertEquals(3, containersPerNode.entrySet().size());
+    HashSet<String> srcTags = new HashSet<>(Arrays.asList("foo", "bar"));
+    containersPerNode.entrySet().forEach(
+        x ->
+          Assert.assertEquals(
+              srcTags,
+              x.getValue()
+                  .stream()
+                  .map(y -> y.getAllocationTags().iterator().next())
+                  .collect(Collectors.toSet()))
+    );
+
+    // Ensure 2 rejected requests - 1 of "foo" and 1 of "bar"
+    Assert.assertEquals(2, rejectedSchedulingRequests.size());
+    Assert.assertEquals(srcTags,
+        rejectedSchedulingRequests
+            .stream()
+            .map(x -> x.getRequest().getAllocationTags().iterator().next())
+            .collect(Collectors.toSet()));
+
+    asyncClient.stop();
+  }
+
+  private static void waitForContainerAllocation(
+      List<Container> allocatedContainers,
+      List<RejectedSchedulingRequest> rejectedRequests,
+      int containerNum, int rejNum) throws Exception {
+
+    int maxCount = 10;
+    while (maxCount >= 0 &&
+        (allocatedContainers.size() < containerNum ||
+            rejectedRequests.size() < rejNum)) {
+      maxCount--;
+      sleep(1000);
+    }
+  }
+
+  private static SchedulingRequest schedulingRequest(int numAllocations,
+      int priority, long allocReqId, int cores, int mem, String... tags) {
+    return schedulingRequest(numAllocations, priority, allocReqId, cores, mem,
+        ExecutionType.GUARANTEED, tags);
+  }
+
+  private static SchedulingRequest schedulingRequest(int numAllocations,
+      int priority, long allocReqId, int cores, int mem,
+      ExecutionType execType, String... tags) {
+    return SchedulingRequest.newBuilder()
+        .priority(Priority.newInstance(priority))
+        .allocationRequestId(allocReqId)
+        .allocationTags(new HashSet<>(Arrays.asList(tags)))
+        .executionType(ExecutionTypeRequest.newInstance(execType, true))
+        .resourceSizing(
+            ResourceSizing.newInstance(numAllocations,
+                Resource.newInstance(mem, cores)))
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 563df0d..a504221 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -262,6 +262,9 @@ public class RMContainerImpl implements RMContainer {
       rmContext.getSystemMetricsPublisher().containerCreated(
           this, this.creationTime);
     }
+    if (this.container != null) {
+      this.allocationTags = this.container.getAllocationTags();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 213d784..72376df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -589,6 +589,7 @@ public abstract class AbstractYarnScheduler
     container.setVersion(status.getVersion());
     container.setExecutionType(status.getExecutionType());
     container.setAllocationRequestId(status.getAllocationRequestId());
+    container.setAllocationTags(status.getAllocationTags());
     ApplicationAttemptId attemptId =
         container.getId().getApplicationAttemptId();
     RMContainer rmContainer = new RMContainerImpl(container,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 88a9049..3930a35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -672,6 +672,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
               containerType, container.getExecutionType(),
               container.getAllocationRequestId(),
               rmContainer.getAllocationTags()));
+      container.setAllocationTags(rmContainer.getAllocationTags());
       updateNMToken(container);
     } catch (IllegalArgumentException e) {
       // DNS might be down, skip returning this container.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29d9e4d5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
index 956a3c9..c4b82e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java
@@ -64,12 +64,12 @@ public final class PlacementConstraintsUtil {
       throws InvalidAllocationTagsQueryException {
     long minScopeCardinality = 0;
     long maxScopeCardinality = 0;
-    if (sc.getScope() == PlacementConstraints.NODE) {
+    if (sc.getScope().equals(PlacementConstraints.NODE)) {
       minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
           te.getTargetValues(), Long::max);
       maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), appId,
           te.getTargetValues(), Long::min);
-    } else if (sc.getScope() == PlacementConstraints.RACK) {
+    } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
       minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,
           te.getTargetValues(), Long::max);
       maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), appId,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message