hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wan...@apache.org
Subject [2/3] hadoop git commit: YARN-5959. RM changes to support change of container ExecutionType. (Arun Suresh via wangda)
Date Thu, 05 Jan 2017 18:36:39 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
new file mode 100644
index 0000000..7381250
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class encapsulates all outstanding container increase and decrease
+ * requests for an application.
+ */
+public class ContainerUpdateContext {
+
+  public static final ContainerId UNDEFINED =
+      ContainerId.newContainerId(ApplicationAttemptId.newInstance(
+              ApplicationId.newInstance(-1, -1), -1), -1);
+  protected static final RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  // Keep track of containers that are undergoing promotion
+  private final Map<SchedulerRequestKey, Map<Resource,
+      Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
+
+  private final Set<ContainerId> outstandingDecreases = new HashSet<>();
+  private final AppSchedulingInfo appSchedulingInfo;
+
+  ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
+    this.appSchedulingInfo = appSchedulingInfo;
+  }
+
+  private synchronized boolean isBeingIncreased(Container container) {
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(
+            new SchedulerRequestKey(container.getPriority(),
+                container.getAllocationRequestId(), container.getId()));
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          return containerIds.contains(container.getId());
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Add the container to outstanding decreases.
+   * @param container Container.
+   * @return true if updated to outstanding decreases was successful.
+   */
+  public synchronized boolean checkAndAddToOutstandingDecreases(
+      Container container) {
+    if (isBeingIncreased(container)
+        || outstandingDecreases.contains(container.getId())) {
+      return false;
+    }
+    outstandingDecreases.add(container.getId());
+    return true;
+  }
+
+  /**
+   * Add the container to outstanding increases.
+   * @param rmContainer RMContainer.
+   * @param schedulerNode SchedulerNode.
+   * @param updateRequest UpdateContainerRequest.
+   * @return true if updated to outstanding increases was successful.
+   */
+  public synchronized boolean checkAndAddToOutstandingIncreases(
+      RMContainer rmContainer, SchedulerNode schedulerNode,
+      UpdateContainerRequest updateRequest) {
+    Container container = rmContainer.getContainer();
+    SchedulerRequestKey schedulerKey =
+        SchedulerRequestKey.create(updateRequest,
+            rmContainer.getAllocatedSchedulerKey());
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap == null) {
+      resourceMap = new HashMap<>();
+      outstandingIncreases.put(schedulerKey, resourceMap);
+    }
+    Map<NodeId, Set<ContainerId>> locationMap =
+        resourceMap.get(container.getResource());
+    if (locationMap == null) {
+      locationMap = new HashMap<>();
+      resourceMap.put(container.getResource(), locationMap);
+    }
+    Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+    if (containerIds == null) {
+      containerIds = new HashSet<>();
+      locationMap.put(container.getNodeId(), containerIds);
+    }
+    if (containerIds.contains(container.getId())
+        || outstandingDecreases.contains(container.getId())) {
+      return false;
+    }
+    containerIds.add(container.getId());
+
+    Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
+        new HashMap<>();
+    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
+    Map<String, ResourceRequest> resMap =
+        createResourceRequests(rmContainer, schedulerNode,
+            schedulerKey, resToIncrease);
+    updateResReqs.put(schedulerKey, resMap);
+    appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+    return true;
+  }
+
+  private Map<String, ResourceRequest> createResourceRequests(
+      RMContainer rmContainer, SchedulerNode schedulerNode,
+      SchedulerRequestKey schedulerKey, Resource resToIncrease) {
+    Map<String, ResourceRequest> resMap = new HashMap<>();
+    resMap.put(rmContainer.getContainer().getNodeId().getHost(),
+        createResourceReqForIncrease(schedulerKey, resToIncrease,
+            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+            rmContainer, rmContainer.getContainer().getNodeId().getHost()));
+    resMap.put(schedulerNode.getRackName(),
+        createResourceReqForIncrease(schedulerKey, resToIncrease,
+            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+            rmContainer, schedulerNode.getRackName()));
+    resMap.put(ResourceRequest.ANY,
+        createResourceReqForIncrease(schedulerKey, resToIncrease,
+            RECORD_FACTORY.newRecordInstance(ResourceRequest.class),
+            rmContainer, ResourceRequest.ANY));
+    return resMap;
+  }
+
+  private Resource getResourceToIncrease(UpdateContainerRequest updateReq,
+      RMContainer rmContainer) {
+    if (updateReq.getContainerUpdateType() ==
+        ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
+      return rmContainer.getContainer().getResource();
+    }
+    // TODO: Fix this for container increase..
+    //       This has to equal the Resources in excess of fitsIn()
+    //       for container increase and is equal to the container total
+    //       resource for Promotion.
+    return null;
+  }
+
+  private static ResourceRequest createResourceReqForIncrease(
+      SchedulerRequestKey schedulerRequestKey, Resource resToIncrease,
+      ResourceRequest rr, RMContainer rmContainer, String resourceName) {
+    rr.setResourceName(resourceName);
+    rr.setNumContainers(1);
+    rr.setRelaxLocality(false);
+    rr.setPriority(rmContainer.getContainer().getPriority());
+    rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId());
+    rr.setCapability(resToIncrease);
+    rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression());
+    rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
+        ExecutionType.GUARANTEED, true));
+    return rr;
+  }
+
+  /**
+   * Remove Container from outstanding increases / decreases. Calling this
+   * method essentially completes the update process.
+   * @param schedulerKey SchedulerRequestKey.
+   * @param container Container.
+   */
+  public synchronized void removeFromOutstandingUpdate(
+      SchedulerRequestKey schedulerKey, Container container) {
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          containerIds.remove(container.getId());
+          if (containerIds.isEmpty()) {
+            locationMap.remove(container.getNodeId());
+          }
+        }
+        if (locationMap.isEmpty()) {
+          resourceMap.remove(container.getResource());
+        }
+      }
+      if (resourceMap.isEmpty()) {
+        outstandingIncreases.remove(schedulerKey);
+      }
+    }
+    outstandingDecreases.remove(container.getId());
+  }
+
+  /**
+   * Check if a new container is to be matched up against an outstanding
+   * Container increase request.
+   * @param schedulerKey SchedulerRequestKey.
+   * @param rmContainer RMContainer.
+   * @return ContainerId.
+   */
+  public ContainerId matchContainerToOutstandingIncreaseReq(
+      SchedulerNode node, SchedulerRequestKey schedulerKey,
+      RMContainer rmContainer) {
+    ContainerId retVal = null;
+    Container container = rmContainer.getContainer();
+    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
+        outstandingIncreases.get(schedulerKey);
+    if (resourceMap != null) {
+      Map<NodeId, Set<ContainerId>> locationMap =
+          resourceMap.get(container.getResource());
+      if (locationMap != null) {
+        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
+        if (containerIds != null && !containerIds.isEmpty()) {
+          retVal = containerIds.iterator().next();
+        }
+      }
+    }
+    // Allocation happened on NM on the same host, but not on the NM
+    // we need.. We need to signal that this container has to be released.
+    // We also need to add these requests back.. to be reallocated.
+    if (resourceMap != null && retVal == null) {
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> reqsToUpdate =
+          new HashMap<>();
+      Map<String, ResourceRequest> resMap = createResourceRequests
+          (rmContainer, node, schedulerKey,
+          rmContainer.getContainer().getResource());
+      reqsToUpdate.put(schedulerKey, resMap);
+      appSchedulingInfo.addToPlacementSets(true, reqsToUpdate);
+      return UNDEFINED;
+    }
+    return retVal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java
new file mode 100644
index 0000000..77b1545
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdates.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holder class that maintains list of container update requests
+ */
+public class ContainerUpdates {
+
+  final List<UpdateContainerRequest> increaseRequests = new ArrayList<>();
+  final List<UpdateContainerRequest> decreaseRequests = new ArrayList<>();
+  final List<UpdateContainerRequest> promotionRequests = new ArrayList<>();
+  final List<UpdateContainerRequest> demotionRequests = new ArrayList<>();
+
+  /**
+   * Returns Container Increase Requests.
+   * @return Container Increase Requests.
+   */
+  public List<UpdateContainerRequest> getIncreaseRequests() {
+    return increaseRequests;
+  }
+
+  /**
+   * Returns Container Decrease Requests.
+   * @return Container Decrease Requests.
+   */
+  public List<UpdateContainerRequest> getDecreaseRequests() {
+    return decreaseRequests;
+  }
+
+  /**
+   * Returns Container Promotion Requests.
+   * @return Container Promotion Requests.
+   */
+  public List<UpdateContainerRequest> getPromotionRequests() {
+    return promotionRequests;
+  }
+
+  /**
+   * Returns Container Demotion Requests.
+   * @return Container Demotion Requests.
+   */
+  public List<UpdateContainerRequest> getDemotionRequests() {
+    return demotionRequests;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index e94d800..4a8b2da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -133,10 +136,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
 
   protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+  protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
+  protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
+  protected List<RMContainer> tempContainerToKill = new ArrayList<>();
   protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
   protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
   protected Set<NMToken> updatedNMTokens = new HashSet<>();
 
+  protected List<UpdateContainerError> updateContainerErrors = new ArrayList<>();
+
   // This pendingRelease is used in work-preserving recovery scenario to keep
   // track of the AM's outstanding release requests. RM on recovery could
   // receive the release request form AM before it receives the container status
@@ -247,6 +255,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return this.appSchedulingInfo;
   }
 
+  public ContainerUpdateContext getUpdateContext() {
+    return this.appSchedulingInfo.getUpdateContext();
+  }
+
   /**
    * Is this application pending?
    * @return true if it is else false.
@@ -537,8 +549,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       writeLock.lock();
       // Create RMContainer if necessary
       if (rmContainer == null) {
-        rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
-            node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+        rmContainer = new RMContainerImpl(container, schedulerKey,
+            getApplicationAttemptId(), node.getNodeID(),
+            appSchedulingInfo.getUser(), rmContext);
       }
       if (rmContainer.getState() == RMContainerState.NEW) {
         attemptResourceUsage.incReserved(node.getPartition(),
@@ -635,10 +648,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   }
   
   private Container updateContainerAndNMToken(RMContainer rmContainer,
-      boolean newContainer, boolean increasedContainer) {
+      ContainerUpdateType updateType) {
     Container container = rmContainer.getContainer();
     ContainerType containerType = ContainerType.TASK;
-    if (!newContainer) {
+    if (updateType != null) {
       container.setVersion(container.getVersion() + 1);
     }
     // The working knowledge is that masterContainer for AM is null as it
@@ -662,12 +675,15 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       return null;
     }
     
-    if (newContainer) {
+    if (updateType == null ||
+        ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType ||
+        ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
       rmContainer.handle(new RMContainerEvent(
           rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
     } else {
       rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
-          rmContainer.getContainerId(), increasedContainer));
+          rmContainer.getContainerId(),
+          ContainerUpdateType.INCREASE_RESOURCE == updateType));
     }
     return container;
   }
@@ -699,8 +715,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       Iterator<RMContainer> i = newlyAllocatedContainers.iterator();
       while (i.hasNext()) {
         RMContainer rmContainer = i.next();
-        Container updatedContainer = updateContainerAndNMToken(rmContainer,
-            true, false);
+        Container updatedContainer =
+            updateContainerAndNMToken(rmContainer, null);
         // Only add container to return list when it's not null.
         // updatedContainer could be null when generate token failed, it can be
         // caused by DNS resolving failed.
@@ -713,9 +729,142 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     } finally {
       writeLock.unlock();
     }
+  }
 
+  public void addToNewlyDemotedContainers(ContainerId containerId,
+      RMContainer rmContainer) {
+    newlyDemotedContainers.put(containerId, rmContainer);
   }
-  
+
+  protected synchronized void addToUpdateContainerErrors(
+      UpdateContainerError error) {
+    updateContainerErrors.add(error);
+  }
+
+  protected synchronized void addToNewlyAllocatedContainers(
+      SchedulerNode node, RMContainer rmContainer) {
+    if (oppContainerContext == null) {
+      newlyAllocatedContainers.add(rmContainer);
+      return;
+    }
+    ContainerId matchedContainerId =
+        getUpdateContext().matchContainerToOutstandingIncreaseReq(
+            node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
+    if (matchedContainerId != null) {
+      if (ContainerUpdateContext.UNDEFINED == matchedContainerId) {
+        // This is a spurious allocation (relaxLocality = false
+        // resulted in the Container being allocated on an NM on the same host
+        // but not on the NM running the container to be updated. Can
+        // happen if more than one NM exists on the same host.. usually
+        // occurs when using MiniYARNCluster to test).
+        tempContainerToKill.add(rmContainer);
+      } else {
+        newlyPromotedContainers.put(matchedContainerId, rmContainer);
+      }
+    } else {
+      newlyAllocatedContainers.add(rmContainer);
+    }
+  }
+
+  public List<Container> pullNewlyPromotedContainers() {
+    return pullContainersWithUpdatedExecType(newlyPromotedContainers,
+        ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
+  }
+
+  public List<Container> pullNewlyDemotedContainers() {
+    return pullContainersWithUpdatedExecType(newlyDemotedContainers,
+        ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
+  }
+
+  public List<UpdateContainerError> pullUpdateContainerErrors() {
+    List<UpdateContainerError> errors =
+        new ArrayList<>(updateContainerErrors);
+    updateContainerErrors.clear();
+    return errors;
+  }
+
+  /**
+   * A container is promoted if its executionType is changed from
+   * OPPORTUNISTIC to GUARANTEED. It id demoted if the change is from
+   * GUARANTEED to OPPORTUNISTIC.
+   * @return Newly Promoted and Demoted containers
+   */
+  private List<Container> pullContainersWithUpdatedExecType(
+      Map<ContainerId, RMContainer> newlyUpdatedContainers,
+      ContainerUpdateType updateTpe) {
+    List<Container> updatedContainers = new ArrayList<>();
+    if (oppContainerContext == null) {
+      return updatedContainers;
+    }
+    try {
+      writeLock.lock();
+      Iterator<Map.Entry<ContainerId, RMContainer>> i =
+          newlyUpdatedContainers.entrySet().iterator();
+      while (i.hasNext()) {
+        Map.Entry<ContainerId, RMContainer> entry = i.next();
+        ContainerId matchedContainerId = entry.getKey();
+        RMContainer rmContainer = entry.getValue();
+
+        // swap containers
+        RMContainer existingRMContainer = swapContainer(
+            rmContainer, matchedContainerId);
+        getUpdateContext().removeFromOutstandingUpdate(
+            rmContainer.getAllocatedSchedulerKey(),
+            existingRMContainer.getContainer());
+        Container updatedContainer = updateContainerAndNMToken(
+            existingRMContainer, updateTpe);
+        updatedContainers.add(updatedContainer);
+
+        tempContainerToKill.add(rmContainer);
+        i.remove();
+      }
+      // Release all temporary containers
+      Iterator<RMContainer> tempIter = tempContainerToKill.iterator();
+      while (tempIter.hasNext()) {
+        RMContainer c = tempIter.next();
+        // Mark container for release (set RRs to null, so RM does not think
+        // it is a recoverable container)
+        ((RMContainerImpl) c).setResourceRequests(null);
+        ((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c,
+            SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(),
+                SchedulerUtils.UPDATED_CONTAINER),
+            RMContainerEventType.KILL);
+        tempIter.remove();
+      }
+      return updatedContainers;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private RMContainer swapContainer(RMContainer rmContainer, ContainerId
+      matchedContainerId) {
+    RMContainer existingRMContainer =
+        getRMContainer(matchedContainerId);
+    if (existingRMContainer != null) {
+      // Swap updated container with the existing container
+      Container updatedContainer = rmContainer.getContainer();
+
+      Container newContainer = Container.newInstance(matchedContainerId,
+          existingRMContainer.getContainer().getNodeId(),
+          existingRMContainer.getContainer().getNodeHttpAddress(),
+          updatedContainer.getResource(),
+          existingRMContainer.getContainer().getPriority(), null,
+          updatedContainer.getExecutionType());
+      newContainer.setAllocationRequestId(
+          existingRMContainer.getContainer().getAllocationRequestId());
+      newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+      rmContainer.getContainer().setResource(
+          existingRMContainer.getContainer().getResource());
+      rmContainer.getContainer().setExecutionType(
+          existingRMContainer.getContainer().getExecutionType());
+
+      ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+    }
+    return existingRMContainer;
+  }
+
   private List<Container> pullNewlyUpdatedContainers(
       Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
     try {
@@ -728,7 +877,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       while (i.hasNext()) {
         RMContainer rmContainer = i.next().getValue();
         Container updatedContainer = updateContainerAndNMToken(rmContainer,
-            false, increase);
+            increase ? ContainerUpdateType.INCREASE_RESOURCE :
+                ContainerUpdateType.DECREASE_RESOURCE);
         if (updatedContainer != null) {
           returnContainerList.add(updatedContainer);
           i.remove();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index b227523..5360665 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -41,7 +42,10 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -57,6 +61,9 @@ public class SchedulerUtils {
 
   public static final String RELEASED_CONTAINER = 
       "Container released by application";
+
+  public static final String UPDATED_CONTAINER =
+      "Temporary container killed by application for ExeutionType update";
   
   public static final String LOST_CONTAINER = 
       "Container released on a *lost* node";
@@ -376,4 +383,19 @@ public class SchedulerUtils {
     }
     return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster);
   }
+
+  public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
+      Container container, boolean isRemotelyAllocated) {
+    SchedulerApplicationAttempt appAttempt =
+        ((AbstractYarnScheduler) rmContext.getScheduler())
+            .getCurrentAttemptForContainer(container.getId());
+    RMContainer rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container),
+        appAttempt.getApplicationAttemptId(), container.getNodeId(),
+        appAttempt.getUser(), rmContext, isRemotelyAllocated);
+    appAttempt.addRMContainer(container.getId(), rmContainer);
+    ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
+        container.getNodeId()).allocateContainer(rmContainer);
+    return rmContainer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index ea1ae60..3185dc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -42,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.api.records.AbstractResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -137,8 +136,7 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    * @param release
    * @param blacklistAdditions 
    * @param blacklistRemovals 
-   * @param increaseRequests
-   * @param decreaseRequests
+   * @param updateRequests
    * @return the {@link Allocation} for the application
    */
   @Public
@@ -146,8 +144,7 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
   Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests);
+      ContainerUpdates updateRequests);
 
   /**
    * Get node resource usage report.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index b862850..55ffe25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -90,6 +90,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContai
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -921,22 +924,27 @@ public class CapacityScheduler extends
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests) {
+      ContainerUpdates updateRequests) {
     FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       return EMPTY_ALLOCATION;
     }
 
+    // Handle promotions and demotions
+    handleExecutionTypeUpdates(
+        application, updateRequests.getPromotionRequests(),
+        updateRequests.getDemotionRequests());
+
     // Release containers
     releaseContainers(release, application);
 
     // update increase requests
-    LeafQueue updateDemandForQueue = updateIncreaseRequests(increaseRequests,
+    LeafQueue updateDemandForQueue =
+        updateIncreaseRequests(updateRequests.getIncreaseRequests(),
         application);
 
     // Decrease containers
-    decreaseContainers(decreaseRequests, application);
+    decreaseContainers(updateRequests.getDecreaseRequests(), application);
 
     // Sanity check for new allocation requests
     normalizeRequests(ask);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 1eb48bb..eeb0815 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -746,7 +746,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       // When reserving container
       RMContainer updatedContainer = reservedContainer;
       if (updatedContainer == null) {
-        updatedContainer = new RMContainerImpl(container,
+        updatedContainer = new RMContainerImpl(container, schedulerKey,
             application.getApplicationAttemptId(), node.getNodeID(),
             application.getAppSchedulingInfo().getUser(), rmContext);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index b14bc20..809446f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -222,7 +222,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       }
 
       // Create RMContainer
-      RMContainer rmContainer = new RMContainerImpl(container,
+      RMContainer rmContainer = new RMContainerImpl(container, schedulerKey,
           this.getApplicationAttemptId(), node.getNodeID(),
           appSchedulingInfo.getUser(), this.rmContext,
           request.getNodeLabelExpression());
@@ -554,12 +554,14 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           // Update this application for the allocated container
           if (!allocation.isIncreasedAllocation()) {
             // Allocate a new container
-            newlyAllocatedContainers.add(rmContainer);
+            addToNewlyAllocatedContainers(
+                schedulerContainer.getSchedulerNode(), rmContainer);
             liveContainers.put(containerId, rmContainer);
 
             // Deduct pending resource requests
             List<ResourceRequest> requests = appSchedulingInfo.allocate(
-                allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(),
+                allocation.getAllocationLocalityType(),
+                schedulerContainer.getSchedulerNode(),
                 schedulerContainer.getSchedulerRequestKey(),
                 schedulerContainer.getRmContainer().getContainer());
             ((RMContainerImpl) rmContainer).setResourceRequests(requests);
@@ -751,12 +753,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
       List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
       List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
+      List<Container> newlyPromotedContainers = pullNewlyPromotedContainers();
+      List<Container> newlyDemotedContainers = pullNewlyDemotedContainers();
       List<NMToken> updatedNMTokens = pullUpdatedNMTokens();
       Resource headroom = getHeadroom();
       setApplicationHeadroomForMetrics(headroom);
       return new Allocation(newlyAllocatedContainers, headroom, null,
           currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
-          newlyIncreasedContainers, newlyDecreasedContainers);
+          newlyIncreasedContainers, newlyDecreasedContainers,
+          newlyPromotedContainers, newlyDemotedContainers);
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index a9591a5..0daa8a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -448,13 +448,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       }
 
       // Create RMContainer
-      rmContainer = new RMContainerImpl(container,
+      rmContainer = new RMContainerImpl(container, schedulerKey,
           getApplicationAttemptId(), node.getNodeID(),
           appSchedulingInfo.getUser(), rmContext);
       ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
       // Add it to allContainers list.
-      newlyAllocatedContainers.add(rmContainer);
+      addToNewlyAllocatedContainers(node, rmContainer);
       liveContainers.put(container.getId(), rmContainer);
 
       // Update consumption and track allocations

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index e3af150..78dfe61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -18,18 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -51,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -80,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -102,8 +92,17 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A scheduler that schedules resources between a set of queues. The scheduler
@@ -812,8 +811,7 @@ public class FairScheduler extends
   public Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests) {
+      ContainerUpdates updateRequests) {
 
     // Make sure this application exists
     FSAppAttempt application = getSchedulerApp(appAttemptId);
@@ -823,6 +821,11 @@ public class FairScheduler extends
       return EMPTY_ALLOCATION;
     }
 
+    // Handle promotions and demotions
+    handleExecutionTypeUpdates(
+        application, updateRequests.getPromotionRequests(),
+        updateRequests.getDemotionRequests());
+
     // Sanity check
     normalizeRequests(ask);
 
@@ -879,7 +882,9 @@ public class FairScheduler extends
     application.setApplicationHeadroomForMetrics(headroom);
     return new Allocation(newlyAllocatedContainers, headroom,
         preemptionContainerIds, null, null,
-        application.pullUpdatedNMTokens());
+        application.pullUpdatedNMTokens(), null, null,
+        application.pullNewlyPromotedContainers(),
+        application.pullNewlyDemotedContainers());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
index e60f70e..fa61710 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -68,7 +68,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
 
       // Create RMContainer
       RMContainer rmContainer = new RMContainerImpl(container,
-          this.getApplicationAttemptId(), node.getNodeID(),
+          schedulerKey, this.getApplicationAttemptId(), node.getNodeID(),
           appSchedulingInfo.getUser(), this.rmContext,
           request.getNodeLabelExpression());
       ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
@@ -76,7 +76,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
       updateAMContainerDiagnostics(AMState.ASSIGNED, null);
 
       // Add it to allContainers list.
-      newlyAllocatedContainers.add(rmContainer);
+      addToNewlyAllocatedContainers(node, rmContainer);
 
       ContainerId containerId = container.getId();
       liveContainers.put(containerId, rmContainer);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 643199c..f4ab9c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -71,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -326,8 +326,7 @@ public class FifoScheduler extends
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
       List<String> blacklistAdditions, List<String> blacklistRemovals,
-      List<UpdateContainerRequest> increaseRequests,
-      List<UpdateContainerRequest> decreaseRequests) {
+      ContainerUpdates updateRequests) {
     FifoAppAttempt application = getApplicationAttempt(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
index 9dbf024..157518e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
@@ -157,7 +157,8 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
     return resourceRequestMap.get(resourceName);
   }
 
-  private void decrementOutstanding(ResourceRequest offSwitchRequest) {
+  private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
+      ResourceRequest offSwitchRequest) {
     int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
 
     // Do not remove ANY
@@ -166,8 +167,6 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
     // Do we have any outstanding requests?
     // If there is nothing, we need to deactivate this application
     if (numOffSwitchContainers == 0) {
-      SchedulerRequestKey schedulerRequestKey = SchedulerRequestKey.create(
-          offSwitchRequest);
       appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
       appSchedulingInfo.checkForDeactivation();
     }
@@ -177,11 +176,15 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
         offSwitchRequest.getCapability());
   }
 
-  private ResourceRequest cloneResourceRequest(ResourceRequest request) {
-    ResourceRequest newRequest =
-        ResourceRequest.newInstance(request.getPriority(),
-            request.getResourceName(), request.getCapability(), 1,
-            request.getRelaxLocality(), request.getNodeLabelExpression());
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newBuilder()
+        .priority(request.getPriority())
+        .allocationRequestId(request.getAllocationRequestId())
+        .resourceName(request.getResourceName())
+        .capability(request.getCapability())
+        .numContainers(1)
+        .relaxLocality(request.getRelaxLocality())
+        .nodeLabelExpression(request.getNodeLabelExpression()).build();
     return newRequest;
   }
 
@@ -189,15 +192,15 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
    */
-  private void allocateRackLocal(SchedulerNode node,
-      ResourceRequest rackLocalRequest,
+  private void allocateRackLocal(SchedulerRequestKey schedulerKey,
+      SchedulerNode node, ResourceRequest rackLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decResourceRequest(node.getRackName(), rackLocalRequest);
 
     ResourceRequest offRackRequest = resourceRequestMap.get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(schedulerKey, offRackRequest);
 
     // Update cloned RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(rackLocalRequest));
@@ -208,10 +211,11 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
    */
-  private void allocateOffSwitch(ResourceRequest offSwitchRequest,
+  private void allocateOffSwitch(SchedulerRequestKey schedulerKey,
+      ResourceRequest offSwitchRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
-    decrementOutstanding(offSwitchRequest);
+    decrementOutstanding(schedulerKey, offSwitchRequest);
     // Update cloned OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
@@ -221,8 +225,8 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
    * The {@link ResourceScheduler} is allocating data-local resources to the
    * application.
    */
-  private void allocateNodeLocal(SchedulerNode node,
-      ResourceRequest nodeLocalRequest,
+  private void allocateNodeLocal(SchedulerRequestKey schedulerKey,
+      SchedulerNode node, ResourceRequest nodeLocalRequest,
       List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decResourceRequest(node.getNodeName(), nodeLocalRequest);
@@ -233,7 +237,7 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
 
     ResourceRequest offRackRequest = resourceRequestMap.get(
         ResourceRequest.ANY);
-    decrementOutstanding(offRackRequest);
+    decrementOutstanding(schedulerKey, offRackRequest);
 
     // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
     resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
@@ -278,8 +282,8 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
   }
 
   @Override
-  public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
-      ResourceRequest request) {
+  public List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
+      NodeType type, SchedulerNode node, ResourceRequest request) {
     try {
       writeLock.lock();
 
@@ -296,11 +300,11 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
       }
 
       if (type == NodeType.NODE_LOCAL) {
-        allocateNodeLocal(node, request, resourceRequests);
+        allocateNodeLocal(schedulerKey, node, request, resourceRequests);
       } else if (type == NodeType.RACK_LOCAL) {
-        allocateRackLocal(node, request, resourceRequests);
+        allocateRackLocal(schedulerKey, node, request, resourceRequests);
       } else{
-        allocateOffSwitch(request, resourceRequests);
+        allocateOffSwitch(schedulerKey, request, resourceRequests);
       }
 
       return resourceRequests;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
index cdb7c04..3cf5fa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java
@@ -78,13 +78,14 @@ public interface SchedulingPlacementSet<N extends SchedulerNode> {
 
   /**
    * Notify container allocated.
+   * @param schedulerKey SchedulerRequestKey for this ResourceRequest
    * @param type Type of the allocation
    * @param node Which node this container allocated on
    * @param request Which resource request to allocate
    * @return list of ResourceRequests deducted
    */
-  List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
-      ResourceRequest request);
+  List<ResourceRequest> allocate(SchedulerRequestKey schedulerKey,
+      NodeType type, SchedulerNode node, ResourceRequest request);
 
   /**
    * We can still have pending requirement for a given NodeType and node

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index 3288d39..da1bc2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 
@@ -331,7 +332,8 @@ public class Application {
     // Get resources from the ResourceManager
     Allocation allocation = resourceManager.getResourceScheduler().allocate(
         applicationAttemptId, new ArrayList<ResourceRequest>(ask),
-        new ArrayList<ContainerId>(), null, null, null, null);
+        new ArrayList<ContainerId>(), null, null,
+        new ContainerUpdates());
 
     if (LOG.isInfoEnabled()) {
       LOG.info("-=======" + applicationAttemptId + System.lineSeparator() +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 593de08..fbeca7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -251,6 +251,13 @@ public class MockAM {
     return allocate(req);
   }
 
+  public AllocateResponse sendContainerUpdateRequest(
+      List<UpdateContainerRequest> updateRequests) throws Exception {
+    final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null,
+        null, updateRequests);
+    return allocate(req);
+  }
+
   public AllocateResponse allocate(AllocateRequest allocateRequest)
             throws Exception {
     UserGroupInformation ugi =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 32cdb1b..2d76127 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -195,6 +195,12 @@ public class MockNM {
         isHealthy, resId);
   }
 
+  public NodeHeartbeatResponse nodeHeartbeat(
+      List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
+    return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
+        isHealthy, ++responseId);
+  }
+
   public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
       List<Container> increasedConts, boolean isHealthy, int resId)
           throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a55bd84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index cb57f39..c135384 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -142,6 +142,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Clock;
@@ -1072,7 +1074,8 @@ public class TestClientRMService {
     Container container = Container.newInstance(
         ContainerId.newContainerId(attemptId, 1), null, "", null, null, null);
     RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
-        attemptId, null, "", rmContext));
+        SchedulerRequestKey.extractFrom(container), attemptId, null, "",
+        rmContext));
     Map<ApplicationAttemptId, RMAppAttempt> attempts = 
       new HashMap<ApplicationAttemptId, RMAppAttempt>();
     attempts.put(attemptId, rmAppAttemptImpl);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message