hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [40/50] hadoop git commit: YARN-4519. Potential deadlock of CapacityScheduler between decrease container and assign containers. Contributed by Meng Ding
Date Fri, 29 Jan 2016 19:07:31 GMT
YARN-4519. Potential deadlock of CapacityScheduler between decrease container and assign containers.
Contributed by Meng Ding


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f466364
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f466364
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f466364

Branch: refs/heads/HDFS-1312
Commit: 7f46636495e23693d588b0915f464fa7afd9102e
Parents: 41da9a0
Author: Jian He <jianhe@apache.org>
Authored: Wed Jan 27 15:38:32 2016 -0800
Committer: Jian He <jianhe@apache.org>
Committed: Thu Jan 28 14:51:00 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../server/resourcemanager/RMServerUtils.java   |  76 ++++++------
 .../scheduler/AbstractYarnScheduler.java        |  64 ++++------
 .../scheduler/AppSchedulingInfo.java            |  30 +++--
 .../scheduler/SchedContainerChangeRequest.java  |  33 +++--
 .../scheduler/capacity/CSQueue.java             |   3 +-
 .../scheduler/capacity/CapacityScheduler.java   | 119 ++++++++++---------
 .../scheduler/capacity/LeafQueue.java           |  83 ++++++++++---
 .../scheduler/capacity/ParentQueue.java         |   4 +-
 .../capacity/TestContainerResizing.java         |  87 ++++++++++++++
 10 files changed, 322 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8eaed42..ee57e4b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -170,6 +170,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4633. Fix random test failure in TestRMRestart#testRMRestartAfterPreemption
     (Bibin A Chundatt via rohithsharmaks)
 
+    YARN-4519. Potential deadlock of CapacityScheduler between decrease container
+    and assign containers. (Meng Ding via jianhe)
+
 Release 2.8.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index cc30593..e19d55e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -53,9 +53,10 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -114,43 +115,25 @@ public class RMServerUtils {
           queueName, scheduler, rmContext, queueInfo);
     }
   }
-  
+
   /**
-   * Normalize container increase/decrease request, it will normalize and update
-   * ContainerResourceChangeRequest.targetResource
+   * Validate increase/decrease request. This function must be called under
+   * the queue lock to make sure that the access to container resource is
+   * atomic. Refer to LeafQueue.decreaseContainer() and
+   * CapacityScheduelr.updateIncreaseRequests()
+   *
    * 
    * <pre>
    * - Throw exception when any other error happens
    * </pre>
    */
-  public static void checkAndNormalizeContainerChangeRequest(
-      RMContext rmContext, ContainerResourceChangeRequest request,
-      boolean increase) throws InvalidResourceRequestException {
+  public static void checkSchedContainerChangeRequest(
+      SchedContainerChangeRequest request, boolean increase)
+      throws InvalidResourceRequestException {
+    RMContext rmContext = request.getRmContext();
     ContainerId containerId = request.getContainerId();
-    ResourceScheduler scheduler = rmContext.getScheduler();
-    RMContainer rmContainer = scheduler.getRMContainer(containerId);
-    ResourceCalculator rc = scheduler.getResourceCalculator();
-    
-    if (null == rmContainer) {
-      String msg =
-          "Failed to get rmContainer for "
-              + (increase ? "increase" : "decrease")
-              + " request, with container-id=" + containerId;
-      throw new InvalidResourceRequestException(msg);
-    }
-
-    if (rmContainer.getState() != RMContainerState.RUNNING) {
-      String msg =
-          "rmContainer's state is not RUNNING, for "
-              + (increase ? "increase" : "decrease")
-              + " request, with container-id=" + containerId;
-      throw new InvalidResourceRequestException(msg);
-    }
-
-    Resource targetResource = Resources.normalize(rc, request.getCapability(),
-        scheduler.getMinimumResourceCapability(),
-        scheduler.getMaximumResourceCapability(),
-        scheduler.getMinimumResourceCapability());
+    RMContainer rmContainer = request.getRMContainer();
+    Resource targetResource = request.getTargetCapacity();
 
     // Compare targetResource and original resource
     Resource originalResource = rmContainer.getAllocatedResource();
@@ -181,10 +164,10 @@ public class RMServerUtils {
         throw new InvalidResourceRequestException(msg);
       }
     }
-    
-    RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode());
-    
+
     // Target resource of the increase request is more than NM can offer
