hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [20/38] hadoop git commit: YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)
Date Fri, 03 Mar 2017 01:57:28 GMT
YARN-6216. Unify Container Resizing code paths with Container Updates making it scheduler agnostic. (Arun Suresh via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eac6b4c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eac6b4c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eac6b4c3

Branch: refs/heads/YARN-5734
Commit: eac6b4c35c50e555c2f1b5f913bb2c4d839f1ff4
Parents: 480b4dd
Author: Wangda Tan <wangda@apache.org>
Authored: Tue Feb 28 10:35:50 2017 -0800
Committer: Wangda Tan <wangda@apache.org>
Committed: Tue Feb 28 10:35:50 2017 -0800

----------------------------------------------------------------------
 .../sls/scheduler/ResourceSchedulerWrapper.java |   8 -
 .../server/scheduler/SchedulerRequestKey.java   |  12 +-
 .../server/resourcemanager/RMServerUtils.java   |  27 +-
 .../rmcontainer/RMContainer.java                |   4 -
 .../RMContainerChangeResourceEvent.java         |  44 ---
 .../rmcontainer/RMContainerImpl.java            |  46 ---
 .../scheduler/AbstractYarnScheduler.java        | 171 +++++++---
 .../scheduler/AppSchedulingInfo.java            | 283 +---------------
 .../scheduler/ContainerUpdateContext.java       | 193 ++++++++---
 .../scheduler/SchedulerApplicationAttempt.java  | 212 ++++--------
 .../scheduler/SchedulerNode.java                |  44 ---
 .../scheduler/capacity/AbstractCSQueue.java     |  13 +-
 .../scheduler/capacity/CSQueue.java             |  15 -
 .../scheduler/capacity/CapacityScheduler.java   | 121 +------
 .../scheduler/capacity/LeafQueue.java           | 152 +--------
 .../scheduler/capacity/ParentQueue.java         |  53 +--
 .../capacity/allocator/ContainerAllocator.java  |  31 +-
 .../allocator/IncreaseContainerAllocator.java   | 337 -------------------
 .../common/ContainerAllocationProposal.java     |   9 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 245 +++-----------
 .../common/fica/FiCaSchedulerNode.java          |  14 -
 .../scheduler/fair/FairScheduler.java           |  11 +-
 .../scheduler/fifo/FifoScheduler.java           |   8 -
 .../scheduler/capacity/TestChildQueueOrder.java |   4 +-
 .../capacity/TestContainerResizing.java         | 134 +-------
 .../capacity/TestIncreaseAllocationExpirer.java |  12 +-
 .../scheduler/capacity/TestLeafQueue.java       |   4 +-
 .../scheduler/capacity/TestParentQueue.java     |   4 +-
 28 files changed, 482 insertions(+), 1729 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
index 5517362..df8323a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
@@ -969,12 +969,4 @@ final public class ResourceSchedulerWrapper
     return Priority.newInstance(0);
   }
 