+    ResourceScheduler scheduler = rmContext.getScheduler();
+    RMNode rmNode = request.getSchedulerNode().getRMNode();
     if (!Resources.fitsIn(scheduler.getResourceCalculator(),
         scheduler.getClusterResource(), targetResource,
         rmNode.getTotalCapability())) {
@@ -193,9 +176,6 @@ public class RMServerUtils {
           + rmNode.getTotalCapability();
       throw new InvalidResourceRequestException(msg);
     }
-
-    // Update normalized target resource
-    request.setCapability(targetResource);
   }
 
   /*
@@ -253,7 +233,8 @@ public class RMServerUtils {
       }
     }
   }
-  
+
+  // Sanity check and normalize target resource
   private static void validateIncreaseDecreaseRequest(RMContext rmContext,
       List<ContainerResourceChangeRequest> requests, Resource maximumAllocation,
       boolean increase)
@@ -283,8 +264,23 @@ public class RMServerUtils {
             + request.getCapability().getVirtualCores() + ", maxVirtualCores="
             + maximumAllocation.getVirtualCores());
       }
-      
-      checkAndNormalizeContainerChangeRequest(rmContext, request, increase);
+      ContainerId containerId = request.getContainerId();
+      ResourceScheduler scheduler = rmContext.getScheduler();
+      RMContainer rmContainer = scheduler.getRMContainer(containerId);
+      if (null == rmContainer) {
+        String msg =
+            "Failed to get rmContainer for "
+                + (increase ? "increase" : "decrease")
+                + " request, with container-id=" + containerId;
+        throw new InvalidResourceRequestException(msg);
+      }
+      ResourceCalculator rc = scheduler.getResourceCalculator();
+      Resource targetResource = Resources.normalize(rc, request.getCapability(),
+          scheduler.getMinimumResourceCapability(),
+          scheduler.getMaximumResourceCapability(),
+          scheduler.getMinimumResourceCapability());
+      // Update normalized target resource
+      request.setCapability(targetResource);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 41a04f2..27d4f91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -55,13 +54,13 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -74,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
@@ -618,28 +618,20 @@ public abstract class AbstractYarnScheduler
           SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
     }
   }
-  
+
   protected void decreaseContainers(
-      List<SchedContainerChangeRequest> decreaseRequests,
+      List<ContainerResourceChangeRequest> decreaseRequests,
       SchedulerApplicationAttempt attempt) {
-    for (SchedContainerChangeRequest request : decreaseRequests) {
+    if (null == decreaseRequests || decreaseRequests.isEmpty()) {
+      return;
+    }
+    // Pre-process decrease requests
+    List<SchedContainerChangeRequest> schedDecreaseRequests =
+        createSchedContainerChangeRequests(decreaseRequests, false);
+    for (SchedContainerChangeRequest request : schedDecreaseRequests) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Processing decrease request:" + request);
       }
-      
-      boolean hasIncreaseRequest =
-          attempt.removeIncreaseRequest(request.getNodeId(),
-              request.getPriority(), request.getContainerId());
-      
-      if (hasIncreaseRequest) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("While processing decrease request, found a increase request "
-              + "for the same container "
-              + request.getContainerId()
-              + ", removed the increase request");
-        }
-      }
-      
       // handle decrease request
       decreaseContainer(request, attempt);
     }
@@ -877,7 +869,7 @@ public abstract class AbstractYarnScheduler
   }
   
   /**
-   * Normalize container increase/decrease request, and return
+   * Sanity check increase/decrease request, and return
    * SchedulerContainerResourceChangeRequest according to given
    * ContainerResourceChangeRequest.
    * 
@@ -886,37 +878,34 @@ public abstract class AbstractYarnScheduler
    * - Throw exception when any other error happens
    * </pre>
    */
-  private SchedContainerChangeRequest
-      checkAndNormalizeContainerChangeRequest(
-          ContainerResourceChangeRequest request, boolean increase)
-          throws YarnException {
-    // We have done a check in ApplicationMasterService, but RMContainer status
-    // / Node resource could change since AMS won't acquire lock of scheduler.
-    RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request,
-        increase);
+  private SchedContainerChangeRequest createSchedContainerChangeRequest(
+      ContainerResourceChangeRequest request, boolean increase)
+      throws YarnException {
     ContainerId containerId = request.getContainerId();
     RMContainer rmContainer = getRMContainer(containerId);
+    if (null == rmContainer) {
+      String msg =
+          "Failed to get rmContainer for "
+              + (increase ? "increase" : "decrease")
+              + " request, with container-id=" + containerId;
+      throw new InvalidResourceRequestException(msg);
+    }
     SchedulerNode schedulerNode =
         getSchedulerNode(rmContainer.getAllocatedNode());
-    
-    return new SchedContainerChangeRequest(schedulerNode, rmContainer,
-        request.getCapability());
+    return new SchedContainerChangeRequest(
+        this.rmContext, schedulerNode, rmContainer, request.getCapability());
   }
 
   protected List<SchedContainerChangeRequest>
-      checkAndNormalizeContainerChangeRequests(
+      createSchedContainerChangeRequests(
           List<ContainerResourceChangeRequest> changeRequests,
           boolean increase) {
-    if (null == changeRequests || changeRequests.isEmpty()) {
-      return Collections.EMPTY_LIST;
-    }
-    
     List<SchedContainerChangeRequest> schedulerChangeRequests =
         new ArrayList<SchedContainerChangeRequest>();
     for (ContainerResourceChangeRequest r : changeRequests) {
       SchedContainerChangeRequest sr = null;
       try {
-        sr = checkAndNormalizeContainerChangeRequest(r, increase);
+        sr = createSchedContainerChangeRequest(r, increase);
       } catch (YarnException e) {
         LOG.warn("Error happens when checking increase request, Ignoring.."
             + " exception=", e);
@@ -924,7 +913,6 @@ public abstract class AbstractYarnScheduler
       }
       schedulerChangeRequests.add(sr);
     }
-
     return schedulerChangeRequests;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 07f3d8b..a61001e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -148,6 +150,18 @@ public class AppSchedulingInfo {
     boolean resourceUpdated = false;
 
     for (SchedContainerChangeRequest r : increaseRequests) {
+      if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
+        LOG.warn("rmContainer's state is not RUNNING, for increase request with"
+            + " container-id=" + r.getContainerId());
+        continue;
+      }
+      try {
+        RMServerUtils.checkSchedContainerChangeRequest(r, true);
+      } catch (YarnException e) {
+        LOG.warn("Error happens when checking increase request, Ignoring.."
+            + " exception=", e);
+        continue;
+      }
       NodeId nodeId = r.getRMContainer().getAllocatedNode();
 
       Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
@@ -221,7 +235,7 @@ public class AppSchedulingInfo {
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Added increase request:" + request.getContainerId()
-          + " delta=" + request.getDeltaCapacity());
+          + " delta=" + delta);
     }
     
     // update priorities
@@ -520,24 +534,20 @@ public class AppSchedulingInfo {
     NodeId nodeId = increaseRequest.getNodeId();
     Priority priority = increaseRequest.getPriority();
     ContainerId containerId = increaseRequest.getContainerId();
-    
+    Resource deltaCapacity = increaseRequest.getDeltaCapacity();
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocated increase request : applicationId=" + applicationId
           + " container=" + containerId + " host="
           + increaseRequest.getNodeId() + " user=" + user + " resource="
-          + increaseRequest.getDeltaCapacity());
+          + deltaCapacity);
     }
-    
     // Set queue metrics
-    queue.getMetrics().allocateResources(user,
-        increaseRequest.getDeltaCapacity());
-    
+    queue.getMetrics().allocateResources(user, deltaCapacity);
     // remove the increase request from pending increase request map
     removeIncreaseRequest(nodeId, priority, containerId);
-    
     // update usage