-  @Override
-  protected void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt) {
-    // TODO Auto-generated method stub
-    
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
index 02539ba..c4f37f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java
@@ -116,7 +116,17 @@ public final class SchedulerRequestKey implements
     if (priorityCompare != 0) {
       return priorityCompare;
     }
-    return Long.compare(allocationRequestId, o.getAllocationRequestId());
+    int allocReqCompare = Long.compare(
+        allocationRequestId, o.getAllocationRequestId());
+
+    if (allocReqCompare != 0) {
+      return allocReqCompare;
+    }
+
+    if (this.containerToUpdate != null && o.containerToUpdate != null) {
+      return (this.containerToUpdate.compareTo(o.containerToUpdate));
+    }
+    return 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index e98141b..0aa7a2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -152,26 +152,16 @@ public class RMServerUtils {
       if (msg == null) {
         if ((updateType != ContainerUpdateType.PROMOTE_EXECUTION_TYPE) &&
             (updateType !=ContainerUpdateType.DEMOTE_EXECUTION_TYPE)) {
-          Resource original = rmContainer.getContainer().getResource();
-          Resource target = updateReq.getCapability();
-          if (Resources.fitsIn(target, original)) {
-            // This is a decrease request
-            if (validateIncreaseDecreaseRequest(rmContext, updateReq,
-                maximumAllocation, false)) {
-              updateRequests.getDecreaseRequests().add(updateReq);
-              outstandingUpdate.add(updateReq.getContainerId());
-            } else {
-              msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
-            }
-          } else {
-            // This is an increase request
-            if (validateIncreaseDecreaseRequest(rmContext, updateReq,
-                maximumAllocation, true)) {
+          if (validateIncreaseDecreaseRequest(
+              rmContext, updateReq, maximumAllocation)) {
+            if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
               updateRequests.getIncreaseRequests().add(updateReq);
-              outstandingUpdate.add(updateReq.getContainerId());
             } else {
-              msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
+              updateRequests.getDecreaseRequests().add(updateReq);
             }
+            outstandingUpdate.add(updateReq.getContainerId());
+          } else {
+            msg = RESOURCE_OUTSIDE_ALLOWED_RANGE;
           }
         } else {
           ExecutionType original = rmContainer.getExecutionType();
@@ -329,8 +319,7 @@ public class RMServerUtils {
 
   // Sanity check and normalize target resource
   private static boolean validateIncreaseDecreaseRequest(RMContext rmContext,
-      UpdateContainerRequest request, Resource maximumAllocation,
-      boolean increase) {
+      UpdateContainerRequest request, Resource maximumAllocation) {
     if (request.getCapability().getMemorySize() < 0
         || request.getCapability().getMemorySize() > maximumAllocation
         .getMemorySize()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 020764b..7ad381e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -91,10 +91,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   String getNodeHttpAddress();
   
   String getNodeLabelExpression();
-  
-  boolean hasIncreaseReservation();
-  
-  void cancelIncreaseReservation();
 
   String getQueueName();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
deleted file mode 100644
index 920cfdb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerChangeResourceEvent.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class RMContainerChangeResourceEvent extends RMContainerEvent {
-  
-  final Resource targetResource;
-  final boolean increase;
-
-  public RMContainerChangeResourceEvent(ContainerId containerId,
-      Resource targetResource, boolean increase) {
-    super(containerId, RMContainerEventType.CHANGE_RESOURCE);
-
-    this.targetResource = targetResource;
-    this.increase = increase;
-  }
-  
-  public Resource getTargetResource() {
-    return targetResource;
-  }
-  
-  public boolean isIncrease() {
-    return increase;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 72ce1a0..12fbbea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -131,8 +131,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
-        RMContainerEventType.CHANGE_RESOURCE, new ChangeResourceTransition())
-    .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
         RMContainerEventType.ACQUIRE_UPDATED_CONTAINER, 
         new ContainerAcquiredWhileRunningTransition())
     .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING,
@@ -183,7 +181,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
   private boolean isAMContainer;
   private List<ResourceRequest> resourceRequests;
 
-  private volatile boolean hasIncreaseReservation = false;
   // Only used for container resource increase and decrease. This is the
   // resource to rollback to should container resource increase token expires.
   private Resource lastConfirmedResource;
@@ -561,12 +558,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       if (c != null) {
         c.setNodeId(container.reservedNode);
       }
-
-      if (!EnumSet.of(RMContainerState.NEW, RMContainerState.RESERVED)
-          .contains(container.getState())) {
-        // When container's state != NEW/RESERVED, it is an increase reservation
-        container.hasIncreaseReservation = true;
-      }
     }
   }
 
@@ -681,33 +672,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
       }
     }
   }
-  
-  private static final class ChangeResourceTransition extends BaseTransition {
-
-    @Override
-    public void transition(RMContainerImpl container, RMContainerEvent event) {
-      RMContainerChangeResourceEvent changeEvent = (RMContainerChangeResourceEvent)event;
-
-      Resource targetResource = changeEvent.getTargetResource();
-      Resource lastConfirmedResource = container.lastConfirmedResource;
-
-      if (!changeEvent.isIncrease()) {
-        // Only unregister from the containerAllocationExpirer when target
-        // resource is less than or equal to the last confirmed resource.
-        if (Resources.fitsIn(targetResource, lastConfirmedResource)) {
-          container.lastConfirmedResource = targetResource;
-          container.containerAllocationExpirer.unregister(
-              new AllocationExpirationInfo(event.getContainerId()));
-        }
-      }
-
-      container.container.setResource(targetResource);
-
-      // We reach here means we either allocated increase reservation OR
-      // decreased container, reservation will be cancelled anyway. 
-      container.hasIncreaseReservation = false;
-    }
-  }
 
   private static class FinishedTransition extends BaseTransition {
 
@@ -857,16 +821,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     return -1;
   }
 
-  @Override
-  public boolean hasIncreaseReservation() {
-    return hasIncreaseReservation;
-  }
-
-  @Override
-  public void cancelIncreaseReservation() {
-    hasIncreaseReservation = false;
-  }
-
   public void setQueueName(String queueName) {
     this.queueName = queueName;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index ce6d2a2..213839d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
 
 
+
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -597,6 +600,8 @@ public abstract class AbstractYarnScheduler
 
     if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
       completedContainerInternal(rmContainer, containerStatus, event);
+      completeOustandingUpdatesWhichAreReserved(
+          rmContainer, containerStatus, event);
     } else {
       ContainerId containerId = rmContainer.getContainerId();
       // Inform the container
@@ -622,6 +627,33 @@ public abstract class AbstractYarnScheduler
     recoverResourceRequestForContainer(rmContainer);
   }
 
+  // Optimization:
+  // Check if there are in-flight container updates and complete the
+  // associated temp containers. These are removed when the app completes,
+  // but removing them when the actual container completes would allow the
+  // scheduler to reallocate those resources sooner.
+  private void completeOustandingUpdatesWhichAreReserved(
+      RMContainer rmContainer, ContainerStatus containerStatus,
+      RMContainerEventType event) {
+    N schedulerNode = getSchedulerNode(rmContainer.getNodeId());
+    if (schedulerNode != null &&
+        schedulerNode.getReservedContainer() != null) {
+      RMContainer resContainer = schedulerNode.getReservedContainer();
+      if (resContainer.getReservedSchedulerKey() != null) {
+        ContainerId containerToUpdate = resContainer
+            .getReservedSchedulerKey().getContainerToUpdate();
+        if (containerToUpdate != null &&
+            containerToUpdate.equals(containerStatus.getContainerId())) {
+          completedContainerInternal(resContainer,
+              ContainerStatus.newInstance(resContainer.getContainerId(),
+                  containerStatus.getState(), containerStatus
+                      .getDiagnostics(),
+                  containerStatus.getExitStatus()), event);
+        }
+      }
+    }
+  }
+
   // clean up a completed container
   protected abstract void completedContainerInternal(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event);
@@ -650,28 +682,6 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  protected void decreaseContainers(
-      List<UpdateContainerRequest> decreaseRequests,
-      SchedulerApplicationAttempt attempt) {
-    if (null == decreaseRequests || decreaseRequests.isEmpty()) {
-      return;
-    }
-    // Pre-process decrease requests
-    List<SchedContainerChangeRequest> schedDecreaseRequests =
-        createSchedContainerChangeRequests(decreaseRequests, false);
-    for (SchedContainerChangeRequest request : schedDecreaseRequests) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Processing decrease request:" + request);
-      }
-      // handle decrease request
-      decreaseContainer(request, attempt);
-    }
-  }
-
-  protected abstract void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt);
-
   @Override
   public N getSchedulerNode(NodeId nodeId) {
     return nodeTracker.getNode(nodeId);
@@ -1074,21 +1084,39 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  protected void handleExecutionTypeUpdates(
-      SchedulerApplicationAttempt appAttempt,
-      List<UpdateContainerRequest> promotionRequests,
-      List<UpdateContainerRequest> demotionRequests) {
+  protected void handleContainerUpdates(
+      SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) {
+    List<UpdateContainerRequest> promotionRequests =
+        updates.getPromotionRequests();
     if (promotionRequests != null && !promotionRequests.isEmpty()) {
       LOG.info("Promotion Update requests : " + promotionRequests);
-      handlePromotionRequests(appAttempt, promotionRequests);
+      // Promotion is technically an increase request from
+      // 0 resources to target resources.
+      handleIncreaseRequests(appAttempt, promotionRequests);
     }
+    List<UpdateContainerRequest> increaseRequests =
+        updates.getIncreaseRequests();
+    if (increaseRequests != null && !increaseRequests.isEmpty()) {
+      LOG.info("Resource increase requests : " + increaseRequests);
+      handleIncreaseRequests(appAttempt, increaseRequests);
+    }
+    List<UpdateContainerRequest> demotionRequests =
+        updates.getDemotionRequests();
     if (demotionRequests != null && !demotionRequests.isEmpty()) {
       LOG.info("Demotion Update requests : " + demotionRequests);
-      handleDemotionRequests(appAttempt, demotionRequests);
+      // Demotion is technically a decrease request from initial
+      // to 0 resources
+      handleDecreaseRequests(appAttempt, demotionRequests);
+    }
+    List<UpdateContainerRequest> decreaseRequests =
+        updates.getDecreaseRequests();
+    if (decreaseRequests != null && !decreaseRequests.isEmpty()) {
+      LOG.info("Resource decrease requests : " + decreaseRequests);
+      handleDecreaseRequests(appAttempt, decreaseRequests);
     }
   }
 
-  private void handlePromotionRequests(
+  private void handleIncreaseRequests(
       SchedulerApplicationAttempt applicationAttempt,
       List<UpdateContainerRequest> updateContainerRequests) {
     for (UpdateContainerRequest uReq : updateContainerRequests) {
@@ -1118,7 +1146,7 @@ public abstract class AbstractYarnScheduler
     }
   }
 
-  private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt,
+  private void handleDecreaseRequests(SchedulerApplicationAttempt appAttempt,
       List<UpdateContainerRequest> demotionRequests) {
     OpportunisticContainerContext oppCntxt =
         appAttempt.getOpportunisticContainerContext();
@@ -1126,24 +1154,59 @@ public abstract class AbstractYarnScheduler
       RMContainer rmContainer =
           rmContext.getScheduler().getRMContainer(uReq.getContainerId());
       if (rmContainer != null) {
-        if (appAttempt.getUpdateContext().checkAndAddToOutstandingDecreases(
-            rmContainer.getContainer())) {
-          RMContainer demotedRMContainer =
-              createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
-          appAttempt.addToNewlyDemotedContainers(
-              uReq.getContainerId(), demotedRMContainer);
+        SchedulerNode schedulerNode = rmContext.getScheduler()
+            .getSchedulerNode(rmContainer.getContainer().getNodeId());
+        if (appAttempt.getUpdateContext()
+            .checkAndAddToOutstandingDecreases(uReq, schedulerNode,
+                rmContainer.getContainer())) {
+          if (ContainerUpdateType.DEMOTE_EXECUTION_TYPE ==
+              uReq.getContainerUpdateType()) {
+            RMContainer demotedRMContainer =
+                createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
+            appAttempt.addToNewlyDemotedContainers(
+                uReq.getContainerId(), demotedRMContainer);
+          } else {
+            RMContainer demotedRMContainer = createDecreasedRMContainer(
+                appAttempt, uReq, rmContainer);
+            appAttempt.addToNewlyDecreasedContainers(
+                uReq.getContainerId(), demotedRMContainer);
+          }
         } else {
           appAttempt.addToUpdateContainerErrors(
               UpdateContainerError.newInstance(
               RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq));
         }
       } else {
-        LOG.warn("Cannot demote non-existent (or completed) Container ["
-            + uReq.getContainerId() + "]");
+        LOG.warn("Cannot demote/decrease non-existent (or completed) " +
+            "Container [" + uReq.getContainerId() + "]");
       }
     }
   }
 
+  private RMContainer createDecreasedRMContainer(
+      SchedulerApplicationAttempt appAttempt, UpdateContainerRequest uReq,
+      RMContainer rmContainer) {
+    SchedulerRequestKey sk =
+        SchedulerRequestKey.extractFrom(rmContainer.getContainer());
+    Container decreasedContainer = BuilderUtils.newContainer(
+        ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
+            appAttempt.getNewContainerId()),
+        rmContainer.getContainer().getNodeId(),
+        rmContainer.getContainer().getNodeHttpAddress(),
+        Resources.none(),
+        sk.getPriority(), null, rmContainer.getExecutionType(),
+        sk.getAllocationRequestId());
+    decreasedContainer.setVersion(rmContainer.getContainer().getVersion());
+    RMContainer newRmContainer = new RMContainerImpl(decreasedContainer,
+        sk, appAttempt.getApplicationAttemptId(),
+        decreasedContainer.getNodeId(), appAttempt.getUser(), rmContext,
+        rmContainer.isRemotelyAllocated());
+    appAttempt.addRMContainer(decreasedContainer.getId(), rmContainer);
+    ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
+        decreasedContainer.getNodeId()).allocateContainer(newRmContainer);
+    return newRmContainer;
+  }
+
   private RMContainer createDemotedRMContainer(
       SchedulerApplicationAttempt appAttempt,
       OpportunisticContainerContext oppCntxt,
@@ -1162,4 +1225,36 @@ public abstract class AbstractYarnScheduler
     return SchedulerUtils.createOpportunisticRmContainer(
         rmContext, demotedContainer, false);
   }
+
+  /**
+   * Rollback container update after expiry.
+   * @param containerId ContainerId.
+   */
+  protected void rollbackContainerUpdate(
+      ContainerId containerId) {
+    RMContainer rmContainer = getRMContainer(containerId);
+    if (rmContainer == null) {
+      LOG.info("Cannot rollback resource for container " + containerId
+          + ". The container does not exist.");
+      return;
+    }
+    T app = getCurrentAttemptForContainer(containerId);
+    if (getCurrentAttemptForContainer(containerId) == null) {
+      LOG.info("Cannot rollback resource for container " + containerId
+          + ". The application that the container "
+          + "belongs to does not exist.");
+      return;
+    }
+
+    if (Resources.fitsIn(rmContainer.getLastConfirmedResource(),
+        rmContainer.getContainer().getResource())) {
+      LOG.info("Roll back resource for container " + containerId);
+      handleDecreaseRequests(app, Arrays.asList(
+          UpdateContainerRequest.newInstance(
+              rmContainer.getContainer().getVersion(),
+              rmContainer.getContainerId(),
+              ContainerUpdateType.DECREASE_RESOURCE,
+              rmContainer.getLastConfirmedResource(), null)));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 48ecd2e..bff9c41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -90,9 +90,7 @@ public class AppSchedulingInfo {
       schedulerKeys = new ConcurrentSkipListMap<>();
   final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
       schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
-  final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
-      SchedContainerChangeRequest>>> containerIncreaseRequestMap =
-      new ConcurrentHashMap<>();
+
   private final ReentrantReadWriteLock.ReadLock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
@@ -158,137 +156,6 @@ public class AppSchedulingInfo {
     LOG.info("Application " + applicationId + " requests cleared");
   }
 
-  public boolean hasIncreaseRequest(NodeId nodeId) {
-    try {
-      this.readLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      return requestsOnNode == null ? false : requestsOnNode.size() > 0;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  public Map<ContainerId, SchedContainerChangeRequest>
-      getIncreaseRequests(NodeId nodeId, SchedulerRequestKey schedulerKey) {
-    try {
-      this.readLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      return requestsOnNode == null ? null : requestsOnNode.get(
-          schedulerKey);
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  /**
-   * return true if any of the existing increase requests are updated,
-   *        false if none of them are updated
-   */
-  public boolean updateIncreaseRequests(
-      List<SchedContainerChangeRequest> increaseRequests) {
-    boolean resourceUpdated = false;
-
-    try {
-      this.writeLock.lock();
-      for (SchedContainerChangeRequest r : increaseRequests) {
-        if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
-          LOG.warn("rmContainer's state is not RUNNING, for increase request"
-              + " with container-id=" + r.getContainerId());
-          continue;
-        }
-        try {
-          RMServerUtils.checkSchedContainerChangeRequest(r, true);
-        } catch (YarnException e) {
-          LOG.warn("Error happens when checking increase request, Ignoring.."
-              + " exception=", e);
-          continue;
-        }
-        NodeId nodeId = r.getRMContainer().getAllocatedNode();
-
-        Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-            requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-        if (null == requestsOnNode) {
-          requestsOnNode = new TreeMap<>();
-          containerIncreaseRequestMap.put(nodeId, requestsOnNode);
-        }
-
-        SchedContainerChangeRequest prevChangeRequest =
-            getIncreaseRequest(nodeId,
-                r.getRMContainer().getAllocatedSchedulerKey(),
-                r.getContainerId());
-        if (null != prevChangeRequest) {
-          if (Resources.equals(prevChangeRequest.getTargetCapacity(),
-              r.getTargetCapacity())) {
-            // increase request hasn't changed
-            continue;
-          }
-
-          // remove the old one, as we will use the new one going forward
-          removeIncreaseRequest(nodeId,
-              prevChangeRequest.getRMContainer().getAllocatedSchedulerKey(),
-              prevChangeRequest.getContainerId());
-        }
-
-        if (Resources.equals(r.getTargetCapacity(),
-            r.getRMContainer().getAllocatedResource())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Trying to increase container " + r.getContainerId()
-                + ", target capacity = previous capacity = " + prevChangeRequest
-                + ". Will ignore this increase request.");
-          }
-          continue;
-        }
-
-        // add the new one
-        resourceUpdated = true;
-        insertIncreaseRequest(r);
-      }
-      return resourceUpdated;
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
-  /**
-   * Insert increase request, adding any missing items in the data-structure
-   * hierarchy.
-   */
-  private void insertIncreaseRequest(SchedContainerChangeRequest request) {
-    NodeId nodeId = request.getNodeId();
-    SchedulerRequestKey schedulerKey =
-        request.getRMContainer().getAllocatedSchedulerKey();
-    ContainerId containerId = request.getContainerId();
-    
-    Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-        requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-    if (null == requestsOnNode) {
-      requestsOnNode = new HashMap<>();
-      containerIncreaseRequestMap.put(nodeId, requestsOnNode);
-    }
-
-    Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-        requestsOnNode.get(schedulerKey);
-    if (null == requestsOnNodeWithPriority) {
-      requestsOnNodeWithPriority = new TreeMap<>();
-      requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
-      incrementSchedulerKeyReference(schedulerKey);
-    }
-
-    requestsOnNodeWithPriority.put(containerId, request);
-
-    // update resources
-    String partition = request.getRMContainer().getNodeLabelExpression();
-    Resource delta = request.getDeltaCapacity();
-    appResourceUsage.incPending(partition, delta);
-    queue.incPendingResource(partition, delta);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Added increase request:" + request.getContainerId()
-          + " delta=" + delta);
-    }
-  }
 
   private void incrementSchedulerKeyReference(
       SchedulerRequestKey schedulerKey) {
@@ -312,73 +179,6 @@ public class AppSchedulingInfo {
     }
   }
 
-  public boolean removeIncreaseRequest(NodeId nodeId,
-      SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    try {
-      this.writeLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      if (null == requestsOnNode) {
-        return false;
-      }
-
-      Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-          requestsOnNode.get(schedulerKey);
-      if (null == requestsOnNodeWithPriority) {
-        return false;
-      }
-
-      SchedContainerChangeRequest request =
-          requestsOnNodeWithPriority.remove(containerId);
-    
-      // remove hierarchies if it becomes empty
-      if (requestsOnNodeWithPriority.isEmpty()) {
-        requestsOnNode.remove(schedulerKey);
-        decrementSchedulerKeyReference(schedulerKey);
-      }
-      if (requestsOnNode.isEmpty()) {
-        containerIncreaseRequestMap.remove(nodeId);
-      }
-
-      if (request == null) {
-        return false;
-      }
-
-      // update queue's pending resource if request exists
-      String partition = request.getRMContainer().getNodeLabelExpression();
-      Resource delta = request.getDeltaCapacity();
-      appResourceUsage.decPending(partition, delta);
-      queue.decPendingResource(partition, delta);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("remove increase request:" + request);
-      }
-
-      return true;
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-  
-  public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
-      SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    try {
-      this.readLock.lock();
-      Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
-          requestsOnNode = containerIncreaseRequestMap.get(nodeId);
-      if (null == requestsOnNode) {
-        return null;
-      }
-
-      Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
-          requestsOnNode.get(schedulerKey);
-      return requestsOnNodeWithPriority == null ? null
-          : requestsOnNodeWithPriority.get(containerId);
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
   public ContainerUpdateContext getUpdateContext() {
     return updateContext;
   }
@@ -514,21 +314,6 @@ public class AppSchedulingInfo {
     appResourceUsage.decPending(partition, toDecrease);
   }
 
-  private boolean hasRequestLabelChanged(ResourceRequest requestOne,
-      ResourceRequest requestTwo) {
-    String requestOneLabelExp = requestOne.getNodeLabelExpression();
-    String requestTwoLabelExp = requestTwo.getNodeLabelExpression();
-    // First request label expression can be null and second request
-    // is not null then we have to consider it as changed.
-    if ((null == requestOneLabelExp) && (null != requestTwoLabelExp)) {
-      return true;
-    }
-    // If the label is not matching between both request when
-    // requestOneLabelExp is not null.
-    return ((null != requestOneLabelExp) && !(requestOneLabelExp
-        .equals(requestTwoLabelExp)));
-  }
-
   /**
    * The ApplicationMaster is updating the placesBlacklistedByApp used for
    * containers other than AMs.
@@ -601,22 +386,6 @@ public class AppSchedulingInfo {
     return ret;
   }
 
-  public SchedulingPlacementSet getFirstSchedulingPlacementSet() {
-    try {
-      readLock.lock();
-      for (SchedulerRequestKey key : schedulerKeys.keySet()) {
-        SchedulingPlacementSet ps = schedulerKeyToPlacementSets.get(key);
-        if (null != ps) {
-          return ps;
-        }
-      }
-      return null;
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
   public PendingAsk getNextPendingAsk() {
     try {
       readLock.lock();
@@ -666,56 +435,6 @@ public class AppSchedulingInfo {
     }
   }
 
-  public void increaseContainer(SchedContainerChangeRequest increaseRequest) {
-    NodeId nodeId = increaseRequest.getNodeId();
-    SchedulerRequestKey schedulerKey =
-        increaseRequest.getRMContainer().getAllocatedSchedulerKey();
-    ContainerId containerId = increaseRequest.getContainerId();
-    Resource deltaCapacity = increaseRequest.getDeltaCapacity();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("allocated increase request : applicationId=" + applicationId
-          + " container=" + containerId + " host="
-          + increaseRequest.getNodeId() + " user=" + user + " resource="
-          + deltaCapacity);
-    }
-    try {
-      this.writeLock.lock();
-      // Set queue metrics
-      queue.getMetrics().allocateResources(user, deltaCapacity);
-      // remove the increase request from pending increase request map
-      removeIncreaseRequest(nodeId, schedulerKey, containerId);
-      // update usage
-      appResourceUsage.incUsed(increaseRequest.getNodePartition(),
-          deltaCapacity);
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-  
-  public void decreaseContainer(SchedContainerChangeRequest decreaseRequest) {
-    // Delta is negative when it's a decrease request
-    Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Decrease container : applicationId=" + applicationId
-          + " container=" + decreaseRequest.getContainerId() + " host="
-          + decreaseRequest.getNodeId() + " user=" + user + " resource="
-          + absDelta);
-    }
-
-    try {
-      this.writeLock.lock();
-      // Set queue metrics
-      queue.getMetrics().releaseResources(user, absDelta);
-
-      // update usage
-      appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
   public List<ResourceRequest> allocate(NodeType type,
       SchedulerNode node, SchedulerRequestKey schedulerKey,
       Container containerAllocated) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 7381250..5ac2ac5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -28,17 +28,19 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
+    .RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+    .SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -58,43 +60,37 @@ public class ContainerUpdateContext {
   private final Map<SchedulerRequestKey, Map<Resource,
       Map<NodeId, Set<ContainerId>>>> outstandingIncreases = new HashMap<>();
 
-  private final Set<ContainerId> outstandingDecreases = new HashSet<>();
+  private final Map<ContainerId, Resource> outstandingDecreases =
+      new HashMap<>();
   private final AppSchedulingInfo appSchedulingInfo;
 
   ContainerUpdateContext(AppSchedulingInfo appSchedulingInfo) {
     this.appSchedulingInfo = appSchedulingInfo;
   }
 
-  private synchronized boolean isBeingIncreased(Container container) {
-    Map<Resource, Map<NodeId, Set<ContainerId>>> resourceMap =
-        outstandingIncreases.get(
-            new SchedulerRequestKey(container.getPriority(),
-                container.getAllocationRequestId(), container.getId()));
-    if (resourceMap != null) {
-      Map<NodeId, Set<ContainerId>> locationMap =
-          resourceMap.get(container.getResource());
-      if (locationMap != null) {
-        Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
-        if (containerIds != null && !containerIds.isEmpty()) {
-          return containerIds.contains(container.getId());
-        }
-      }
-    }
-    return false;
-  }
-
   /**
    * Add the container to outstanding decreases.
+   * @param updateReq UpdateContainerRequest.
+   * @param schedulerNode SchedulerNode.
    * @param container Container.
-   * @return true if updated to outstanding decreases was successful.
+   * @return If it was possible to decrease the container.
    */
   public synchronized boolean checkAndAddToOutstandingDecreases(
+      UpdateContainerRequest updateReq, SchedulerNode schedulerNode,
       Container container) {
-    if (isBeingIncreased(container)
-        || outstandingDecreases.contains(container.getId())) {
+    if (outstandingDecreases.containsKey(container.getId())) {
       return false;
     }
-    outstandingDecreases.add(container.getId());
+    if (ContainerUpdateType.DECREASE_RESOURCE ==
+        updateReq.getContainerUpdateType()) {
+      SchedulerRequestKey updateKey = new SchedulerRequestKey
+          (container.getPriority(),
+              container.getAllocationRequestId(), container.getId());
+      cancelPreviousRequest(schedulerNode, updateKey);
+      outstandingDecreases.put(container.getId(), updateReq.getCapability());
+    } else {
+      outstandingDecreases.put(container.getId(), container.getResource());
+    }
     return true;
   }
 
@@ -117,35 +113,63 @@ public class ContainerUpdateContext {
     if (resourceMap == null) {
       resourceMap = new HashMap<>();
       outstandingIncreases.put(schedulerKey, resourceMap);
+    } else {
+      // Updating Resource for and existing increase container
+      if (ContainerUpdateType.INCREASE_RESOURCE ==
+          updateRequest.getContainerUpdateType()) {
+        cancelPreviousRequest(schedulerNode, schedulerKey);
+      } else {
+        return false;
+      }
     }
+    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
     Map<NodeId, Set<ContainerId>> locationMap =
-        resourceMap.get(container.getResource());
+        resourceMap.get(resToIncrease);
     if (locationMap == null) {
       locationMap = new HashMap<>();
-      resourceMap.put(container.getResource(), locationMap);
+      resourceMap.put(resToIncrease, locationMap);
     }
     Set<ContainerId> containerIds = locationMap.get(container.getNodeId());
     if (containerIds == null) {
       containerIds = new HashSet<>();
       locationMap.put(container.getNodeId(), containerIds);
     }
-    if (containerIds.contains(container.getId())
-        || outstandingDecreases.contains(container.getId())) {
+    if (outstandingDecreases.containsKey(container.getId())) {
       return false;
     }
-    containerIds.add(container.getId());
 
-    Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
-        new HashMap<>();
-    Resource resToIncrease = getResourceToIncrease(updateRequest, rmContainer);
-    Map<String, ResourceRequest> resMap =
-        createResourceRequests(rmContainer, schedulerNode,
-            schedulerKey, resToIncrease);
-    updateResReqs.put(schedulerKey, resMap);
-    appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+    containerIds.add(container.getId());
+    if (!Resources.isNone(resToIncrease)) {
+      Map<SchedulerRequestKey, Map<String, ResourceRequest>> updateResReqs =
+          new HashMap<>();
+      Map<String, ResourceRequest> resMap =
+          createResourceRequests(rmContainer, schedulerNode,
+              schedulerKey, resToIncrease);
+      updateResReqs.put(schedulerKey, resMap);
+      appSchedulingInfo.addToPlacementSets(false, updateResReqs);
+    }
     return true;
   }
 
+  private void cancelPreviousRequest(SchedulerNode schedulerNode,
+      SchedulerRequestKey schedulerKey) {
+    SchedulingPlacementSet<SchedulerNode> schedulingPlacementSet =
+        appSchedulingInfo.getSchedulingPlacementSet(schedulerKey);
+    if (schedulingPlacementSet != null) {
+      Map<String, ResourceRequest> resourceRequests = schedulingPlacementSet
+          .getResourceRequests();
+      ResourceRequest prevReq = resourceRequests.get(ResourceRequest.ANY);
+      // Decrement the pending using a dummy RR with
+      // resource = prev update req capability
+      if (prevReq != null) {
+        appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
+            schedulerKey, Container.newInstance(UNDEFINED,
+                schedulerNode.getNodeID(), "host:port",
+                prevReq.getCapability(), schedulerKey.getPriority(), null));
+      }
+    }
+  }
+
   private Map<String, ResourceRequest> createResourceRequests(
       RMContainer rmContainer, SchedulerNode schedulerNode,
       SchedulerRequestKey schedulerKey, Resource resToIncrease) {
@@ -171,10 +195,16 @@ public class ContainerUpdateContext {
         ContainerUpdateType.PROMOTE_EXECUTION_TYPE) {
       return rmContainer.getContainer().getResource();
     }
-    // TODO: Fix this for container increase..
-    //       This has to equal the Resources in excess of fitsIn()
-    //       for container increase and is equal to the container total
-    //       resource for Promotion.
+    if (updateReq.getContainerUpdateType() ==
+        ContainerUpdateType.INCREASE_RESOURCE) {
+      //       This has to equal the Resources in excess of fitsIn()
+      //       for container increase and is equal to the container total
+      //       resource for Promotion.
+      Resource maxCap = Resources.componentwiseMax(updateReq.getCapability(),
+          rmContainer.getContainer().getResource());
+      return Resources.add(maxCap,
+          Resources.negate(rmContainer.getContainer().getResource()));
+    }
     return null;
   }
 
@@ -228,6 +258,7 @@ public class ContainerUpdateContext {
   /**
    * Check if a new container is to be matched up against an outstanding
    * Container increase request.
+   * @param node SchedulerNode.
    * @param schedulerKey SchedulerRequestKey.
    * @param rmContainer RMContainer.
    * @return ContainerId.
@@ -264,4 +295,80 @@ public class ContainerUpdateContext {
     }
     return retVal;
   }
+
+  /**
+   * Swaps the existing RMContainer's and the temp RMContainers internal
+   * container references after adjusting the resources in each.
+   * @param tempRMContainer Temp RMContainer.
+   * @param existingRMContainer Existing RMContainer.
+   * @param updateType Update Type.
+   * @return Existing RMContainer after swapping the container references.
+   */
+  public RMContainer swapContainer(RMContainer tempRMContainer,
+      RMContainer existingRMContainer, ContainerUpdateType updateType) {
+    ContainerId matchedContainerId = existingRMContainer.getContainerId();
+    // Swap updated container with the existing container
+    Container tempContainer = tempRMContainer.getContainer();
+
+    Resource updatedResource = createUpdatedResource(
+        tempContainer, existingRMContainer.getContainer(), updateType);
+    Resource resourceToRelease = createResourceToRelease(
+        existingRMContainer.getContainer(), updateType);
+    Container newContainer = Container.newInstance(matchedContainerId,
+        existingRMContainer.getContainer().getNodeId(),
+        existingRMContainer.getContainer().getNodeHttpAddress(),
+        updatedResource,
+        existingRMContainer.getContainer().getPriority(), null,
+        tempContainer.getExecutionType());
+    newContainer.setAllocationRequestId(
+        existingRMContainer.getContainer().getAllocationRequestId());
+    newContainer.setVersion(existingRMContainer.getContainer().getVersion());
+
+    tempRMContainer.getContainer().setResource(resourceToRelease);
+    tempRMContainer.getContainer().setExecutionType(
+        existingRMContainer.getContainer().getExecutionType());
+
+    ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
+    return existingRMContainer;
+  }
+
+  /**
+   * Returns the resource that the container will finally be assigned with
+   * at the end of the update operation.
+   * @param tempContainer Temporary Container created for the operation.
+   * @param existingContainer Existing Container.
+   * @param updateType Update Type.
+   * @return Final Resource.
+   */
+  private Resource createUpdatedResource(Container tempContainer,
+      Container existingContainer, ContainerUpdateType updateType) {
+    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+      return Resources.add(existingContainer.getResource(),
+          tempContainer.getResource());
+    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+      return outstandingDecreases.get(existingContainer.getId());
+    } else {
+      return existingContainer.getResource();
+    }
+  }
+
+  /**
+   * Returns the resources that need to be released at the end of the update
+   * operation.
+   * @param existingContainer Existing Container.
+   * @param updateType Updated type.
+   * @return Resources to be released.
+   */
+  private Resource createResourceToRelease(Container existingContainer,
+      ContainerUpdateType updateType) {
+    if (ContainerUpdateType.INCREASE_RESOURCE == updateType) {
+      return Resources.none();
+    } else if (ContainerUpdateType.DECREASE_RESOURCE == updateType){
+      return Resources.add(existingContainer.getResource(),
+          Resources.negate(
+              outstandingDecreases.get(existingContainer.getId())));
+    } else {
+      return existingContainer.getResource();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 0e79838..f894a40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRese
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode
+    .RMNodeDecreaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@@ -136,9 +139,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
 
   protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+  protected List<RMContainer> tempContainerToKill = new ArrayList<>();
   protected Map<ContainerId, RMContainer> newlyPromotedContainers = new HashMap<>();
   protected Map<ContainerId, RMContainer> newlyDemotedContainers = new HashMap<>();
-  protected List<RMContainer> tempContainerToKill = new ArrayList<>();
   protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
   protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
   protected Set<NMToken> updatedNMTokens = new HashSet<>();
@@ -670,6 +673,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
           rmContainer.getContainerId(),
           ContainerUpdateType.INCREASE_RESOURCE == updateType));
+      if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
+                Collections.singletonList(rmContainer.getContainer())));
+      }
     }
     return container;
   }
@@ -717,11 +725,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     }
   }
 
-  public void addToNewlyDemotedContainers(ContainerId containerId,
+  public synchronized void addToNewlyDemotedContainers(ContainerId containerId,
       RMContainer rmContainer) {
     newlyDemotedContainers.put(containerId, rmContainer);
   }
 
+  public synchronized void addToNewlyDecreasedContainers(
+      ContainerId containerId, RMContainer rmContainer) {
+    newlyDecreasedContainers.put(containerId, rmContainer);
+  }
+
   protected synchronized void addToUpdateContainerErrors(
       UpdateContainerError error) {
     updateContainerErrors.add(error);
@@ -729,10 +742,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
 
   protected synchronized void addToNewlyAllocatedContainers(
       SchedulerNode node, RMContainer rmContainer) {
-    if (oppContainerContext == null) {
-      newlyAllocatedContainers.add(rmContainer);
-      return;
-    }
     ContainerId matchedContainerId =
         getUpdateContext().matchContainerToOutstandingIncreaseReq(
             node, rmContainer.getAllocatedSchedulerKey(), rmContainer);
@@ -745,7 +754,21 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
         // occurs when using MiniYARNCluster to test).
         tempContainerToKill.add(rmContainer);
       } else {
-        newlyPromotedContainers.put(matchedContainerId, rmContainer);
+        RMContainer existingContainer = getRMContainer(matchedContainerId);
+        // If this container was already GUARANTEED, then it is an
+        // increase, else its a promotion
+        if (existingContainer == null ||
+            EnumSet.of(RMContainerState.COMPLETED, RMContainerState.KILLED,
+                RMContainerState.EXPIRED, RMContainerState.RELEASED).contains(
+                    existingContainer.getState())) {
+          tempContainerToKill.add(rmContainer);
+        } else {
+          if (ExecutionType.GUARANTEED == existingContainer.getExecutionType()) {
+            newlyIncreasedContainers.put(matchedContainerId, rmContainer);
+          } else {
+            newlyPromotedContainers.put(matchedContainerId, rmContainer);
+          }
+        }
       }
     } else {
       newlyAllocatedContainers.add(rmContainer);
@@ -753,15 +776,25 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   }
 
   public List<Container> pullNewlyPromotedContainers() {
-    return pullContainersWithUpdatedExecType(newlyPromotedContainers,
+    return pullNewlyUpdatedContainers(newlyPromotedContainers,
         ContainerUpdateType.PROMOTE_EXECUTION_TYPE);
   }
 
   public List<Container> pullNewlyDemotedContainers() {
-    return pullContainersWithUpdatedExecType(newlyDemotedContainers,
+    return pullNewlyUpdatedContainers(newlyDemotedContainers,
         ContainerUpdateType.DEMOTE_EXECUTION_TYPE);
   }
 
+  public List<Container> pullNewlyIncreasedContainers() {
+    return pullNewlyUpdatedContainers(newlyIncreasedContainers,
+        ContainerUpdateType.INCREASE_RESOURCE);
+  }
+
+  public List<Container> pullNewlyDecreasedContainers() {
+    return pullNewlyUpdatedContainers(newlyDecreasedContainers,
+        ContainerUpdateType.DECREASE_RESOURCE);
+  }
+
   public List<UpdateContainerError> pullUpdateContainerErrors() {
     List<UpdateContainerError> errors =
         new ArrayList<>(updateContainerErrors);
@@ -775,11 +808,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
    * GUARANTEED to OPPORTUNISTIC.
    * @return Newly Promoted and Demoted containers
    */
-  private List<Container> pullContainersWithUpdatedExecType(
+  private List<Container> pullNewlyUpdatedContainers(
       Map<ContainerId, RMContainer> newlyUpdatedContainers,
       ContainerUpdateType updateTpe) {
     List<Container> updatedContainers = new ArrayList<>();
-    if (oppContainerContext == null) {
+    if (oppContainerContext == null &&
+        (ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateTpe
+            || ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateTpe)) {
       return updatedContainers;
     }
     try {
@@ -789,19 +824,22 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       while (i.hasNext()) {
         Map.Entry<ContainerId, RMContainer> entry = i.next();
         ContainerId matchedContainerId = entry.getKey();
-        RMContainer rmContainer = entry.getValue();
-
-        // swap containers
-        RMContainer existingRMContainer = swapContainer(
-            rmContainer, matchedContainerId);
-        getUpdateContext().removeFromOutstandingUpdate(
-            rmContainer.getAllocatedSchedulerKey(),
-            existingRMContainer.getContainer());
-        Container updatedContainer = updateContainerAndNMToken(
-            existingRMContainer, updateTpe);
-        updatedContainers.add(updatedContainer);
-
-        tempContainerToKill.add(rmContainer);
+        RMContainer tempRMContainer = entry.getValue();
+
+        RMContainer existingRMContainer =
+            getRMContainer(matchedContainerId);
+        if (existingRMContainer != null) {
+          // swap containers
+          existingRMContainer = getUpdateContext().swapContainer(
+              tempRMContainer, existingRMContainer, updateTpe);
+          getUpdateContext().removeFromOutstandingUpdate(
+              tempRMContainer.getAllocatedSchedulerKey(),
+              existingRMContainer.getContainer());
+          Container updatedContainer = updateContainerAndNMToken(
+              existingRMContainer, updateTpe);
+          updatedContainers.add(updatedContainer);
+        }
+        tempContainerToKill.add(tempRMContainer);
         i.remove();
       }
       // Release all temporary containers
@@ -823,68 +861,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     }
   }
 
-  private RMContainer swapContainer(RMContainer rmContainer, ContainerId
-      matchedContainerId) {
-    RMContainer existingRMContainer =
-        getRMContainer(matchedContainerId);
-    if (existingRMContainer != null) {
-      // Swap updated container with the existing container
-      Container updatedContainer = rmContainer.getContainer();
-
-      Container newContainer = Container.newInstance(matchedContainerId,
-          existingRMContainer.getContainer().getNodeId(),
-          existingRMContainer.getContainer().getNodeHttpAddress(),
-          updatedContainer.getResource(),
-          existingRMContainer.getContainer().getPriority(), null,
-          updatedContainer.getExecutionType());
-      newContainer.setAllocationRequestId(
-          existingRMContainer.getContainer().getAllocationRequestId());
-      newContainer.setVersion(existingRMContainer.getContainer().getVersion());
-
-      rmContainer.getContainer().setResource(
-          existingRMContainer.getContainer().getResource());
-      rmContainer.getContainer().setExecutionType(
-          existingRMContainer.getContainer().getExecutionType());
-
-      ((RMContainerImpl)existingRMContainer).setContainer(newContainer);
-    }
-    return existingRMContainer;
-  }
-
-  private List<Container> pullNewlyUpdatedContainers(
-      Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
-    try {
-      writeLock.lock();
-      List <Container> returnContainerList = new ArrayList <Container>(
-          updatedContainerMap.size());
-
-      Iterator<Entry<ContainerId, RMContainer>> i =
-          updatedContainerMap.entrySet().iterator();
-      while (i.hasNext()) {
-        RMContainer rmContainer = i.next().getValue();
-        Container updatedContainer = updateContainerAndNMToken(rmContainer,
-            increase ? ContainerUpdateType.INCREASE_RESOURCE :
-                ContainerUpdateType.DECREASE_RESOURCE);
-        if (updatedContainer != null) {
-          returnContainerList.add(updatedContainer);
-          i.remove();
-        }
-      }
-      return returnContainerList;
-    } finally {
-      writeLock.unlock();
-    }
-
-  }
-
-  public List<Container> pullNewlyIncreasedContainers() {
-    return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
-  }
-  
-  public List<Container> pullNewlyDecreasedContainers() {
-    return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
-  }
-  
   public List<NMToken> pullUpdatedNMTokens() {
     try {
       writeLock.lock();
@@ -1252,68 +1228,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public ResourceUsage getSchedulingResourceUsage() {
     return attemptResourceUsage;
   }
-  
-  public boolean removeIncreaseRequest(NodeId nodeId,
-      SchedulerRequestKey schedulerKey, ContainerId containerId) {
-    try {
-      writeLock.lock();
-      return appSchedulingInfo.removeIncreaseRequest(nodeId, schedulerKey,
-          containerId);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-  
-  public boolean updateIncreaseRequests(
-      List<SchedContainerChangeRequest> increaseRequests) {
-    try {
-      writeLock.lock();
-      return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-  
-  private void changeContainerResource(
-      SchedContainerChangeRequest changeRequest, boolean increase) {
-    try {
-      writeLock.lock();
-      if (increase) {
-        appSchedulingInfo.increaseContainer(changeRequest);
-      } else{
-        appSchedulingInfo.decreaseContainer(changeRequest);
-      }
-
-      RMContainer changedRMContainer = changeRequest.getRMContainer();
-      changedRMContainer.handle(
-          new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
-              changeRequest.getTargetCapacity(), increase));
-
-      // remove pending and not pulled by AM newly-increased or
-      // decreased-containers and add the new one
-      if (increase) {
-        newlyDecreasedContainers.remove(changeRequest.getContainerId());
-        newlyIncreasedContainers.put(changeRequest.getContainerId(),
-            changedRMContainer);
-      } else{
-        newlyIncreasedContainers.remove(changeRequest.getContainerId());
-        newlyDecreasedContainers.put(changeRequest.getContainerId(),
-            changedRMContainer);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-  
-  public void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest) {
-    changeContainerResource(decreaseRequest, false);
-  }
-  
-  public void increaseContainer(
-      SchedContainerChangeRequest increaseRequest) {
-    changeContainerResource(increaseRequest, true);
-  }
 
   public void setAppAMNodePartitionName(String partitionName) {
     this.appAMNodePartitionName = partitionName;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 9c2dff3..db17b42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -180,49 +180,6 @@ public abstract class SchedulerNode {
   }
 
   /**
-   * Change the resources allocated for a container.
-   * @param containerId Identifier of the container to change.
-   * @param deltaResource Change in the resource allocation.
-   * @param increase True if the change is an increase of allocation.
-   */
-  protected synchronized void changeContainerResource(ContainerId containerId,
-      Resource deltaResource, boolean increase) {
-    if (increase) {
-      deductUnallocatedResource(deltaResource);
-    } else {
-      addUnallocatedResource(deltaResource);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug((increase ? "Increased" : "Decreased") + " container "
-              + containerId + " of capacity " + deltaResource + " on host "
-              + rmNode.getNodeAddress() + ", which has " + numContainers
-              + " containers, " + getAllocatedResource() + " used and "
-              + getUnallocatedResource() + " available after allocation");
-    }
-  }
-
-  /**
-   * Increase the resources allocated to a container.
-   * @param containerId Identifier of the container to change.
-   * @param deltaResource Increase of resource allocation.
-   */
-  public synchronized void increaseContainer(ContainerId containerId,
-      Resource deltaResource) {
-    changeContainerResource(containerId, deltaResource, true);
-  }
-
-  /**
-   * Decrease the resources allocated to a container.
-   * @param containerId Identifier of the container to change.
-   * @param deltaResource Decrease of resource allocation.
-   */
-  public synchronized void decreaseContainer(ContainerId containerId,
-      Resource deltaResource) {
-    changeContainerResource(containerId, deltaResource, false);
-  }
-
-  /**
    * Get unallocated resources on the node.
    * @return Unallocated resources on the node
    */
@@ -280,7 +237,6 @@ public abstract class SchedulerNode {
     if (info == null) {
       return;
     }
-
     if (!releasedByNode && info.launchedOnNode) {
       // wait until node reports container has completed
       return;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index e9ef319..aa60c9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -453,14 +453,13 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   void allocateResource(Resource clusterResource,
-      Resource resource, String nodePartition, boolean changeContainerResource) {
+      Resource resource, String nodePartition) {
     try {
       writeLock.lock();
       queueUsage.incUsed(nodePartition, resource);
 
-      if (!changeContainerResource) {
-        ++numContainers;
-      }
+      ++numContainers;
+
       CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
           minimumAllocation, this, labelManager, nodePartition);
     } finally {
@@ -469,7 +468,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   protected void releaseResource(Resource clusterResource,
-      Resource resource, String nodePartition, boolean changeContainerResource) {
+      Resource resource, String nodePartition) {
     try {
       writeLock.lock();
       queueUsage.decUsed(nodePartition, resource);
@@ -477,9 +476,7 @@ public abstract class AbstractCSQueue implements CSQueue {
       CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
           minimumAllocation, this, labelManager, nodePartition);
 
-      if (!changeContainerResource) {
-        --numContainers;
-      }
+      --numContainers;
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index a65b3d2..6d30386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -231,14 +231,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
       boolean sortQueues);
 
   /**
-   * We have a reserved increased container in the queue, we need to unreserve
-   * it. Since we just want to cancel the reserved increase request instead of
-   * stop the container, we shouldn't call completedContainer for such purpose.
-   */
-  public void unreserveIncreasedContainer(Resource clusterResource,
-      FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer);
-
-  /**
    * Get the number of applications in the queue.
    * @return number of applications
    */
@@ -333,13 +325,6 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    *          new resource asked
    */
   public void decPendingResource(String nodeLabel, Resource resourceToDec);
-  
-  /**
-   * Decrease container resource in the queue
-   */
-  public void decreaseContainer(Resource clusterResource,
-      SchedContainerChangeRequest decreaseRequest,
-      FiCaSchedulerApp app) throws InvalidResourceRequestException;
 
   /**
    * Get valid Node Labels for this queue

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eac6b4c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 3517764..20ea607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -60,9 +59,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -85,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -99,7 +95,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -872,43 +868,6 @@ public class CapacityScheduler extends
     }
   }
 
-  private LeafQueue updateIncreaseRequests(
-      List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) {
-    // When application has some pending to-be-removed resource requests,
-    app.removedToBeRemovedIncreaseRequests();
-
-    if (null == increaseRequests || increaseRequests.isEmpty()) {
-      return null;
-    }
-
-    // Pre-process increase requests
-    List<SchedContainerChangeRequest> schedIncreaseRequests =
-        createSchedContainerChangeRequests(increaseRequests, true);
-    LeafQueue leafQueue = (LeafQueue) app.getQueue();
-
-    try {
-      /*
-       * Acquire application's lock here to make sure application won't
-       * finish when updateIncreaseRequest is called.
-       */
-      app.getWriteLock().lock();
-      // make sure we aren't stopping/removing the application
-      // when the allocate comes in
-      if (app.isStopped()) {
-        return null;
-      }
-      // Process increase resource requests
-      if (app.updateIncreaseRequests(schedIncreaseRequests)) {
-        return leafQueue;
-      }
-    } finally {
-      app.getWriteLock().unlock();
-    }
-
-
-    return null;
-  }
-
   @Override
   @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
@@ -920,21 +879,13 @@ public class CapacityScheduler extends
       return EMPTY_ALLOCATION;
     }
 
-    // Handle promotions and demotions
-    handleExecutionTypeUpdates(
-        application, updateRequests.getPromotionRequests(),
-        updateRequests.getDemotionRequests());
+    // Handle all container updates
+    handleContainerUpdates(application, updateRequests);
 
     // Release containers
     releaseContainers(release, application);
 
-    // update increase requests
-    LeafQueue updateDemandForQueue =
-        updateIncreaseRequests(updateRequests.getIncreaseRequests(),
-        application);
-
-    // Decrease containers
-    decreaseContainers(updateRequests.getDecreaseRequests(), application);
+    LeafQueue updateDemandForQueue = null;
 
     // Sanity check for new allocation requests
     normalizeRequests(ask);
@@ -959,8 +910,7 @@ public class CapacityScheduler extends
         }
 
         // Update application requests
-        if (application.updateResourceRequests(ask) && (updateDemandForQueue
-            == null)) {
+        if (application.updateResourceRequests(ask)) {
           updateDemandForQueue = (LeafQueue) application.getQueue();
         }
 
@@ -1466,7 +1416,7 @@ public class CapacityScheduler extends
           (ContainerExpiredSchedulerEvent) event;
       ContainerId containerId = containerExpiredEvent.getContainerId();
       if (containerExpiredEvent.isIncrease()) {
-        rollbackContainerResource(containerId);
+        rollbackContainerUpdate(containerId);
       } else {
         completedContainer(getRMContainer(containerId),
             SchedulerUtils.createAbnormalContainerStatus(
@@ -1618,31 +1568,6 @@ public class CapacityScheduler extends
     }
   }
 
-  private void rollbackContainerResource(
-      ContainerId containerId) {
-    RMContainer rmContainer = getRMContainer(containerId);
-    if (rmContainer == null) {
-      LOG.info("Cannot rollback resource for container " + containerId
-          + ". The container does not exist.");
-      return;
-    }
-    FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
-    if (application == null) {
-      LOG.info("Cannot rollback resource for container " + containerId
-          + ". The application that the container "
-          + "belongs to does not exist.");
-      return;
-    }
-    LOG.info("Roll back resource for container " + containerId);
-
-    SchedulerNode schedulerNode = getSchedulerNode(
-        rmContainer.getAllocatedNode());
-    SchedContainerChangeRequest decreaseRequest =
-        new SchedContainerChangeRequest(this.rmContext, schedulerNode,
-            rmContainer, rmContainer.getLastConfirmedResource());
-    decreaseContainer(decreaseRequest, application);
-  }
-
   @Override
   protected void completedContainerInternal(
       RMContainer rmContainer, ContainerStatus containerStatus,
@@ -1676,32 +1601,6 @@ public class CapacityScheduler extends
         rmContainer, containerStatus, event, null, true);
   }
 
-  @Override
-  protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
-      SchedulerApplicationAttempt attempt) {
-    RMContainer rmContainer = decreaseRequest.getRMContainer();
-    // Check container status before doing decrease
-    if (rmContainer.getState() != RMContainerState.RUNNING) {
-      LOG.info(
-          "Trying to decrease a container not in RUNNING state, container="
-              + rmContainer + " state=" + rmContainer.getState().name());
-      return;
-    }
-    FiCaSchedulerApp app = (FiCaSchedulerApp) attempt;
-    LeafQueue queue = (LeafQueue) attempt.getQueue();
-    try {
-      queue.decreaseContainer(getClusterResource(), decreaseRequest, app);
-      // Notify RMNode that the container can be pulled by NodeManager in the
-      // next heartbeat
-      this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
-              Collections.singletonList(rmContainer.getContainer())));
-    } catch (InvalidResourceRequestException e) {
-      LOG.warn("Error happens when checking decrease request, Ignoring.."
-          + " exception=", e);
-    }
-  }
-
   @Lock(Lock.NoLock.class)
   @VisibleForTesting
   @Override
@@ -2386,8 +2285,8 @@ public class CapacityScheduler extends
             getSchedulerContainer(rmContainer, true),
             getSchedulerContainersToRelease(csAssignment),
             getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.isIncreasedAllocation(),
-            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+                false), csAssignment.getType(),
+            csAssignment.getRequestLocalityType(),
             csAssignment.getSchedulingMode() != null ?
                 csAssignment.getSchedulingMode() :
                 SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
@@ -2403,8 +2302,8 @@ public class CapacityScheduler extends
             getSchedulerContainer(rmContainer, false),
             getSchedulerContainersToRelease(csAssignment),
             getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
-                false), csAssignment.isIncreasedAllocation(),
-            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+                false), csAssignment.getType(),
+            csAssignment.getRequestLocalityType(),
             csAssignment.getSchedulingMode() != null ?
                 csAssignment.getSchedulingMode() :
                 SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message