-    appResourceUsage.incUsed(increaseRequest.getNodePartition(),
-        increaseRequest.getDeltaCapacity());
+    appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity);
   }
   
   public synchronized void decreaseContainer(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
index ea109fd..e4ab3a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -32,18 +33,19 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  */
 public class SchedContainerChangeRequest implements
     Comparable<SchedContainerChangeRequest> {
-  RMContainer rmContainer;
-  Resource targetCapacity;
-  SchedulerNode schedulerNode;
-  Resource deltaCapacity;
+  private RMContext rmContext;
+  private RMContainer rmContainer;
+  private Resource targetCapacity;
+  private SchedulerNode schedulerNode;
+  private Resource deltaCapacity;
 
-  public SchedContainerChangeRequest(SchedulerNode schedulerNode,
+  public SchedContainerChangeRequest(
+      RMContext rmContext, SchedulerNode schedulerNode,
       RMContainer rmContainer, Resource targetCapacity) {
+    this.rmContext = rmContext;
     this.rmContainer = rmContainer;
     this.targetCapacity = targetCapacity;
     this.schedulerNode = schedulerNode;
-    deltaCapacity = Resources.subtract(targetCapacity,
-        rmContainer.getAllocatedResource());
   }
   
   public NodeId getNodeId() {
@@ -58,11 +60,19 @@ public class SchedContainerChangeRequest implements
     return this.targetCapacity;
   }
 
+  public RMContext getRmContext() {
+    return this.rmContext;
+  }
   /**
-   * Delta capacity = before - target, so if it is a decrease request, delta
+   * Delta capacity = target - before, so if it is a decrease request, delta
    * capacity will be negative
    */
-  public Resource getDeltaCapacity() {
+  public synchronized Resource getDeltaCapacity() {
+    // Only calculate deltaCapacity once
+    if (deltaCapacity == null) {
+      deltaCapacity = Resources.subtract(
+          targetCapacity, rmContainer.getAllocatedResource());
+    }
     return deltaCapacity;
   }
   
@@ -81,7 +91,7 @@ public class SchedContainerChangeRequest implements
   public SchedulerNode getSchedulerNode() {
     return schedulerNode;
   }
-  
+
   @Override
   public int hashCode() {
     return (getContainerId().hashCode() << 16) + targetCapacity.hashCode();
@@ -112,7 +122,6 @@ public class SchedContainerChangeRequest implements
   @Override
   public String toString() {
     return "<container=" + getContainerId() + ", targetCapacity="
-        + targetCapacity + ", delta=" + deltaCapacity + ", node="
-        + getNodeId().toString() + ">";
+        + targetCapacity + ", node=" + getNodeId().toString() + ">";
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 6ffba02..daf7790 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -332,7 +333,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public void decreaseContainer(Resource clusterResource,
       SchedContainerChangeRequest decreaseRequest,
-      FiCaSchedulerApp app);
+      FiCaSchedulerApp app) throws InvalidResourceRequestException;
 
   /**
    * Get valid Node Labels for this queue

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e773384..dcb60fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
@@ -895,9 +896,36 @@ public class CapacityScheduler extends
     }
   }
 
+  // It is crucial to acquire leaf queue lock first to prevent:
+  // 1. Race condition when calculating the delta resource in
+  //    SchedContainerChangeRequest
+  // 2. Deadlock with the scheduling thread.
+  private LeafQueue updateIncreaseRequests(
+      List<ContainerResourceChangeRequest> increaseRequests,
+      FiCaSchedulerApp app) {
+    if (null == increaseRequests || increaseRequests.isEmpty()) {
+      return null;
+    }
+    // Pre-process increase requests
+    List<SchedContainerChangeRequest> schedIncreaseRequests =
+        createSchedContainerChangeRequests(increaseRequests, true);
+    LeafQueue leafQueue = (LeafQueue) app.getQueue();
+    synchronized(leafQueue) {
+      // make sure we aren't stopping/removing the application
+      // when the allocate comes in
+      if (app.isStopped()) {
+        return null;
+      }
+      // Process increase resource requests
+      if (app.updateIncreaseRequests(schedIncreaseRequests)) {
+        return leafQueue;
+      }
+      return null;
+    }
+  }
+
   @Override
-  // Note: when AM asks to decrease container or release container, we will
-  // acquire scheduler lock
+  // Note: when AM asks to release container, we will acquire scheduler lock
   @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,
@@ -909,26 +937,23 @@ public class CapacityScheduler extends
     if (application == null) {
       return EMPTY_ALLOCATION;
     }
-    
-    // Sanity check
-    SchedulerUtils.normalizeRequests(
-        ask, getResourceCalculator(), getClusterResource(),
-        getMinimumResourceCapability(), getMaximumResourceCapability());
-    
-    // Pre-process increase requests
-    List<SchedContainerChangeRequest> normalizedIncreaseRequests =
-        checkAndNormalizeContainerChangeRequests(increaseRequests, true);
-    
-    // Pre-process decrease requests
-    List<SchedContainerChangeRequest> normalizedDecreaseRequests =
-        checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
 
     // Release containers
     releaseContainers(release, application);
 
-    Allocation allocation;
+    // update increase requests
+    LeafQueue updateDemandForQueue =
+        updateIncreaseRequests(increaseRequests, application);
+
+    // Decrease containers
+    decreaseContainers(decreaseRequests, application);
 
-    LeafQueue updateDemandForQueue = null;
+    // Sanity check for new allocation requests
+    SchedulerUtils.normalizeRequests(
+        ask, getResourceCalculator(), getClusterResource(),
+        getMinimumResourceCapability(), getMaximumResourceCapability());
+
+    Allocation allocation;
 
     synchronized (application) {
 
@@ -947,7 +972,8 @@ public class CapacityScheduler extends
         }
 
         // Update application requests
-        if (application.updateResourceRequests(ask)) {
+        if (application.updateResourceRequests(ask)
+            && (updateDemandForQueue == null)) {
           updateDemandForQueue = (LeafQueue) application.getQueue();
         }
 
@@ -957,12 +983,6 @@ public class CapacityScheduler extends
         }
       }
       
-      // Process increase resource requests
-      if (application.updateIncreaseRequests(normalizedIncreaseRequests)
-          && (updateDemandForQueue == null)) {
-        updateDemandForQueue = (LeafQueue) application.getQueue();
-      }
-
       if (application.isWaitingForAMContainer()) {
         // Allocate is for AM and update AM blacklist for this
         application.updateAMBlacklist(
@@ -971,8 +991,6 @@ public class CapacityScheduler extends
         application.updateBlacklist(blacklistAdditions, blacklistRemovals);
       }
       
-      // Decrease containers
-      decreaseContainers(normalizedDecreaseRequests, application);
 
       allocation = application.getAllocation(getResourceCalculator(),
                    clusterResource, getMinimumResourceCapability());
@@ -1167,7 +1185,8 @@ public class CapacityScheduler extends
       .getAssignmentInformation().getReserved());
  }
 
-  private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+  @VisibleForTesting
+  protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
     if (rmContext.isWorkPreservingRecoveryEnabled()
         && !rmContext.isSchedulerReadyForAllocatingContainers()) {
       return;
@@ -1517,48 +1536,30 @@ public class CapacityScheduler extends
     }
   }
   
-  @Lock(CapacityScheduler.class)
   @Override
-  protected synchronized void decreaseContainer(
-      SchedContainerChangeRequest decreaseRequest,
+  protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
       SchedulerApplicationAttempt attempt) {
     RMContainer rmContainer = decreaseRequest.getRMContainer();
-
     // Check container status before doing decrease
     if (rmContainer.getState() != RMContainerState.RUNNING) {
       LOG.info("Trying to decrease a container not in RUNNING state, container="
           + rmContainer + " state=" + rmContainer.getState().name());
       return;
     }
-    
-    // Delta capacity of this decrease request is 0, this decrease request may
-    // just to cancel increase request
-    if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Decrease target resource equals to existing resource for container:"
-            + decreaseRequest.getContainerId()
-            + " ignore this decrease request.");
-      }
-      return;
-    }
-
-    // Save resource before decrease
-    Resource resourceBeforeDecrease =
-        Resources.clone(rmContainer.getContainer().getResource());
-
     FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
     LeafQueue queue = (LeafQueue) attempt.getQueue();
-    queue.decreaseContainer(clusterResource, decreaseRequest, app);
-    
-    // Notify RMNode the container will be decreased
-    this.rmContext.getDispatcher().getEventHandler()
-        .handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
-            Arrays.asList(rmContainer.getContainer())));
-    
-    LOG.info("Application attempt " + app.getApplicationAttemptId()
-        + " decreased container:" + decreaseRequest.getContainerId() + " from "
-        + resourceBeforeDecrease + " to "
-        + decreaseRequest.getTargetCapacity());
+    try {
+      queue.decreaseContainer(clusterResource, decreaseRequest, app);
+      // Notify RMNode that the container can be pulled by NodeManager in the
+      // next heartbeat
+      this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMNodeDecreaseContainerEvent(
+              decreaseRequest.getNodeId(),
+              Collections.singletonList(rmContainer.getContainer())));
+    } catch (InvalidResourceRequestException e) {
+      LOG.warn("Error happens when checking decrease request, Ignoring.."
+          + " exception=", e);
+    }
   }
 
   @Lock(Lock.NoLock.class)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9e64b42..56e4502 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -1676,11 +1678,17 @@ public class LeafQueue extends AbstractCSQueue {
   public Priority getDefaultApplicationPriority() {
     return defaultAppPriorityPerQueue;
   }
-  
+
+  /**
+   * Processes a request to decrease a container's resource in this leaf queue.
+   * @param clusterResource Total cluster resource
+   * @param decreaseRequest The decrease request
+   * @param app The application of interest
+   */
   @Override
   public void decreaseContainer(Resource clusterResource,
       SchedContainerChangeRequest decreaseRequest,
-      FiCaSchedulerApp app) {
+      FiCaSchedulerApp app) throws InvalidResourceRequestException {
     // If the container being decreased is reserved, we need to unreserve it
     // first.
     RMContainer rmContainer = decreaseRequest.getRMContainer();
@@ -1688,25 +1696,62 @@ public class LeafQueue extends AbstractCSQueue {
       unreserveIncreasedContainer(clusterResource, app,
           (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
     }
-    
-    // Delta capacity is negative when it's a decrease request
-    Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
-    
+    boolean resourceDecreased = false;
+    Resource resourceBeforeDecrease;
+    // Grab queue lock to avoid race condition when getting container resource
     synchronized (this) {
-      // Delta is negative when it's a decrease request
-      releaseResource(clusterResource, app, absDelta,
-          decreaseRequest.getNodePartition(), decreaseRequest.getRMContainer(),
-          true);
-      // Notify application
-      app.decreaseContainer(decreaseRequest);
-      // Notify node
-      decreaseRequest.getSchedulerNode()
-          .decreaseContainer(decreaseRequest.getContainerId(), absDelta);
-    }
-
-    // Notify parent
-    if (getParent() != null) {
+      // Make sure the decrease request is valid in terms of current resource
+      // and target resource. This must be done under the leaf queue lock.
+      // Throws exception if the check fails.
+      RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
+      // Save resource before decrease for debug log
+      resourceBeforeDecrease =
+          Resources.clone(rmContainer.getAllocatedResource());
+      // Do we have increase request for the same container? If so, remove it
+      boolean hasIncreaseRequest =
+          app.removeIncreaseRequest(decreaseRequest.getNodeId(),
+              decreaseRequest.getPriority(), decreaseRequest.getContainerId());
+      if (hasIncreaseRequest) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("While processing decrease requests, found an increase"
+              + " request for the same container "
+              + decreaseRequest.getContainerId()
+              + ", removed the increase request");
+        }
+      }
+      // Delta capacity is negative when it's a decrease request
+      Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
+      if (Resources.equals(absDelta, Resources.none())) {
+        // If delta capacity of this decrease request is 0, this decrease
+        // request serves the purpose of cancelling an existing increase request
+        // if any
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Decrease target resource equals to existing resource for"
+              + " container:" + decreaseRequest.getContainerId()
+              + " ignore this decrease request.");
+        }
+      } else {
+        // Release the delta resource
+        releaseResource(clusterResource, app, absDelta,
+            decreaseRequest.getNodePartition(),
+            decreaseRequest.getRMContainer(),
+            true);
+        // Notify application
+        app.decreaseContainer(decreaseRequest);
+        // Notify node
+        decreaseRequest.getSchedulerNode()
+            .decreaseContainer(decreaseRequest.getContainerId(), absDelta);
+        resourceDecreased = true;
+      }
+    }
+
+    if (resourceDecreased) {
+      // Notify parent queue outside of leaf queue lock
       getParent().decreaseContainer(clusterResource, decreaseRequest, app);
+      LOG.info("Application attempt " + app.getApplicationAttemptId()
+          + " decreased container:" + decreaseRequest.getContainerId()
+          + " from " + resourceBeforeDecrease + " to "
+          + decreaseRequest.getTargetCapacity());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index badab72..a7d8796 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
@@ -656,7 +657,8 @@ public class ParentQueue extends AbstractCSQueue {
   
   @Override
   public void decreaseContainer(Resource clusterResource,
-      SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) {
+      SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
+      throws InvalidResourceRequestException {
     // delta capacity is negative when it's a decrease request
     Resource absDeltaCapacity =
         Resources.negate(decreaseRequest.getDeltaCapacity());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f466364/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 672af64..c08af9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,8 +50,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
+    .FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -57,12 +64,48 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestContainerResizing {
+  private static final Log LOG = LogFactory.getLog(TestContainerResizing.class);
   private final int GB = 1024;
 
   private YarnConfiguration conf;
 
   RMNodeLabelsManager mgr;
 
+  class MyScheduler extends CapacityScheduler {
+    /*
+     * A Mock Scheduler to simulate the potential effect of deadlock between:
+     * 1. The AbstractYarnScheduler.decreaseContainers() call (from
+     *    ApplicationMasterService thread)
+     * 2. The CapacityScheduler.allocateContainersToNode() call (from the
+     *    scheduler thread)
+     */
+    MyScheduler() {
+      super();
+    }
+
+    @Override
+    protected void decreaseContainers(
+        List<ContainerResourceChangeRequest> decreaseRequests,
+        SchedulerApplicationAttempt attempt) {
+      try {
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
+        LOG.debug("Thread interrupted.");
+      }
+      super.decreaseContainers(decreaseRequests, attempt);
+    }
+
+    @Override
+    public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+      try {
+        Thread.sleep(1000);
+      } catch(InterruptedException e) {
+        LOG.debug("Thread interrupted.");
+      }
+      super.allocateContainersToNode(node);
+    }
+  }
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
@@ -958,6 +1001,50 @@ public class TestContainerResizing {
     rm1.close();
   }
 
+  @Test (timeout = 60000)
+  public void testDecreaseContainerWillNotDeadlockContainerAllocation()
+      throws Exception {
+    // create and start MockRM with our MyScheduler
+    MockRM rm = new MockRM() {
+      @Override
+      public ResourceScheduler createScheduler() {
+        CapacityScheduler cs = new MyScheduler();
+        cs.setConf(conf);
+        return cs;
+      }
+    };
+    rm.start();
+    // register a node
+    MockNM nm = rm.registerNode("h1:1234", 20 * GB);
+    // submit an application -> app1
+    RMApp app1 = rm.submitApp(3 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+    // making sure resource is allocated
+    checkUsedResource(rm, "default", 3 * GB, null);
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    // making sure container is launched
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(rm, containerId1);
+    // submit allocation request for a new container
+    am1.allocate(Collections.singletonList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)),
+        null);
+    // nm reports status update and triggers container allocation
+    nm.nodeHeartbeat(true);
+    // *In the meantime*, am1 asks to decrease its AM container resource from
+    // 3GB to 1GB
+    AllocateResponse response = am1.sendContainerResizingRequest(null,
+        Collections.singletonList(ContainerResourceChangeRequest
+            .newInstance(containerId1, Resources.createResource(GB))));
+    // verify that the container resource is decreased
+    verifyContainerDecreased(response, containerId1, GB);
+
+    rm.close();
+  }
+
   private void checkPendingResource(MockRM rm, String queueName, int memory,
       String label) {
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();


Mime
View raw message